Ремарка
Текущая реализация представляет собой сырой прототип, направленный исключительно на демонстрацию возможности отображения логического плана из Apache Spark в Apache Atlas. Lfyysq прототип, по сути, является «прототипом прототипа» и служит лишь начальной отправной точкой для более глубокого анализа и разработки.
В данной работе Автор не стремимся представить окончательное или оптимальное решение. основной фокус заключается в демонстрации принципа и наметке необходимых методов для интеграции логических планов с метаданными в Apache Atlas.
Автор не призываем использовать данный подход в производственной среде в его текущем виде. Для полноценного решения задачи требуется дальнейшая проработка, включая создание специализированных библиотек, улучшение архитектуры. И все прочие прочие ...
Цель работы
Целью данной работы является создание прототипа, демонстрирующего возможность интеграции логических планов Apache Spark с метаданными в Apache Atlas , подобно тому как это происходит в данной статье с Apache NIFI .
Тестовая задача для иллюстрации и парсинг плана в AST
Определим небольшой файл cars.csv со следующим содержанием:
model,manufacturer
Model S,Tesla
Model 3,Tesla
Mustang,Ford
Civic,Honda
И напишем даг выведем его логический план:
val spark = SparkSession.builder()
.appName("Logical Plan Example")
.master("local")
.getOrCreate()
import spark.implicits._
val carsCSV = spark
.read
.option("header", "true")
.csv("src/main/resources/cars.csv")
val carsSeq = List(
("i8", "BMW"),
("A4", "Audi"),
("911", "Porsche"),
("Corolla", "Toyota")
).toDF("model", "manufacturer")
val unioncars = carsCSV.union(carsSeq)
val resDF = unioncars
.where(col("manufacturer") =!= "Audi")
.select("model", "manufacturer")
.withColumn("processedDDTM", lit(LocalDateTime.now()))
val logicalPlan = resDF.queryExecution.logical
println(logicalPlan)
/* вывод
Project [model#17, manufacturer#18, 2024-09-12 13:00:46.880141 AS processedDDTM#36]
+- Project [model#17, manufacturer#18]
+- Filter NOT (manufacturer#18 = Audi)
+- Union false, false
:- Relation [model#17,manufacturer#18] csv
+- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]
+- LocalRelation [_1#23, _2#24]
*/
}
Логический план представляет собой дерево, и для дальнейшей работы его необходимо преобразовать в удобную форму (AST).
Для этого мы определим класс AST, который будет отражать структуру плана в формате, удобном для последующей обработки.
// Определение корневого класса или типа для всех узлов дерева
sealed trait Node {
// Метод для получения имени узла на основе его типа
def getName: String = this.getClass.toString
}
// Узел типа "Проект", содержащий последовательность столбцов
case class ProjectNode(columns: Seq[String]) extends Node {
// Переопределение метода getName для возврата конкретного имени узла
override def getName: String = "Project"
}
// Узел типа "Фильтр", содержащий условие фильтрации
case class FilterNode(condition: String) extends Node {
// Переопределение метода getName для возврата конкретного имени узла
override def getName: String = "Filter"
}
// Узел типа "Объединение", указывающий, следует ли объединять все записи и по какому признаку
case class UnionNode(isAll: Boolean, byName: Boolean) extends Node {
// Переопределение метода getName для возврата конкретного имени узла
override def getName: String = "Union"
}
// Узел типа "Логическое отношение", содержащий последовательность столбцов
case class LogicalRelationNode(columns: Seq[String]) extends Node {
// Переопределение метода getName для возврата конкретного имени узла
override def getName: String = "LogicalRelation"
}
case class LocalRelationNode(columns: Seq[String]) extends Node {
override def getName: String = "LocalRelation"
}
// Узел типа "Локальное отношение", содержащий последовательность столбцов
case class LocalRelationNode(columns: Seq[String]) extends Node {
// Переопределение метода getName для возврата конкретного имени узла
override def getName: String = "LocalRelation"
}
// Класс для представления абстрактного синтаксического дерева (AST), где каждый узел имеет тип Node,
// список дочерних узлов, номер уровня и выражение уровня (необходим для индонтефикации нод на одном уровне)
case class AST(node: Node,
children: Seq[AST],
level_num: Int,
levelExpr: String)
И напишем парсер из логического плана в AST
// Объект для парсинга логических планов в AST
object ParserAST {
// Функция для преобразования логического плана в AST
// Возвращает Option[AST], где None означает, что план не может быть преобразован
private def parseAST(plan: LogicalPlan): Option[AST] = {
// Рекурсивная функция для обхода логического плана и создания узлов AST
// Параметры:
// - logicalPlan: текущий логический план для обработки
// - levelnum: уровень в дереве AST
// - levelExpr: строковое представление уровня и индекса
// Возвращает Option[AST], где None означает, что логический план не может быть преобразован
def loop(logicalPlan: LogicalPlan, levelnum: Int, levelExpr: String): Option[AST] = {
// Определение узла на основе типа логического плана
val node: Option[Node] = logicalPlan match {
case p: Project =>
// Обработка узла типа Project и создание узла AST с именем "Project"
val columns = p.projectList.map(_.sql)
Some(ProjectNode(columns))
case f: Filter =>
// Обработка узла типа Filter и создание узла AST с именем "Filter"
val condition = f.condition.sql
Some(FilterNode(condition))
case u: Union =>
// Обработка узла типа Union и создание узла AST с именем "Union"
val isAll = u.allowMissingCol
val byName = u.byName
Some(UnionNode(isAll, byName))
case lr: LocalRelation =>
// Обработка узла типа LocalRelation и создание узла AST с именем "LocalRelation"
val columns = lr.output.map(_.sql)
Some(LocalRelationNode(columns))
case lr: LogicalRelation =>
// Обработка узла типа LogicalRelation и создание узла AST с именем "LogicalRelation"
val columns = lr.output.map(_.sql)
Some(LogicalRelationNode(columns))
case _ =>
// Если логический план не совпадает ни с одним из известных типов, возвращаем None
None
}
// Если узел успешно создан, создаем AST и рекурсивно обрабатываем детей
node.map { n =>
// Создание списка дочерних узлов AST, рекурсивно обрабатывая каждый дочерний план
val children = logicalPlan.children.zipWithIndex.flatMap {
case (ch, i) => loop(ch, levelnum + 1, f"${levelnum + 1}_${i}")
}.toList
// Создание узла AST с текущим узлом и его дочерними узлами
AST(n, children, levelnum, levelExpr)
}
}
// Запуск рекурсивного обхода с начальным уровнем и строковым представлением
loop(plan, 1, "1_0")
}
// Неявное преобразование для класса LogicalPlan, добавляющее метод для получения AST
implicit class parser(lp: LogicalPlan) {
def AST(): Option[AST] = {
parseAST(lp)
}
}
}
теперь можно получать AST следующим образом logicalPlan.AST().get
Определим сущности в Атласе для построения Lianage

