Apache Airflow долгое время ассоциировался с таким стилем описания workflow:

# объявляем задачи-таски
task1 = PythonOperator(...)
task2 = BashOperator(...)

# проставляем зависимости между ними
task1 >> task2

Это рабочий и до сих пор актуальный подход, но с Airflow 2.0.0 появился TaskFlow API — способ описывать DAG‑и через обычные Python функции и декораторы:

@dag(dag_id="linear_demo")
def tutorial_dag()
    @task
    def extract():
       return 42
    
    @task
    def transform(x):
       return x * 2
    
    # описываем зависимости и строим Flow
    y = transform(extract())

# создаем даг    
tutorial_dag()

TaskFlow в Airflow позволяет описывать DAG как обычный Python‑код: @dag задает сам workflow/DAG, а @task превращает Python‑функции в задачи Airflow. При вызове декорированных функций, например transform(extract()), выполняется не сам расчет, а создаются объекты задач, связи между ними и ссылки на их будущие результаты (через объект XComArg).

То есть TaskFlow — это декларативный DSL(domain‑specific language) для построения DAG, где вызовы функций не выполняют вычисления, а описывают граф зависимостей.

Задачи статьи

В этой статье попробуем:

  1. Заглянуть внутрь Airflow и понять, как работает TaskFlow API (для версии 3.2.1)

  2. На основе этих идей написать собственный микро‑фреймворк для закрепления понимания.

  3. Сохранить названия и общую логику внутренних объектов Airflow.

  4. Понять главный архитектурный принцип: описание DAG ≠ выполнение DAG.

Итак, давайте еще раз рассмотрим основные фазы TaskFlow:

  1. Создается DAG через @dag и вызов функции DAG‑а.
    В этот момент Airflow создает объект DAG, входит в его контекст и начинает исполнять тело функции DAG‑а для сборки графа.

  2. Внутри тела DAG‑а @task декорирует Python‑функции.
    То есть extract, transform становятся не обычными функциями, а объектами‑декораторами (_TaskDecorator), которые умеют создавать Airflow‑задачи.

  3. При вызове декорированных функций, например

    y = transform(extract())

    создаются объекты задач (операторы) и зависимости между задачами.
    Именно здесь фактически собирается граф DAG.

  4. Позже scheduler и worker исполняют уже собранный DAG.

Что делает @task

Объект task в Airflow является специальным вызываемым объектом (TaskDecoratorCollection). При декорировании функции он создаёт другой объект _TaskDecorator. см. исходники в [task‑sdk\src\airflow\sdk\definitions\decorators\__init__.py]

Упрощённо:

class TaskDecoratorCollection:
   def __call__(self, function):
       return _TaskDecorator(function)

task = TaskDecoratorCollection()

Напомню:

@task
def extract():

эквивалентно: extract = task(extract)

А значит будет вызван:

TaskDecoratorCollection.__call__()

который вернёт _TaskDecorator, объявлен в [task‑sdk\src\airflow\sdk\bases\decorator.py]

Переход от TaskDecoratorCollection к TaskDecorator немного запутан, если проследить — через _getattr__(«python»), достает Python task decorator из provider registry

def python_task(...):
[providers\standard\src\airflow\providers\standard\decorators\python.py]

Далее вызывает task_decorator_factory(...), который и возвращает _TaskDecorator

Что такое _TaskDecorator

Это внутренний объект, который стоит за @task. Он хранит исходную функцию и умеет превращать её вызов в задачу DAG. 

class _TaskDecorator:
    # храним функцию
    def __init__(self, function):
        self.function = function

  # создается оператор Airflow и оборачивается в XComArg
  def __call__(self, ...):
      op = BaseOperator(
          python_callable = self.function,
          ...
      )

      return XComArg(op)

Результат

  • создаёт operator

  • регистрирует его в DAG

  • возвращает ссылку на результат

Что такое XComArg

XComArg — это ленивая ссылка на будущий результат задачи, то есть это не само значение, а декларативная ссылка на результат upstream‑задачи, которая будет разрешена только во время выполнения DAG.

В нашем простом случае мы просто обернем оператор в блок init:

class XComArg:
   def __init__(self, operator):
       self.operator = operator

Смысл такой: _TaskDecorator создаёт operator, наружу возвращается XComArg(operator) и в логике проверяя аргументы, если видим XComArg, то понимаем что это задача.

То есть:

BaseOperator = задача в DAG

XComArg = ссылка на output этой задачи

