Большинство современных систем — это не просто код, выполняющий запросы, а последовательности действий, которые должны выполняться атомарно и восстанавливаться при сбое. Речь идёт не о бизнес-логике в пределах одной функции, а об оркестрации процессов: цепочках шагов, где каждая операция может завершиться ошибкой, требующей компенсации.

Такую задачу решает паттерн Saga — один из самых сложных и важных архитектурных паттернов. Он описывает, как выполнить серию распределённых операций с возможностью отката (rollback), не прибегая к глобальным транзакциям.

Примечание: движок пока не проходил испытаний в продакшене и не является production-ready решением. В настоящий момент проект активно тестируется и оптимизируется. Цель статьи — не заявить о production-ready платформе, а показать архитектуру Saga-движка, который можно встроить в Go-приложение без инфраструктурных накладных расходов. Если вы хотите поэкспериментировать с workflow-движком или поучаствовать в обкатке, буду рад обратной связи.

Проблема

Реализация оркестрации вручную обычно быстро превращается в хаос. Ошибки приходится обрабатывать каскадно, rollback логика размазывается по коду, а попытки добавить пользовательское подтверждение или параллельные ветки делают систему непредсказуемой.

С другой стороны, есть зрелые платформы вроде Temporal или Cadence. Они надёжны, но требуют развертывания целой инфраструктуры: брокеров, воркеров и делают простой процесс зависимым от внешней экосистемы.

Между этими крайностями и появился Floxy — встраиваемая библиотека на Go, реализующая Saga-паттерн с оркестрацией, компенсациями и интерактивными шагами, без внешних сервисов и тяжёлого runtime.

Когда Workflow действительно нужен

Небольшое отступление. Workflow-подход — это не просто способ «красивее организовать бизнес-логику». Это архитектурный инструмент, который становится необходимым, когда простые машины состояний перестают справляться. Ниже — ключевые сценарии, в которых использование workflow приносит максимальную пользу.

Сложные последовательности действий

Когда процесс — это не переходы между статусами, а цепочка зависимых операций:

  • вызовы внешних сервисов;

  • проверка ограничений;

  • резервирование ресурсов;

  • подтверждение пользователя;

  • пост-обработка результатов.

В таких случаях «статус» показывает только положение, но не само поведение процесса, а логика расползается по сервисам, cron-джобам и хендлерам. Workflow позволяет описать последовательность шагов как структуру, которую движок выполняет и контролирует.

Процессы с компенсациями (подход SAGA)

Шаг N может завершиться ошибкой, требующей отмены шагов 1...N−1:

  • снять резерв товара;

  • вернуть деньги;

  • удалять созданные записи;

  • отменить отправку данных в сторонние сервисы.

Ручная реализация компенсаций быстро превращается в хаос. Workflow/Saga обеспечивает централизованную и предсказуемую обработку откатов. Floxy делает это без брокеров, внешних рантаймов и тяжёлой инфраструктуры.

Гарантии идемпотентности и достоверности состояния

В распределённых системах типичны проблемы:

  • повторное выполнение шагов;

  • гонки за состояние;

  • «зависшие» сущности;

  • дубли запросов.

Workflow фиксирует состояние каждого шага и позволяет движку безопасно выполнять повторы, используя уникальный для каждого шага idempotency key. Разработчик концентрируется на бизнес-логике, а не на defensive programming.

Условия, ветвления и параллельные ветви

Процессы с:

  • условными ветками,

  • параллельной обработкой (Fork),

  • синхронизацией (Join),

  • несколькими путями завершения.

В статусных моделях это приводит к бесконечным промежуточным состояниям и трудно отслеживаемой логике. Workflow выражает это декларативно.

Человеское участие

Если процесс должен:

  • ждать подтверждения пользователя,

  • проходить ручную модерацию,

  • зависеть от действий человека или внешнего сервиса,

то статус «WAITING_APPROVAL» ничего не гарантирует — кроме того, что кто-то когда-то должен изменить его на «APPROVED». Workflow фиксирует точку остановки и корректно продолжает процесс после подтверждения.

Длительные процессы

Когда выполнение занимает:

  • минуты,

  • часы,

  • дни,

  • требует ожидания событий извне,

workflow — единственный способ надёжно хранить прогресс, восстанавливаться после сбоев и контролировать время выполнения.

Версионирование процессов

Когда бизнес-процессы меняются со временем:

  • добавляются шаги;

  • изменяются условия;

  • меняются компенсации.

Статусная модель превращается в клубок условий «если объект старого формата». Workflow позволяет запускать старые и новые процессы параллельно, используя разные версии схемы.

Наблюдаемость и диагностика

Если важно:

  • видеть граф процесса;

  • понимать, где он остановился;

  • разбираться, какие шаги выполнялись;

  • анализировать ошибки.

Workflow делает историю частью модели. Статусы дают лишь текущий срез, лишая контекста.