Подобно тому, как в языках программирования на базе Java все классы наследуются от Object, в Apache Atlas все сущности наследуются от Referenceable. Однако построение lineage (линейности данных) происходит только для типов Process и DataSet. Если тип не наследуется от одного из этих классов (например, если наследование происходит от Asset), то кнопка "Lineage" попросту не появится.
Кроме того, сам lineage строится на основе полей inputs и outputs для Process, аналогично и для DataSet. Здесь ничего не поделаешь — придется наследоваться от этих типов, хотя большинство полей будет оставаться пустыми.
Изначально моей целью было отразить преобразования, происходящие в Apache Spark, но структура Apache Atlas вынуждает окружать мои Process сущностями DataSet в полях inputs и outputs. Хотя меня изначально интересовали только Process, эти DataSet-ы могут быть использованы для отображения схем данных, с которыми процесс начинается и которые возвращает. Однако на данном этапе я не планирую парсить схемы и оставлю каждый DataSet пустым.
В Apache Atlas кастомные сущности можно описывать с помощью формата JSON. При этом важно соблюдать правильную последовательность определения типов, иначе возникнет ошибка 404 при попытке сослаться на тип, который еще не существует в системе.
Сначала определим тип для DataSet.
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "pico_spark_data_type",
"description": "A type inheriting from assets for Pico DataSet",
"superTypes": ["DataSet"],
"attributeDefs": [],
"relationshipDefs": []
}
],
"relationshipDefs": [],
"businessMetadataDefs": []
}
Комментарии:
-
enumDefs,structDefs,classificationDefs:Пустые массивы, так как перечисления, структуры и классификации не используются.
-
entityDefs:Определяет сущности в системе.
name: Имя сущности, которая представляет тип данных.description: Описание сущности.superTypes: Суперклассы, от которых наследуется данная сущность.attributeDefs: Пустой массив, так как атрибуты не указаны.relationshipDefs: Пустой массив, так как связи не определены.
-
relationshipDefs,businessMetadataDefs:Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.
{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "pico_spark_process_type",
"description": "A type inheriting from assets for Pico Spark abstraction",
"superTypes": ["Process"],
"attributeDefs": [
{
"name": "inputs",
"description": "List of inputs for the process",
"typeName": "array<pico_spark_data_type>",
"isOptional": true
},
{
"name": "outputs",
"description": "List of outputs for the process",
"typeName": "array<pico_spark_data_type>",
"isOptional": true
}
],
"relationshipDefs": []
}
],
"relationshipDefs": [],
"businessMetadataDefs": []
}
Комментарии:
-
enumDefs,structDefs,classificationDefs:Пустые массивы, так как перечисления, структуры и классификации не используются в данном определении.
-
entityDefs:Содержит определения сущностей.
name: Имя сущности, определяющей тип данных в контексте Pico Spark.description: Описание сущности.superTypes: Суперклассы, от которых сущность наследуется.attributeDefs: Пустой массив, так как атрибуты не добавлены.relationshipDefs: Пустой массив, так как связи не указаны.
-
relationshipDefs,businessMetadataDefs:Пустые массивы, так как глобальные определения отношений и бизнес-метаданные не заданы.
Для типа pico_spark_process_type я также создаю наследников для всех типов узлов (Filter, Project, Union и т.д.) в AST. Однако здесь я опущу это, поскольку это займет слишком много места и будет слишком однообразно.
В этих JSON-ах много пустых сущностей, но без них не обойтись, так как без них типы в Apache Atlas не создаются.
Взаимодействие с Apache Atlas по REST
Простого описания сущностей недостаточно — их нужно передать в Apache Atlas. У Atlas есть обширное REST API для взаимодействия с системой. Конкретно процесс создания нового типа выглядит следующим образом:
curl -X POST "http://<atlas-server-url>/api/atlas/v2/types/typedefs" \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-d '{
"enumDefs": [],
"structDefs": [],
"classificationDefs": [],
"entityDefs": [
{
"name": "pico_spark_data_type",
"description": "A type inheriting from assets for Pico DataSet",
"superTypes": ["DataSet"],
"attributeDefs": [],
"relationshipDefs": []
}
],
"relationshipDefs": [],
"businessMetadataDefs": []
}'
создаю JSON файл где будут перечислены тела запросов для всх необходимых кастомных типов под названием EntityTypes.json
и создам метод который читает этот файл и делает запрос на каждый EntityType
val atlasServerUrl = "http://localhost:21000/api/atlas/v2"
val authHeader: String = "Basic " + java.util.Base64.getEncoder.encodeToString("admin:admin".getBytes)
def generatePicoSparkTypes(): Unit = {
// Функция для чтения содержимого файла из ресурсов
def readFileFromResources(fileName: String): String = {
val source = Source.fromResource(fileName)
try source.mkString
finally source.close()
}
// Чтение JSON из файла ресурсов
val jsonString = readFileFromResources("EntityTypes.json")
// Попытка разобрать строку JSON в структуру данных
val parsedJson: Either[ParsingFailure, Json] = parse(jsonString)
// Преобразование разобранного JSON в список объектов JSON
val jsonObjects: Option[List[Json]] = parsedJson match {
case Right(json) =>
json.as[List[Json]] match {
case Right(jsonArray) =>
Some(jsonArray)
case Left(error) =>
// Обработка ошибки разбора массива JSON
println(s"Error parsing JSON array: $error")
None
}
case Left(error) =>
// Обработка ошибки разбора JSON
println(s"Error parsing JSON: $error")
None
}
// Отправка каждого объекта JSON на сервер Atlas
jsonObjects match {
case Some(jsonArray) =>
jsonArray.foreach { jsonBody =>
// Создание POST-запроса для создания типа в Apache Atlas
val createTypeRequest = basicRequest
.method(Method.POST, uri"$atlasServerUrl/types/typedefs") // Метод POST и URL для запроса
.header("Authorization", authHeader) // Заголовок авторизации
.header("Content-Type", "application/json") // Заголовок типа содержимого
.header("Accept", "application/json") // Заголовок для принятия ответа в формате JSON
.body(jsonBody.noSpaces) // Тело запроса с JSON-данными
.response(asString) // Ожидание ответа в формате строки
// Отправка запроса и вывод результата
val response = createTypeRequest.send(backend)
println(response.body) // Печать тела ответа
println(response.code) // Печать кода ответа
}
case None =>
// Сообщение, если JSON-объекты не были найдены
println("No JSON objects found.")
}
}
комментарии:
readFileFromResources: Функция для чтения содержимого файла JSON из ресурсов.jsonString: Получение строки JSON из файла.parsedJson: Попытка разобрать строку JSON в структуру данныхJson.jsonObjects: Преобразование разобранного JSON в список объектов JSON.jsonArray.foreach: Для каждого объекта JSON создается и отправляется POST-запрос на сервер Atlas.createTypeRequest: Создание POST-запроса с JSON-данными для создания типов в Apache Atlas.response: Отправка запроса и вывод результата, включая тело ответа и код ответа.
теперь для создания всех энтити в Apache Atlas достаточно вызвать метод generatePicoSparkTypes()
Поскольку DataSet сущности уже созданы, можно сразу приступить к созданию Process сущностей с заполненными полями inputs и outputs. Это важно, так как при попытках обновления сущностей через API ничего не сработало. Начнем с определения набора методов:

как видим все EntityType созданы
Создаем DataSet Entity
Перед тем как создавать сущности процессов нужно создать сущности DataSet-тов, поскольку первые ссылаются на вторые
На данном уже определен pico_spark_data_type который отвечает за входные / выходные схемы данных.
Для начала определимся с двумя вспомогательными методами
/**
* Создает функцию для отправки JSON данных на указанный эндпоинт в Apache Atlas.
*
* @param postfix Строка, добавляемая к базовому URL для формирования полного URL эндпоинта.
* @return Функция, принимающая JSON строку и отправляющая ее на сервер через HTTP POST запрос.
*/
def senderJsonToAtlasEndpoint(postfix: String): String => Unit = {
jsonBody => {
// Создание HTTP POST запроса для отправки JSON данных на сервер
val createTypeRequest = basicRequest
.method(Method.POST, uri"$atlasServerUrl/${postfix}")
.header("Authorization", authHeader)
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.body(jsonBody)
.response(asString)
// Отправка запроса и получение ответа
val response = createTypeRequest.send(backend)
// Вывод тела ответа и кода статуса
println(response.body)
println(response.code)
}
}
/**
* Генерирует и отправляет сущности данных Spark в Apache Atlas для указанного домена.
*
* @param domain Домен, который будет использоваться в атрибутах сущностей.
* @param execJsonAtlas Функция для отправки JSON данных в Apache Atlas.
* @return Функция, принимающая AST и создающая JSON для каждой дочерней сущности.
*/
def generateSparkDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {
// Локальная функция для генерации и отправки сущностей данных Spark
def generateEntities(ast: AST): Unit = {
ast.children.foreach { inast =>
// Формирование JSON тела для сущности данных Spark
val jsonBody =
f"""
|{
| "entity": {
| "typeName": "pico_spark_data_type",
| "attributes": {
| "domain": "${domain}",
| "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
| "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
| "description": "A description for the spark_data"
| }
| }
|}
|""".stripMargin
// Отправка сформированного JSON тела на сервер
execJsonAtlas(jsonBody)
// Рекурсивный вызов для обработки дочерних узлов
generateEntities(inast)
}
}
// Возвращаем функцию для генерации сущностей
generateEntities
}
Пояснения
senderJsonToAtlasEndpoint: Эта функция создает и возвращает другую функцию, которая отправляет JSON данные на указанный эндпоинт в Apache Atlas. Комментарии объясняют параметры, создание запроса, отправку и обработку ответа.generateSparkDataEntities: Эта функция генерирует сущности данных Spark, формирует соответствующий JSON и отправляет его в Apache Atlas, используя переданную функцию для отправки. Комментарии описывают параметры и внутреннюю логику функции, включая рекурсивный вызов для обработки всех дочерних узлов.
Напишем еще 2 метода для запуска формирования Linage В Atlas
/**
* Преобразует AST (абстрактное синтаксическое дерево) в сущности Apache Atlas и отправляет их на сервер.
*
* @param ast Абстрактное синтаксическое дерево, представляющее структуру данных.
* @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
* @param topLevelExpr Выражение уровня, используемое для определения уровня в AST. В данном случае не используется.
*/
def ASTToAtlasEntity(ast: AST, domain: String, topLevelExpr: String): Unit = {
// Создание функции для отправки JSON данных на эндпоинт "entity" в Apache Atlas
val entitySender = senderJsonToAtlasEndpoint("entity")
// Создание функции для генерации сущностей данных Spark и отправки их в Apache Atlas
val sparkDataEntityGenerator = generateSparkDataEntities(domain, entitySender)
// Создание базовых сущностей вывода и отправка их на сервер
//ее реализацию опущу
createBaseOutput(domain, entitySender)
// Создание базовых сущностей ввода и отправка их на сервер
//ее реализацию опущу
createBaseInput(domain, entitySender)
// Генерация и отправка сущностей данных Spark на основе AST
sparkDataEntityGenerator(ast)
}
/**
* Имплементация расширения для преобразования AST в сущности Apache Atlas.
*
* @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
*/
implicit class converter(ast: AST) {
/**
* Преобразует текущее AST в сущности Apache Atlas и отправляет их на сервер.
*
* @param domain Домен, используемый для формирования квалифицированных имен и атрибутов сущностей.
*/
def EntityToAtlas(domain: String): Unit = {
ASTToAtlasEntity(ast, domain, "")
}
}
Пояснения:
ASTToAtlasEntity: Этот метод преобразует переданное AST в сущности Apache Atlas и отправляет их на сервер. Он использует вспомогательные функции для создания базовых сущностей и генерации сущностей данных Spark, а также отправляет их на сервер через созданную функциюentitySender.EntityToAtlas: Это метод расширения (implicit class) для типаAST, который упрощает вызов методаASTToAtlasEntityс дефолтным значением дляtopLevelExpr. Этот метод предоставляет удобный способ преобразования AST в сущности Apache Atlas, используя указанный домен.
Теперь при запуске ast.EntityToAtlas("picoDomain")В атласе появляется data entity