Что такое оператор в Airflow

Оператор — это объект задачи в DAG. Идея:

operator = узел графа + правила его выполнения

Оператор описывает:

  • что запускать

  • от чего зависит задача

  • как её выполнять

  • параметры retries / pools / queue / timeout

В нашем учебном примере оператор хранит Python‑функцию, а в реальном Airflow это может быть любая логика выполнения, не только Python‑функция. Также нужен даг (чтобы зарегистрировать там задачу). В нашем примере это будет просто глобальная переменная _CURRENTDAG.

При создании объекта оператора мы также проставляем зависимости между задачами в текущем даге:

  • upstream_task_ids — Идентификаторы upstream‑задач, от которых зависит текущая задача

  • downstream_task_ids — Идентификаторы downstream‑задач, которые зависят от текущей задачи

  • dag.add_task(self) — добавляем текущий оператор в текущий даг

# храним текущий даг здесь
_CURRENT_DAG = None

class BaseOperator:
    def __init__(self, python_callable, ...):
        # Идентификатор задачи.
        # В этом учебном примере это просто имя Python-функции.
        self.task_id = python_callable.__name__

        # Python-функция, которую будет выполнять эта задача.
        self.python_callable = python_callable

        # Для простоты берем текущий DAG из глобального контекста.
        self.dag = _CURRENT_DAG

        # Идентификаторы upstream-задач, от которых зависит текущая задача.
        self.upstream_task_ids = set()
        # Идентификаторы downstream-задач, которые зависят от текущей задачи.
        self.downstream_task_ids = set()

        # Ищем XComArg в аргументах задачи и по ним строим зависимости
        # upstream_task_ids и downstream_task_ids
        self._set_xcomarg_dependencies()

        # Добавляем задачу в DAG.
        self.dag.add_task(self)

    # Выполнение задачи.
    def execute(self, context):
        return self.python_callable()

Реальные наследники BaseOperator

В Airflow это, например:

  • PythonOperator

  • BashOperator

  • Sensor operators

  • SQL operators

Что такое DAG и @dag

DAG — объект, который хранит описание workflow как граф задач. Он отвечает за:

  • dag_id, идентификатор

  • список задач

  • зависимости между задачами

  • контекст with DAG(...) и так далее

В TaskFlow задается через декоратор @dag, но по сути @dag это удобная обертка над with DAG(...), см. [task‑sdk\src\airflow\sdk\definitions\dag.py] в реализации декоратора def dag.

То есть конструкция вида:

@dag(...) 
def tutorial_dag():     
  ...

по смыслу близка к:

def tutorial_dag():    
  with DAG(...) as dag_obj:
    ...     
    return dag_obj

Для примера я сделал лишь версию с контекстным менеджером, чтобы не усложнять. И напомню, что у нас это будет глобальная переменная, в которую будем писать название текущего дага при входе/выходе из контекста.:

_CURRENTDAG = None

class DAG:
    def __init__(self, dag_id):
        self.dag_id = dag_id
        # список задач для этого дага
        self.task_dict = {}
    
    # вызывается при создании оператора
    def add_task(self, task):
        self.task_dict[task.task_id] = task

    @property
    def tasks(self):
        return list(self.task_dict.values())
    
    # протокол контекстного менеджера
    def __enter__(self):
        global CURRENTDAG
        _CURRENTDAG = self
        return self

    def __exit__(self, exc_type, exc, tb):
        global CURRENTDAG
        _CURRENTDAG = None

Почему нужен with DAG(...)

Когда создаётся задача внутри блока:

with DAG("demo"):
   x = extract()

новая задача автоматически привязывается к текущему DAG.

Выполнение и что такое TaskInstance

Когда DAG уже описан, наступает runtime. Scheduler анализирует DAG и планирует выполнение задач, а worker исполняет конкретные TaskInstance — то есть конкретные запуски конкретных задач.

Идея:

  • BaseOperator — описание задачи

  • TaskInstance — конкретное исполнение этой задачи

Например, TaskInstance можно представить как:

  • task_id = “extract”

  • run_id или логическая дата запуска DAG‑а

  • try_number = 2

При выполнении задачи ее результат может автоматически сохраняться через механизм XCom (механизм передачи данных между задачами).

  • в учебном примере результат сохраняется в XComStore

  • в настоящем Airflow результат сохраняется в XCom backend / metadata database