Когда workflow приносит максимальную пользу — краткое резюме

Используйте workflow, а не статусную модель, когда процесс:

  • сложный, ветвистый или многошаговый;

  • требует компенсаций при сбое;

  • должен гарантировать идемпотентность;

  • включает внешних участников или ручные действия;

  • может выполняться долго;

  • требует возобновления после сбоев;

  • должен быть наблюдаемым и диагностируемым;

  • должен иметь версионирование.

В общем, в подобных ситуациях стоит не плодить статусы внутри сущностей, а вынести процесс в отдельные таблицы и управлять им через workflow. Это упрощает логику и делает систему надёжнее.

Философия Floxy

Floxy опирается на простую идею: workflow — это часть программы, а не отдельный сервис.

Вместо выделенной платформы с RPC и брокерами Floxy предлагает библиотеку, в которой бизнес-процесс описывается с помощью обычного кода Go — без нового языка или YAML-файлов.

Основные принципы:

  1. Минимализм. Всё строится вокруг context.Contextpgx и простых структур данных.

  2. Предсказуемость. Любое состояние хранится в PostgreSQL; поведение детерминировано.

  3. Изоляция. Все таблицы создаются в схеме workflows, не мешая основной схеме базы данных.

  4. Оркестрация как библиотека. Saga, retry, rollback и human-in-the-loop доступны без внешнего runtime.

  5. Версионирование. Каждый workflow-шаблон имеет номер версии, обеспечивая безопасное развитие процессов.

Ключевые возможности

Floxy реализует полный набор функций для построения надёжных оркестраций:

  • Saga с оркестрацией и компенсациями. Каждый шаг может иметь OnFailure-обработчик, выполняющий откат или компенсацию.

  • SavePoint. Частичный rollback до последней сохранённой точки.

  • Conditional steps. Ветвления логики с помощью Go templates — без внешнего DSL.

  • Parallel / Fork / Join. Параллельные ветви выполнения и последующая синхронизация.

  • Human-in-the-loop. Поддержка шагов, требующих участия человека (confirmreject).

  • Cancel и Abort. Мягкая отмена или немедленное завершение workflow.

  • Dead Letter Queue. Режим DLQ с приостановкой workflow и ручным восстановлением.

  • Idempotency-aware шаги. Контекст выполнения (StepContext) предоставляет метод IdempotencyKey(), помогающий разработчикам реализовать безопасные операции.

  • Миграции встроены через go:embed. Floxy полностью самодостаточен и имеет функцию применения миграций.

Абстракции

Floxy — это библиотека с простыми, но выразительными абстракциями:

  1. Store — слой хранения шаблонов, инстансов шаблонов, состояний и событий (PostgreSQL через pgx).

  2. Builder — билдер workflow шаблонов

  3. Engine — исполнитель и координатор шагов: планирует, откатывает, повторяет, синхронизирует.

  4. Worker Pool — фоновый пул, обрабатывающий очередь шагов.

Каждый шаг выполняется в контексте (context.Context), а фоновый воркер проверяет таблицу workflow_cancel_requests, чтобы своевременно прерывать long-running шаги.

Workflow как граф

Workflow в Floxy — это ориентированный ациклический граф шагов (DAG), определяемый через встроенный Builder API.

Builder формирует структуру в виде списка смежности, проверяет наличие циклов и сериализует описание в JSON для хранения в workflow_definitions.

wf, _ := floxy.NewBuilder("order", 1).
    Step("reserve_stock", "stock.Reserve").
    Then("charge_payment", "payment.Charge").
    OnFailure("refund", "payment.Refund").
    Step("send_email", "notifications.Send").
    Build()

Если Builder обнаруживает цикл, Build() возвращает ошибку — проверяя корректность графа ещё до запуска флоу в движке.

Версионирование и изоляция

Каждый шаблон workflow хранится с номером версии. При обновлении шаблона разработчику необходиму увеличить номер версии. Таким образом, запущенные экземпляры продолжают выполнение по своей оригинальной схеме.

Все таблицы Floxy находятся в отдельной схеме workflows, включая таблицы workflow_instancesworkflow_stepsworkflow_events и workflow_definitions и другие. Это обеспечивает полную изоляцию и упрощает интеграцию в существующие приложения.

Human-in-the-loop и взаимодействие

Floxy поддерживает интерактивные шаги (StepTypeHuman), которые приостанавливают выполнение и ждут решения пользователя.

Workflow переходит в состояние waiting_decision, а решение (confirmed или rejected) записывается в таблицу workflow_human_decisions. После этого движок либо продолжает выполнение, либо завершает процесс с ошибкой.

Таким образом, Floxy может использоваться не только для автоматических процессов, но и для сценариев с подтверждением, ревью или ручным контролем.

Cancel и Abort