так как DataSet Entity уже созданы, то можно создавать Process Entity сразу с заролнеными inputs и outputs, это важно поскольку сколько я не тыкалась в Api для обновления Entuty ничего не работало.
начнем с того что определим пачку методов:
// Создает функцию для отправки сущностей в Apache Atlas
// Использует функцию преобразования AST в JSON и функцию отправки JSON
def senderEntity(nodeToAtlasCreateEntityJson: (AST, String) => String, execJsonAtlas: String => Unit): (AST, String) => Unit = {
// Возвращает функцию, которая преобразует AST в JSON и отправляет его в Atlas
(ast: AST, topLevelExpr: String) => {
val jsonBody = nodeToAtlasCreateEntityJson(ast, topLevelExpr)
execJsonAtlas(jsonBody)
}
}
// Генерирует JSON для сущностей в Atlas на основе AST и уровня
// Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.
def generatotrProcessEntity(domain: String, qualifiedName: (Node, String) => String): (AST, String) => String = {
(ast: AST, topLevelExpr: String) => {
val node = ast.node
// Создает список входных сущностей, если есть дочерние элементы
val inputs = if (ast.children.nonEmpty) {
ast.children.map(_.levelExpr).map { expr =>
f"""
|
|{
| "typeName": "pico_spark_data_type",
| "uniqueAttributes": {
| "qualifiedName": "pico_spark_data_${ast.levelExpr}-${expr}@${domain}"
| }
|}
|
|""".stripMargin
}.mkString(", ")
} else {
f"""
| {
| "typeName": "pico_spark_data_type",
| "uniqueAttributes": {
| "qualifiedName": "pico_spark_data_input@${domain}"
| }
| }
|""".stripMargin
}
// Создает JSON для выходных сущностей, если задан topLevelExpr
val output = if (topLevelExpr.nonEmpty) {
f"""
| {
| "typeName": "pico_spark_data_type",
| "uniqueAttributes": {
| "qualifiedName": "pico_spark_data_${topLevelExpr}-${ast.levelExpr}@${domain}"
| }
| }
|""".stripMargin
} else {
f"""
| {
| "typeName": "pico_spark_data_type",
| "uniqueAttributes": {
| "qualifiedName": "pico_spark_data_output@${domain}"
| }
| }
|""".stripMargin
}
// Определяет JSON для различных типов узлов, таких как ProjectNode, FilterNode и т.д.
node match {
case p: ProjectNode =>
f"""
|{
|"entity": {
| "typeName": "pico_spark_project_type",
| "attributes": {
| "qualifiedName": "${qualifiedName(node, ast.levelExpr)}",
| "name": "pico_project_${ast.levelExpr}",
| "description": "This is an project for the pico_spark_project_type",
| "columns": [${p.columns.map(col => "\"" + col + "\"").mkString(", ")}],
| "inputs":[ ${inputs} ],
| "outputs":[ ${output} ]
| }
| }
|}
|""".stripMargin
case ...
}
}
}
// Создает функцию для генерации и отправки сущностей в Apache Atlas
// Использует предоставленные функции для создания JSON и отправки его в Atlas
def generatorDataEntities(domain: String, execJsonAtlas: String => Unit): AST => Unit = {
def sparkDataEntitys(ast: AST): Unit = {
ast.children.foreach { inast =>
val jsonBody =
f"""
|{
| "entity": {
| "typeName": "pico_spark_data_type",
| "attributes": {
| "domain": "${domain}",
| "qualifiedName": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
| "name": "pico_spark_data_${ast.levelExpr}-${inast.levelExpr}@${domain}",
| "description": "A description for the spark_data"
| }
| }
|}
|""".stripMargin
execJsonAtlas(jsonBody)
sparkDataEntitys(inast)
}
}
// Возвращает функцию, которая генерирует и отправляет сущности данных для Spark
sparkDataEntitys
}
Пояснения:
senderEntity: Функция, которая создает и отправляет JSON для сущностей в Apache Atlas, используя предоставленные функции преобразования и отправки.generatotrProcessEntity: Функция, которая генерирует JSON для различных типов узлов в AST и преобразует их в формат, пригодный для Apache Atlas.generatorDataEntities: Функция, которая создает и отправляет данные сущностей для Spark, рекурсивно обрабатывая детей узлов в AST.
И обновляем методы для работы с AST
// Преобразует AST в сущности Apache Atlas и отправляет их на указанный эндпоинт
def ASTToAtlasEntity(ast: AST, domain: String): Unit = {
// Создает функцию отправки JSON-данных для сущностей в Apache Atlas
val entitySender = senderJsonToAtlasEndpoint("entity")
// Создает функцию для генерации квалифицированного имени
val qualifiedName = generatorQualifiedName(domain)
// Создает функцию для генерации JSON-сущностей для процессов
val generatorProcessEntity = generatotrProcessEntity(domain, qualifiedName)
// Создает функцию для отправки JSON-данных сущностей в Atlas
val sendEntity = senderEntity(generatorProcessEntity, entitySender)
// Создает функцию для генерации данных сущностей и отправки их в Atlas
val generateDataEntity = generatorDataEntities(domain, entitySender)
// Обрабатывает один узел AST, отправляя его как сущность в Atlas
def processNode(ast: AST, intopLevelExpr: String): Unit = {
sendEntity(ast, intopLevelExpr)
}
// Рекурсивно проходит по всему дереву AST, обрабатывая каждый узел
def traverseAST(ast: AST, intopLevelExpr: String): Unit = {
processNode(ast, intopLevelExpr)
ast.children.foreach(ch => traverseAST(ch, ast.levelExpr))
}
// Создает базовые выходные и входные сущности для указанного домена и отправляет их в Atlas
createBaseOutput(domain, entitySender)
createBaseInput(domain, entitySender)
// Генерирует данные сущностей для AST и отправляет их в Atlas
generateDataEntity(ast)
// Запускает рекурсивное прохождение AST
traverseAST(ast, "")
}
// Обогащает класс AST функцией для преобразования его в сущности Apache Atlas
implicit class converter(ast: AST) {
// Преобразует текущий узел AST в сущности Apache Atlas и отправляет их на указанный эндпоинт
def EntityToAtlas(domain: String): Unit = {
ASTToAtlasEntity(ast, domain)
}
}
Пояснения:
-
ASTToAtlasEntity: Основной метод, который:Создает функции для преобразования AST в JSON и отправки его в Apache Atlas.
Определяет вспомогательные функции для обработки узлов AST и рекурсивного обхода дерева.
Создает и отправляет базовые сущности (входные и выходные) в Atlas.
Рекурсивно проходит по дереву AST и отправляет каждую сущность в Atlas.
-
implicit class converter(ast: AST): Обогащает классAST, добавляя метод для преобразования AST в сущности Apache Atlas.EntityToAtlas: Использует методASTToAtlasEntityдля преобразования текущего узлаASTв сущности Atlas и отправки их в указанный домен.
Теперь после запуска В Apache Atlas таки появиться Linage

Что ж, на изначальный logical план вроде похоже
Project [model#17, manufacturer#18, 2024-09-12 16:57:34.046609 AS processedDDTM#36]
+- Project [model#17, manufacturer#18]
+- Filter NOT (manufacturer#18 = Audi)
+- Union false, false
:- Relation [model#17,manufacturer#18] csv
+- Project [_1#23 AS model#28, _2#24 AS manufacturer#29]
+- LocalRelation [_1#23, _2#24]
P.S. код можно глянуть тут
P.P.S докер фалы для запуска Apache Atlas можно взять тут
dolfinus
А что даст такое хранение плана dataframe в Atlas? В реальных ETL процессах постоянно происходит какие-то изменения, план сложно назвать постоянным. Задача была в изучении того, как планы меняются во времени?