# хранилище сохраненных значений
class XComStore:
    def __init__(self) -> None:
        self.values: dict[tuple[str, str, str], Any] = {}

    def push(self, dag_id: str, task_id: str, key: str, value: Any) -> None:
        self.values[(dag_id, task_id, key)] = value

    def pull(self, dag_id: str, task_id: str, key: str = XCOM_RETURN_KEY) -> Any:
        return self.values[(dag_id, task_id, key)]

class TaskInstance:
    def __init__(self, task: BaseOperator, xcom_store: XComStore):
        self.task = task
        self.xcom_store = xcom_store

    def xcom_push(self, key: str, value: Any) -> None:
        self.xcom_store.push(...)
        
    def xcom_pull(self, task_ids: str, key: str = XCOM_RETURN_KEY) -> Any:
        return self.xcom_store.pull(...)

    def run(self):
        # запускаем на исполнение
        result = self.task.execute(context={})
        # сохраняем результат в хранилище
        self.xcom_store.push(self.task.task_id, result)

Что такое XCom

XCom — механизм передачи данных между задачами. В примере это просто словарь c ключем (dag_id, task_id, key). В настоящем Airflow XCom хранится как записи в metadata database в модели XComModel, на практике это часто PostgreSQL.

Определяется в [airflow‑core\src\airflow\models\xcom.py]

То есть по имени дага, таски, ключу можно получить что там сохранили. Например:

@task
def extract():
   return 42

Возвращаемое значение TaskFlow‑задачи автоматически сериализуется и сохраняется как XCom под специальным ключом return_value. Следующая задача может получить его:

@task
def transform(x):
   return x * 2

TaskFlow API делает это автоматически. В классическом стиле Airflow можно делать вручную: ti.xcom_pull(task_ids="extract"),здесь ti — это экземпляр TaskInstance.

В реальном Airflow обычно недостаточно только (dag_id, task_id, key) — еще важны run_id и map_index

  • run_id — идентификатор конкретного запуска DAG‑а

  • map_index нужен для dynamic task mapping, когда одна задача разворачивается в несколько параллельных task instances

Что важно понимать

XCom хранится в metadata DB Airflow, поэтому передача больших объектов через XCom может резко замедлить scheduler и webserver. Поэтому XCom предназначен для небольших данных: числа, строки, json, id, пути к файлам, метаданные. Не стоит передавать большие DataFrame. Лучше:

  1. task1 пишет parquet

  2. task2 получает путь через XCom

Общий итог

Мы реализовали очень упрощенную версию следующих объектов airflow, попытались сохранить внутреннюю логику и названия:

  • DAG

  • Operator

  • TaskDecorator

  • XComArg

  • TaskInstance

  • XCom

То есть рассмотрели основные концепции Airflow и TaskFlow API.

А далее — минимальный рабочий пример. В нем добавлен объект LinearTaskRunner, который умеет запускать наш линейный ETL.

from __future__ import annotations

from collections.abc import Callable
from typing import Any

_CURRENT_DAG = None

XCOM_RETURN_KEY = "return_value"


class DAG:
    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.task_dict: dict[str, BaseOperator] = {}

    def add_task(self, task: BaseOperator) -> None:
        # проверка на дубликат task
        #if task.task_id in self.task_dict and self.task_dict[task.task_id] is not task:
        #    raise ValueError(f"Task id {task.task_id!r} already exists in DAG")
        self.task_dict[task.task_id] = task

    @property
    def tasks(self) -> list[BaseOperator]:
        return list(self.task_dict.values())

    def get_task(self, task_id: str) -> BaseOperator:
        return self.task_dict[task_id]

    def __enter__(self) -> DAG:
        global _CURRENT_DAG
        _CURRENT_DAG = self
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        global _CURRENT_DAG
        _CURRENT_DAG = None


class XComStore:
    def __init__(self) -> None:
        self.values: dict[tuple[str, str, str], Any] = {}

    def push(self, dag_id: str, task_id: str, key: str, value: Any) -> None:
        self.values[(dag_id, task_id, key)] = value

    def pull(self, dag_id: str, task_id: str, key: str = XCOM_RETURN_KEY) -> Any:
        return self.values[(dag_id, task_id, key)]


class XComArg:
    def __init__(self, operator: BaseOperator, key: str = XCOM_RETURN_KEY) -> None:
        self.operator = operator
        self.key = key