Floxy поддерживает два механизма остановки:

  • Cancel — выполняет rollback до корня (save points игнорируются),

  • Abort — немедленно прерывает выполнение без компенсации.

Оба варианта инициируются добавлением записи в таблицу workflow_cancel_requests. Фоновый воркер периодически опрашивает её и вызывает context.CancelFunc() для активных шагов соответствующего инстанса.

Dead Letter Queue

Floxy поддерживает два различных режима обработки ошибок:

  1. Классический режим Saga (по умолчанию): при сбое шага движок выполняет откат к последней точке сохранения и запускает обработчики компенсаций.

  2. Режим DLQ: откат отключен, рабочий процесс приостанавливается в состоянии dlq, а неудавшиеся шаги сохраняются в DLQ для ручного анализа.

Поведение режима DLQ

Когда DLQ включен для рабочего процесса:

  1. Обработчики компенсаций не выполняются.

  2. Рабочий процесс приостановлен: workflow переходит в статус dlq (не терминальный, может быть возобновлен).

  3. Активные шаги заморожены: все запущенные шаги переведены в состояние паузы.

  4. Очередь очищена: очередь экземпляра очищается для предотвращения дальнейшего выполнения.

  5. Ручное восстановление: после устранения проблем используйте RequeueFromDLQ для возобновления.

Диаграмма последовательностей

Ниже представлена верхнеуровневая диаграмма последовательностей работы движка:

Диаграммы с подробным разбором работы движка есть в директории docs репозитория floxy.

Тестирование и примеры

Floxy покрыт большим количеством unit- и интеграционных тестов, которые используют testcontainers для автоматического запуска PostgreSQL в контейнере. Это позволяет проверять корректную работу движка во всех сценариях — от простых последовательных флоу до сложных параллельных и компенсационных процессов.

Дополнительно для проекта реализовано хаос-тестирование, позволяющее проверять устойчивость движка к задержкам, сбоям и непредсказуемому поведению окружения. Примеры таких тестов можно найти в отдельном репозитории: https://github.com/floxy-project/floxy-stress-test. Тестирование сделано при помощи фреймворка ChaosKit, специально разработанного для chaos-тестирования floxy (фреймворк общего назначения, но идея его создать была изначально именно с целью тестирования workflow движка).

Кроме того, в репозитории размещено множество примеров (./examples), которые демонстрируют различные типы шагов, использование OnFailure, ветвления, условия, human-in-the-loop сценарии и rollback-политику. Это делает вход в проект простым и наглядным даже для новичков в Go.

Помимо этого, репозиторий оснащен большим количеством документации и PlantUML диаграммами, позволяющими детально понять процесс работы движка.

Почему Floxy остаётся лёгким

Floxy не использует брокеров, RPC или внешние демоны. Он работает полностью внутри процесса приложения, опираясь только на PostgreSQL и стандартные пакеты Go и pgx:

  • pgx — быстрый драйвер и пул соединений;

  • context — управление временем жизни операций;

  • net/http — REST API через новый ServeMux;

  • go:embed — встроенные миграции и схемы.

Несмотря на наличие фоновых воркеров и планировщика, Floxy остаётся библиотекой, а не платформой — без отдельных бинарников или RPC протоколов.

Пример использования

engine := floxy.NewEngine(pgxPool)
defer engine.Shutdown()

wf, _ := floxy.NewBuilder("order", 1).
    Step("reserve_stock", "stock.Reserve").
    Then("charge_payment", "payment.Charge").
    OnFailure("refund", "payment.Refund").
    Step("send_email", "notifications.Send").
    Build()

engine.RegisterWorkflow(ctx, wf)

engine.RegisterHandler(&ReserveStock{})
engine.RegisterHandler(&ChargePayment{})
engine.RegisterHandler(&RefundPayment{})
engine.RegisterHandler(&Notifications{})

workerPool := floxy.NewWorkerPool(engine, 3, 100*time.Millisecond)
workerPool.Start(ctx)

instanceID, err := engine.Start(ctx, "order-v1", input)

Экосистема Floxy

Floxy развивается как полноценная экосистема инструментов для всех стадий работы с workflow.

Floxy Pro