# Базовый класс оператора
class BaseOperator:
    def __init__(
        self,
        python_callable: Callable[..., Any],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
        dag: DAG | None = None,
    ) -> None:
        self.task_id = python_callable.__name__
        self.python_callable = python_callable
        self.args = args
        self.kwargs = kwargs
        self.dag = dag or _CURRENT_DAG
        self.upstream_task_ids: set[str] = set()
        self.downstream_task_ids: set[str] = set()

        self._set_xcomarg_dependencies()
        # кладем эту задачу для текущего дага
        if self.dag is not None:
            self.dag.add_task(self)

    def set_upstream(self, other: BaseOperator) -> None:
        self.upstream_task_ids.add(other.task_id)
        other.downstream_task_ids.add(self.task_id)

    def _set_xcomarg_dependencies(self) -> None:
        for arg in self.args:
            if isinstance(arg, XComArg):
                self.set_upstream(arg.operator)

        for arg in self.kwargs.values():
            if isinstance(arg, XComArg):
                self.set_upstream(arg.operator)

    def execute(self, context: dict[str, TaskInstance]) -> Any:
        resolved_args = [
            context["ti"].resolve(arg) if isinstance(arg, XComArg) else arg
            for arg in self.args
        ]
        resolved_kwargs = {
            key: context["ti"].resolve(value) if isinstance(value, XComArg) else value
            for key, value in self.kwargs.items()
        }
        return self.python_callable(*resolved_args, **resolved_kwargs)


class TaskInstance:
    def __init__(self, task: BaseOperator, xcom_store: XComStore) -> None:
        self.task = task
        self.xcom_store = xcom_store

    def xcom_push(self, key: str, value: Any) -> None:
        self.xcom_store.push(
            dag_id=self.task.dag.dag_id,
            task_id=self.task.task_id,
            key=key,
            value=value,
        )

    def xcom_pull(self, task_ids: str, key: str = XCOM_RETURN_KEY) -> Any:
        return self.xcom_store.pull(
            dag_id=self.task.dag.dag_id,
            task_id=task_ids,
            key=key,
        )

    def resolve(self, value: Any) -> Any:
        if isinstance(value, XComArg):
            return self.xcom_pull(task_ids=value.operator.task_id, key=value.key)
        return value

    def run(self) -> Any:
        context = {"ti": self}
        result = self.task.execute(context)
        self.xcom_push(XCOM_RETURN_KEY, result)
        return result


class LinearTaskRunner:
    def __init__(self, dag: DAG, xcom_store: XComStore) -> None:
        self.dag = dag
        self.xcom_store = xcom_store

    def run(self, task: BaseOperator) -> Any:
        self._run_task(task)
        return self.xcom_store.pull(self.dag.dag_id, task.task_id)

    def _run_task(self, task: BaseOperator) -> None:
        """ def _run_task(self, task):
                if task уже посчитан:
                    return

                для каждого upstream:
                    _run_task(upstream)

                запусти текущую задачу
        """
        if (self.dag.dag_id, task.task_id, XCOM_RETURN_KEY) in self.xcom_store.values:
            return
        # Рекурсивный запуск upstream-задач
        for upstream_task_id in task.upstream_task_ids:
            upstream_task = self.dag.get_task(upstream_task_id)
            self._run_task(upstream_task)
        ti = TaskInstance(task=task, xcom_store=self.xcom_store)
        ti.run()


class _TaskDecorator:
    def __init__(self, function: Callable[..., Any]) -> None:
        self.function = function

    def __call__(self, *args: Any, **kwargs: Any) -> XComArg:
        op = BaseOperator(
            python_callable=self.function,
            args=args,
            kwargs=kwargs,
        )
        return XComArg(op)


class TaskDecoratorCollection:
    def __call__(self, function: Callable[..., Any]) -> _TaskDecorator:
        return _TaskDecorator(function)


task = TaskDecoratorCollection()


with DAG("linear_demo") as linear_dag:

    @task
    def extract():
        print("extract")
        return 3

    @task
    def transform(x):
        print("transform")
        return x + 2

    @task
    def load(x):
        print("load")
        return x * 10

    result = load(transform(extract()))

linear_xcom_store = XComStore()
linear_runner = LinearTaskRunner(dag=linear_dag, xcom_store=linear_xcom_store)

print(linear_runner.run(result.operator))  # 50
print(linear_xcom_store.pull("linear_demo", "extract"))  # 3
print(linear_xcom_store.pull("linear_demo", "transform"))  # 5
print(linear_xcom_store.pull("linear_demo", "load"))  # 50

Комментарии (0)