Floxy Pro — расширенная версия библиотеки, созданная для больших объемов данных. Она добавляет:

  • Partitioned Tables с pg_partman. Все основные таблицы (workflow_instancesworkflow_stepsworkflow_eventsworkflow_dlq) разбиваются на партиции по created_at с ежедневными секциями. Это обеспечивает автоматическое управление, быструю очистку старых данных и высокий уровень производительности при больших объёмах записей.

  • Автоматическое управление партициями. pg_partman создаёт новые секции на 30 дней вперёд и удаляет старые (90-дневное хранение) — без ручного вмешательства.

  • floxyctl. CLI-инструмент для запуска, отладки и управления workflow-инстансами. Поддерживает два режима:

    1. In-memory — выполнение YAML-описанных workflow без БД (подходит для тестов, CI/CD и скриптов).

    2. Database Mode — управление workflow-инстансами в PostgreSQL: запуск, отмена (cancel), аварийное завершение (abort).

    floxyctl особенно полезен в DevOps и CI/CD pipelines, где шаги описываются в YAML и выполняются через bash-скрипты или HTTP-запросы. Это позволяет использовать Floxy как workflow engine, даже если основное приложение написано не на Go (например, Ruby, Python или Node.js) — шаги могут вызываться через HTTP-хендлеры.

  • floxyd. Демон непрерывной обработки workflow (для других языков программирования). Работает как долговременный сервис с конфигурируемым пулом воркеров, polling-интервалами и статистикой выполнения. Поддерживает Bash и HTTP-хендлеры, TLS, health checks и Prometheus metrics. floxyd подходит для постоянной эксплуатации workflow-процессов.

Отличие floxyctl и floxyd:

Характеристика

floxyctl

floxyd

Режим работы

CLI

Долговременный демон

Хранилище

In-memory или PostgreSQL

Только PostgreSQL

Назначение

Тесты, локальные запуски, CI/CD

Тесты, локальные запуски, CI/CD, долговременные процессы

Выполнение

Одноразовое

Непрерывное

Floxy UI и IDE-плагины

  • Floxy UI — веб-интерфейс для визуализации и управления процессами, больше подходит для dev-среды.

  • Плагины для GoLand и VS Code предоставляют визуализацию workflow по Go-коду: автоматически строят PlantUML или Mermaid-диаграммы, что удобно для документирования и ревью бизнес-процессов.

Так выглядит отображение workflow в GoLand с плагином:

Таким образом, Floxy — это уже не просто OpenSource библиотека, а цельная экосистема инструментов, позволяющая проектировать, визуализировать и выполнять сложные workflow в распределённых системах.


Если вы дочитали до этого места, то, вероятно, вам интересны не только возможности Floxy, но и то, как всё это работает внутри. Значит, вы входите в ту часть аудитории, которая любит разбираться в механизмах и архитектурных решениях — и для вас подготовлен расширенный раздел.

Далее мы разберём принципы исполнения и те решения, которые определили дизайн библиотеки, которые обычно остаются «за кадром» библиотечной абстракции.

Если же вам достаточно обзорной части и вы хотите перейти сразу к итогам — можете смело перемещаться к Заключению или ознакомиться непосредственно с проектом по ссылке на GitHub: github.com/floxy-project/floxy.


Ключевые принципы дизайна

1. PostgreSQL как единственная зависимость

Философия: Минимизация инфраструктурных зависимостей.

Мотивация

Большинство workflow-движков требуют комбинацию из нескольких компонентов: Redis или RabbitMQ для очереди задач, PostgreSQL для хранения состояния, отдельный сервис для распределенных блокировок. Floxy использует только PostgreSQL для всех этих задач.

Это упрощает развертывание: вместо 3-4 сервисов нужна только одна база данных. Меньше точек отказа, проще процедуры резервного копирования и восстановления.

Как это реализовано

Механизм очереди:

-- Извлечение задачи с учетом приоритетов и времени ожидания (priority aging)
WITH next_item AS (
  SELECT id
  FROM workflows.workflow_queue
  WHERE scheduled_at <= NOW() AND attempted_at IS NULL
  ORDER BY 
    LEAST(100, priority + FLOOR(EXTRACT(EPOCH FROM (NOW() - scheduled_at)) * 0.5)) DESC,
    scheduled_at ASC
  LIMIT 1
  FOR UPDATE SKIP LOCKED
)
UPDATE workflows.workflow_queue
SET attempted_at = NOW(), attempted_by = $workerID
FROM next_item
WHERE workflows.workflow_queue.id = next_item.id
RETURNING *

Хранение состояния:

  • workflow_instances — жизненный цикл workflow

  • workflow_steps — история выполнения шагов

  • workflow_events — журнал аудита

  • workflow_join_state — координация fork/join конструкций

Распределенные блокировки:

  • FOR UPDATE SKIP LOCKED вместо блокировок в Redis

  • attempted_by + attempted_at для отслеживания владельца задачи

  • Автоматическое освобождение через ReleaseQueueItem()

Преимущества

  • Единая точка конфигурации

  • ACID-гарантии из коробки

  • Упрощенный мониторинг (один источник данных)

  • Встроенные механизмы резервного копирования

  • Меньше сетевых задержек (нет межсервисных вызовов)

Компромиссы

  • PostgreSQL становится узким местом при очень высоких нагрузках

  • Требуется правильная настройка пула соединений

  • Обслуживание индексов критично для производительности

2. Транзакция на каждый шаг

Философия: Каждый шаг — атомарная единица работы.

Архитектура

func (engine *Engine) ExecuteNext(ctx context.Context, workerID string) error {
    return engine.txManager.ReadCommitted(ctx, func(ctx context.Context) error {
        // 1. Извлечение из очереди
        item := store.DequeueStep(ctx, workerID)
        
        // 2. Выполнение шага
        output, err := handler.Execute(ctx, stepCtx, input)
        
        // 3. Обновление состояния
        store.UpdateStep(ctx, stepID, status, output, errMsg)
        
        // 4. Постановка следующих шагов в очередь
        store.EnqueueStep(ctx, instanceID, nextStepID, priority, delay)
        
        // 5. Удаление из очереди
        store.RemoveFromQueue(ctx, queueID)
        
        // Все или ничего
        return nil
    })
}

Почему это важно

Консистентность: Если обработчик упал — происходит откат транзакции, и шаг остается в очереди. Если база данных недоступна — никаких частичных обновлений. Невозможна ситуация, когда шаг помечен выполненным в базе, но следующий шаг не поставлен в очередь.

Идемпотентность:

// Повторное выполнение безопасно благодаря IdempotencyKey
step := &WorkflowStep{
    IdempotencyKey: uuid.NewString(), // уникален для каждой попытки
    RetryCount:     0,
}

// При повторе того же шага:
// - Тот же IdempotencyKey
// - RetryCount увеличивается
// - Обработчик может проверить дедупликацию

Восстановление: Падение воркера приводит к автоматическому откату транзакции. Элемент очереди освобождается, и другой воркер может его забрать. Нет "осиротевших" состояний.

Уровень изоляции

// ReadCommitted для операций
txManager.ReadCommitted(ctx, func(ctx context.Context) error {
    // Видим только зафиксированные данные
    // Минимальная конкуренция за блокировки
})

Границы транзакций

Что внутри транзакции:

  • Извлечение шага из очереди с захватом блокировки на уровне строки

  • Загрузка определения workflow

  • Выполнение обработчика

  • Обновление статуса шага

  • Уведомление join-узлов

  • Постановка следующих шагов в очередь

  • Логирование событий

Что снаружи:

  • Выполнение обработчика может делать внешние вызовы (API и т.д.)

  • Но изменение состояния происходит только после успешного возврата

Компромиссы

Внешние побочные эффекты не откатываются. Решение: идемпотентность в обработчиках плюс компенсация.

Накладные расходы на транзакцию для каждого шага. Решение: пулинг соединений и воркеров.

3. Неблокирующая обработка очереди через SKIP LOCKED

Философия: Используем эксклюзивные блокировки, но избегаем ожидания их освобождения.

Используемое решение во Floxy: SKIP LOCKED

-- Неблокирующие пессимистичные блокировки (оптимально):
SELECT * FROM queue 
WHERE status = 'pending' 
LIMIT 1 
FOR UPDATE SKIP LOCKED;  -- Пропускаем заблокированные строки

-- Воркер A берет и блокирует строку 1
-- Воркер B видит что строка 1 заблокирована, пропускает её, берет строку 2
-- Воркер C видит что строки 1,2 заблокированы, пропускает их, берет строку 3
-- Максимальный параллелизм

SKIP LOCKED не делает пессимистичную блокировку оптимистичной. Это оптимизация поведения при конкуренции: блокировка остается пессимистичной (эксклюзивной), но обработка конкуренции становится неблокирующей.

Результат: преимущества обоих подходов — нет конфликтов (как у пессимистичных) и нет ожидания (как у оптимистичных), что дает максимальную пропускную способность.

Сценарии использования

Сценарий 1: Нормальная работа

Воркер A: DequeueStep() — получает step_1 (заблокирован)
Воркер B: DequeueStep() — пропускает step_1, получает step_2
Воркер C: DequeueStep() — пропускает step_1,2, получает step_3

Сценарий 2: Падение воркера

Воркер A: взял step_1, упал
Откат транзакции
Воркер B: DequeueStep() — получает step_1 (уже разблокирован)

Сценарий 3: Cancel workflow

Воркер A: выполняет step_1 (заблокирован)
Воркер B: CancelWorkflow() — GetActiveStepsForUpdate()
          — пропускает step_1 (заблокирован воркером A)
          — останавливает другие шаги
Воркер A: завершает выполнение — проверяет отмену — откатывается

4. Расширяемость через систему плагинов

Философия: Доступность расширения через хуки.

Архитектура плагинов

type Plugin interface {
    Name() string
}

type WorkflowStartPlugin interface {
    BasePlugin
    OnWorkflowStart(ctx context.Context, instance *WorkflowInstance) error
}

type StepCompletePlugin interface {
    BasePlugin
    OnStepComplete(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error
}
// и так далее для других хуков

Точки расширения

Уровень workflow:

  • OnWorkflowStart — перед первым шагом

  • OnWorkflowComplete — после успешного завершения

  • OnWorkflowFailed — при неудаче workflow

Уровень шага:

  • OnStepStart — перед выполнением обработчика

  • OnStepComplete — после успешного шага

  • OnStepFailed — при неудаче шага

Уровень отката:

  • OnRollbackStepChain — при компенсации

Поток выполнения

// В engine.executeStep():
func (engine *Engine) executeStep(...) error {
    // 1. Хук плагина ДО
    if engine.pluginManager != nil {
        if err := engine.pluginManager.ExecuteStepStart(ctx, instance, step); err != nil {
            return fmt.Errorf("plugin hook failed: %w", err)
        }
    }
    
    // 2. Основная логика
    output, err := handler.Execute(ctx, stepCtx, input)
    
    // 3. Хук плагина ПОСЛЕ
    if err != nil {
        engine.pluginManager.ExecuteStepFailed(ctx, instance, step, err)
    } else {
        engine.pluginManager.ExecuteStepComplete(ctx, instance, step)
    }
}

Примеры использования плагинов

Плагин метрик:

type MetricsPlugin struct {}

func (p *MetricsPlugin) OnStepStart(ctx, instance, step) error {
    metrics.Counter("floxy.step.started", tags("workflow", instance.WorkflowID))
    return nil
}

func (p *MetricsPlugin) OnStepComplete(ctx, instance, step) error {
    duration := step.CompletedAt.Sub(*step.StartedAt)
    metrics.Histogram("floxy.step.duration", duration)
    return nil
}

Плагин аудита:

type AuditPlugin struct {
    logger *slog.Logger
}

func (p *AuditPlugin) OnWorkflowStart(ctx, instance) error {
    p.logger.Info("workflow started", 
        "instance_id", instance.ID,
        "workflow", instance.WorkflowID,
        "input", instance.Input)
    return nil
}

Менеджер плагинов

type PluginManager struct {
    plugins []Plugin
}

func (pm *PluginManager) ExecuteStepStart(...) error {
    for _, plugin := range pm.plugins {
        if p, ok := plugin.(StepStartPlugin); ok {
            if err := p.OnStepStart(ctx, instance, step); err != nil {
                // Логируем, но не прерываем весь workflow
                slog.Warn("plugin failed", "plugin", plugin.Name(), "error", err)
            }
        }
    }
    return nil
}

Список реализованных плагинов

На момент написания статьи были реализованы следующие плагины:

для REST API:

  1. cancel - endpoint для отмены выполнения workflow

  2. abort - endpoint для аварийного завершения workflow

  3. cleanup - endpoint для очистки устаревших инстансов workflow

  4. dlq - endpoint для возврата workflow, находящегося в DQL, с измененным input

  5. human-decision - endpoint для подтверждения или отклонения workflow, ожидающего решения от пользователя

для движка:

  1. metrics - сбор prometheus метрик

  2. telemetry - сбор телеметрии OpenTelemetry\Jaeger

  3. notifications - отправка нотификаций при изменении состояния workflow\шага

  4. validate - применение кастомных правил валидации к input данным шагов

  5. audit - запись аудит-лога

  6. rollback-depth - отслеживание глубины выполнения rollback, реализован специально для chaos-тестирования

Почему это важно

Преимущества:

  • Ядро остается не перегруженным и понятным

  • Каждая функция — opt-in через плагин

  • Легко тестировать ядро без плагинов

  • Сторонние интеграции без форка кода

Без плагинов: Пришлось бы добавлять в ядро: метрики, логирование, webhook'и, circuit breakers... Ядро раздувается, появляется связность с внешними сервисами.

С плагинами: Ядро содержит только логику выполнения workflow. Всё остальное — через плагины. Можно комбинировать в конструкторе.

5. Распределенные воркеры без состояния

Философия: Воркеры не хранят состояние, всё в PostgreSQL.

Архитектура без состояния

type Engine struct {
    // Разделяемое состояние (read-only после инициализации):
    txManager     TxManager
    store         Store
    handlers      map[string]StepHandler  // Реестр, не меняется
    pluginManager *PluginManager
    cancelContexts map[int64]map[int64]context.CancelFunc  // Отмена в памяти
    
    // Чего нет:
    // - состояние workflow
    // - прогресс шагов
    // - элементы очереди
    // - состояние join-узлов
}

Модель масштабирования

Горизонтальное масштабирование:

PostgreSQL
    ├── Воркер 1 (цикл ExecuteNext)
    ├── Воркер 2 (цикл ExecuteNext)
    ├── Воркер 3 (цикл ExecuteNext)
    └── Воркер N (цикл ExecuteNext)

Каждый воркер:

  1. DequeueStep() — берет работу

  2. Выполняет обработчик

  3. Обновляет состояние

  4. Повторяет цикл

Координация не нужна:

  • Нет выбора лидера

  • Нет синхронизации состояния между воркерами

  • Нет gossip-протокола

  • PostgreSQL — единственная точка синхронизации

Распределенный реестр обработчиков

Проблема:
У воркера A есть обработчик "sendEmail"
У воркера B нет обработчика "sendEmail"
Как избежать ошибок?

Решение: задержка при отсутствии обработчика

func (engine *Engine) executeStep(...) error {
    // Проверяем, зарегистрирован ли обработчик локально
    engine.mu.RLock()
    _, hasHandler := engine.handlers[stepDef.Handler]
    engine.mu.RUnlock()
    
    if !hasHandler {
        // Освобождаем элемент очереди для другого воркера
        delay := engine.jitteredCooldown()  // 1s +/- 20%
        store.RescheduleAndReleaseQueueItem(ctx, queueID, delay)
        
        // Логирование с троттлингом
        if engine.shouldLogSkip(logKey) {
            store.LogEvent(ctx, instanceID, nil, EventStepSkippedMissingHandler, ...)
        }
        
        return nil  // Не помечаем шаг как failed
    }
    
    // Обработчик есть — выполняем
    return handler.Execute(ctx, stepCtx, input)
}

Как работает:

t=0: Воркер A извлекает шаг "sendEmail"
     — нет обработчика
     — откладывает на t=1s
     — освобождает
     
t=1s: Воркер B извлекает шаг "sendEmail"
      — есть обработчик
      — выполняет

Задержка с джиттером:

func (engine *Engine) jitteredCooldown() time.Duration {
    base := 1 * time.Second
    jitterPct := 0.2  // +/- 20%
    
    // Случайное значение [-0.2, +0.2]
    delta := (rand.Float64()*2 - 1) * jitterPct
    
    // 1s * (1 +/- 0.2) = [0.8s, 1.2s]
    return time.Duration(float64(base) * (1 + delta))
}

Зачем джиттер? Избегаем эффекта thundering herd:

Без джиттера:
t=0: 10 воркеров пытаются взять шаг
t=1s: все 10 снова пытаются одновременно

С джиттером:
t=0: 10 воркеров пытаются взять шаг
t=0.8-1.2s: воркеры повторяют попытку в разное время

Восстановление после падения воркера

Сценарий:

t=0: Воркер A извлекает step_1
     BEGIN TRANSACTION
     Выполняет обработчик... (занимает 5 секунд)
     
t=3: Воркер A падает

t=3: PostgreSQL: откат транзакции
     attempted_at = NULL
     attempted_by = NULL
     
t=4: Воркер B извлекает step_1
     Начинает заново

Требование идемпотентности:

// Обработчик должен быть готов к повтору
type Handler interface {
    Execute(ctx, stepCtx, input) (output, error)
}

// stepCtx.IdempotencyKey() можно использовать для дедупликации

Отмена в распределенной среде

Задача:
Воркер A выполняет шаг
Воркер B хочет отменить workflow
Как синхронизировать?

Решение: таблица запросов отмены + фоновый воркер

// Воркер B:
func (engine *Engine) CancelWorkflow(ctx, instanceID, ...) error {
    // Просто создаем запись
    store.CreateCancelRequest(ctx, &WorkflowCancelRequest{
        InstanceID: instanceID,
        CancelType: CancelTypeCancel,
    })
    // Не ждем завершения
}

// Воркер A (фоновая горутина):
func (engine *Engine) cancelRequestsWorker() {
    ticker := time.NewTicker(100 * time.Millisecond)
    for {
        select {
        case <-ticker.C:
            engine.processCancelRequests()
        }
    }
}

func (engine *Engine) processCancelRequests() {
    // Проверяем активные workflow
    for _, instanceID := range engine.cancelContexts {
        req := store.GetCancelRequest(ctx, instanceID)
        if req != nil {
            // Отменяем контекст для этого instance
            cancelFunc()
        }
    }
}

В цикле выполнения:

func (engine *Engine) executeStep(...) error {
    // Регистрируем контекст отмены
    handlerCtx, cancel := context.WithCancel(ctx)
    engine.registerInstanceContext(instanceID, stepID, cancel)
    defer engine.unregisterInstanceContext(instanceID, stepID)
    
    // Выполняем
    output, err := handler.Execute(handlerCtx, stepCtx, input)
    
    // Проверяем, была ли отмена во время выполнения
    if errors.Is(handlerCtx.Err(), context.Canceled) {
        req := store.GetCancelRequest(ctx, instanceID)
        if req != nil {
            return engine.handleCancellation(...)
        }
    }
}

Балансировка нагрузки

Естественная балансировка через очередь:

  • Нет sticky-маршрутизации

  • Нет назначения партиций

  • Каждый воркер просто берет следующий доступный элемент

Обработка приоритетов:

ORDER BY priority DESC, scheduled_at ASC

Элементы с высоким приоритетом обрабатываются первыми, но все воркеры видят одну и ту же очередь.

Priority Aging:

LEAST(100, priority + FLOOR(wait_seconds * 0.5))

Элементы с низким приоритетом постепенно поднимаются, что предотвращает голодание.

Стратегии развертывания

Blue-Green:

СТАРЫЕ: Воркеры 1-5 с обработчиками {A, B, C}
НОВЫЕ: Воркеры 6-10 с обработчиками {A, B, C, D}

Развертывание НОВЫЕ — оба пула работают параллельно
Остановка СТАРЫЕ — плавное завершение

Canary:

90% трафика — СТАРЫЕ воркеры
10% трафика — НОВЫЕ воркеры (с новой версией обработчика)

Мониторинг ошибок — если OK, увеличиваем до 100%

Rolling:

Остановка Воркер 1 — Развертывание новой версии — Запуск Воркер 1
Остановка Воркер 2 — Развертывание новой версии — Запуск Воркер 2
...

Преимущества:

  • Простое масштабирование (просто добавь воркеров)

  • Нет накладных расходов на синхронизацию состояния

  • Автоматическое восстановление после падений

  • Нет сценариев split-brain

Ограничения: PostgreSQL — единственная точка конкуренции. Решения: реплики для чтения, партиционирование для очень больших нагрузок.

Реестр обработчиков должен быть согласованным. Решения: версионирование определений workflow, плавное устаревание обработчиков.

6. Двухуровневое обеспечение консистентности при rollback - Defense in Depth Rollback

Параллельные ветви (Fork/Join) создают классическую гонку: один шаг уже успел завершиться, а другой в параллельной ветке упал и инициировал rollback. Если следующий шаг уже был взят воркером до того, как движок попытался остановить ветку — он всё равно выполнится, что приводит к неконсистентному состоянию (workflow в статусе failed, но есть шаг в статусе completed).

Floxy решает проблему через стратегию Defense in Depth — сочетание двух уровней защиты:

Уровень 1 — Preventive (превентивный)

Сразу после failure движок:

  1. находит соответствующий fork,

  2. помечает все pending шаги параллельных веток как skipped,

  3. останавливает их до начала rollback.

Этот механизм снижает вероятность гонки, но не устраняет её полностью — воркер мог уже начать выполнение шага.

Уровень 2 — Reactive (реактивный)

Если несмотря на превентивную остановку какие-то шаги всё же успели завершиться, движок выполняет «реактивный откат»:

  1. определяет последний savepoint,

  2. находит все completed шаги, созданные после него,

  3. ставит их компенсации в очередь (в обратном порядке),

  4. завершает workflow только после выполнения компенсаций.

Таким образом, движком достигается eventual consistency.

Для вложенных Fork внутри Fork выполнение rollback на данный момент не поддерживается.

Почему это не workaround

Стоит подчеркнуть, что проблема «overshoot completion» (когда шаг успевает завершиться после того, как другая ветка уже упала и инициировала rollback) — это фундаментальное свойство всех распределённых workflow-систем:

  • после выдачи шага worker-у его невозможно гарантированно остановить;

  • между моментом планирования шага и фактическим исполнением всегда есть временной интервал;

  • шаг может завершиться «слишком поздно», уже после начала rollback;

  • корректная модель — выполнение компенсаций post-factum.

Именно поэтому двухуровневый rollback (Preventive + Reactive) — это не обходной путь, а  корректная реакция на неизбежные гонки исполнения.

Резюме: философия дизайна

Ключевые принципы:

  1. Простота > Возможности: Один PostgreSQL вместо множества сервисов

  2. Транзакции > Координация: ACID-гарантии вместо распределенного консенсуса

  3. Неблокирующие операции > Блокировки: SKIP LOCKED вместо очередей ожидания

  4. Композиция > Монолит: Плагины вместо раздутого ядра

  5. Без состояния > С состоянием: Управление через базу данных вместо координации в памяти

  6. Многоуровневая защита > Атомарный контроль: Превентивные и реактивные механизмы обеспечивают корректность Fork/Join при неизбежных гонках

Эти принципы обеспечивают:

  • Простую операционную модель

  • Предсказуемое поведение

  • Легкую отладку

  • Надежное восстановление


Заключение

Floxy решает ту же задачу, что и крупные оркестраторы, но с философией библиотечного подхода, свойственной Go: минимум абстракций, максимум контроля.

Он реализует Saga-паттерн с оркестрацией, поддерживает компенсации, условия, параллелизм и интерактивные шаги — при этом остаётся лёгким, прозрачным и встраиваемым.

github.com/floxy-project/floxy

P.S. Статья и так получилась достаточно объёмной, поэтому многие детали «внутренней кухни» остались за рамками. Если вам интересно углубиться в архитектуру Floxy, я подготовлю отдельный материал — о том, как реализованы Fork/Join, как определяется набор терминальных шагов для синхронизации в Join, как устроен rollback и другие внутренние механизмы.

Комментарии (0)