В компаниях разного масштаба часто складываются решения, которые время от времени требуют пересмотра из‑за неэффективности с точки зрения затрат на поддержку, проблем с производительностью, масштабируемостью, узкого места для багов и ещё ряда причин. Этот цикличный процесс эволюции иногда требует рефакторинга архитектуры.
И на каждой такой итерации стоит задача в том, чтобы заложить фундамент с потенциалом для покрытия существующих потребностей на ближайшие несколько лет, при этом затратив минимально достаточное количество ресурсов.
Стоит отметить, что на практике, в зависимости от политики компании, с разной степенью лояльности относятся к процессу внедрения новых решений. Там, где это происходит труднее, в приоритет ставится простой, понятный и в то же время эффективный подход.
Так, 3 года назад мы делали MVP, для которого был необходим механизм для запуска задач по крону. На старте мы имели:
API сервис на Scala + ZIO;
NiFi v1.24;
базы данных ClickHouse, PostgreSQL.
Экспертизы по NiFi у нас было немного, а флоу задач был не таким тривиальным, как, например, переложить JSON из одного места в другое, для чего NiFi идеально подходит, а нужно было в процессе хитро агрегировать данные, и, к тому же, требование к процессу CD — изменения настроек запуска задач должны производиться через базу данных.
Как итог, мы написали свой Scheduler на ZIO, так как это было быстрее и, как нам тогда казалось, надёжнее.
Базы данных
Для ясности стоит уточнить, что ClickHouse содержит базы для каждого потребителя: foo
, bar
. Добавление нового потребителя заключается в создании базы идентичной структуры, а централизованного хранения информации о потребителях нет. Каждый потребитель имеет идентичную DDL‑схему, в том числе таблицу job_settings
, которая содержит информацию о запуске задач: cron
и job_type
. Scheduler содержит у себя информацию об этих потребителях и при запуске обращается к нужной базе за списком задач.
В PostgreSQL расположена сущность locks
, которая нужна для распределённой блокировки в нашем Scheduler.
Первые сложности
Почти сразу мы столкнулись с проблемами самописного планировщика. И их можно ранжировать от критичных до неудобных так:
сложность дебага на проде;
проблемы распределённой блокировки — планировщик работает в нескольких инстансах, и, несмотря на транзакционность, нет‑нет да и приходится чинить механизм блокировки;
необходимость доработки самого механизма шедулинга — например, динамическое подтягивание новых задач в расписание;
появилось окно для багов — с развитием сервиса растёт сложность поддержки всего проекта, увеличивается влияние человеческого фактора.
Думаю, этого достаточно, чтобы начать искать новое решение, но вместо того, чтобы «всё выкинуть и переписать заново», давайте подумаем, как можно было бы исправить ситуацию, не меняя подход:
писать больше тестов;
выделить оркестратор для таск‑менеджмента;
для каждого типа задач сделать независимый воркер;
настроить общение оркестратора и воркеров через месседж‑брокер, либо сделать воркер-пулл с распределением задач.
Это решит часть проблем, но породит новые:
менеджерить оркестратор и воркеры станет сложнее, и это потребует дополнительных трудозатрат;
на каждый тип задачи заводить отдельный воркер невыгодно. Тогда задачи придётся группировать и создавать групповые воркеры. Тут возникают вопросы... А по какому принципу группировать? И не столкнёмся ли мы в будущем со свалкой в коде, с которой всё равно нужно будет что‑то делать?
написание большого количества тестов не всегда помогает избежать багов, а зачастую, наоборот, они только тормозят процесс, и большая часть времени тратится на подгонку тестов под текущее решение. Тесты нужны там, где они действительно нужны, а именно в чувствительных местах. Предсказать их необходимость в месте, где появится баг, — то же самое, что не учесть специфичный кейс в самих тестах.
Первый этап. Поиск решения.
Перед поиском решения определимся с критериями, удовлетворяющими наши требования:
из коробки должен быть доступен процесс запуска задач;
динамическое планирование — при появлении новых задач они должны добавляться в расписание;
прозрачный процесс выполнения — удобство дебага, так как его иногда приходится делать на проде;
поддержка распределения нагрузки между узлами;
уход от упоминания потребителей в коде к их централизованному описанию;
поддержка триггерного запуска задач;
минимальное влияние на инфраструктуру — использовать уже имеющиеся технологии без привлечения DevOps.
Первое, что приходит на ум в данной ситуации, — перенести построение рабочих процессов в Prefect или Airflow, но из доступных у нас — только кластерный NiFi, который концептуально не совсем подходит под наши запросы, но мы попробуем что‑нибудь с этим сделать.
Наше решение будет содержать 3 верхнеуровневые процесс‑группы: state manager
, task trigger
, task executor
.

Менеджер состояния

Мы хотим динамически хранить метаинформацию о потребителях, которая собирается из баз данных consumer_foo
, consumer_bar
, и не обращаться к БД за ними каждый раз, когда нам понадобится список актуальных записей.
Соответственно, при перезапуске NIFI прогреваем кэш: раз в минуту проверяем наличие там записи, если её нет — обогащаем, и раз в час актуализируем.
Триггер задач

Процесс‑группа для избежания гонок запускается на Primary Node (Scheduling Strategy: Primary node only)
, и раз в минуту из таблицы job_settings
по каждому потребителю достаёт cron и сравнивает его с текущим временем. Если время не подошло — FlowFiles скипаются, иначе подаются на вход в процесс-группу исполнителя задач.
Здесь основной момент в процессоре "ExecuteScript": происходит проверка совпадения cron и текущего времени. Для сравнения мы используем библиотеку cronutils и привычный для NiFi формат записи крона QUARTZ.
import java.time.ZonedDateTime
import groovy.json.JsonSlurper
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.parser.CronParser
import com.cronutils.model.time.ExecutionTime
import static com.cronutils.model.CronType.QUARTZ
def flowFile = session.get()
if(!flowFile) return
try {
def cronExpression = ''
session.read(flowFile) { inputStream ->
try {
// Парсим json
def json = new JsonSlurper().parseText(inputStream.getText('UTF-8'))
cronExpression = json.cron?.toString()
} catch (e) {
log.warn("Could not parse JSON from FlowFile content for ${flowFile.id}", e)
}
}
// Если cron пустое или отсутствовало в JSON, отправляем в failure
if(!cronExpression || cronExpression.trim().isEmpty()) {
log.warn("CRON expression is null or empty for FlowFile ${flowFile.id}. Routing to failure.")
session.transfer(flowFile, REL_FAILURE)
return
}
// Мы используем QUARTZ, это совпадает с планировщиком Nifi
def cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(QUARTZ)
def parser = new CronParser(cronDefinition)
// Парсим cron из тела FlowFile
def quartzCron = parser.parse(cronExpression.trim())
// Создаем ExecutionTime, который умеет делать вычисления по cron
def executionTime = ExecutionTime.forCron(quartzCron)
// соответствует ли текущее время нашему крону?
if (executionTime.isMatch(ZonedDateTime.now())) {
session.transfer(flowFile, REL_SUCCESS)
} else {
session.transfer(flowFile, REL_FAILURE)
}
} catch(e) {
log.error("Failed to process cron for FlowFile ${flowFile.id}", e)
session.transfer(flowFile, REL_FAILURE)
}
Важно заметить, что cron должен игнорировать секунды (т.е. соответствовать синтаксису -- «в любую секунду»), так как запуск может происходить в начале минуты в любую секунду, поэтому мы пренебрегаем этой погрешностью.
Исполнитель задач

Перенаправляет задачи по атрибуту job_type
на соответствующий Input Port процесс‑группы.
Распределение при этом нагрузки между нодами кластера происходит по стратегии Round Robin
в настройках очереди split
после разделения списка задач на отдельный FlowFiles в процессоре SplitJson.
Перенос логики воркеров.
Логика таски может быть нетривиальной, и даже для реализации простой операции может потребоваться несколько процессоров. Частично это решается Groovy‑скриптами или самописными процессорами.
С последними всё сложнее: у нас сейчас работает 3 таких легаси‑процессора, и, когда с ним что‑то случается, то это часто «чёрный ящик». К примеру, недавно мы столкнулись с кэшированием пропертей — по непонятной причине NiFi стал кэшировать свойства самописных процессоров и не сбрасывать их до пересоздания процессора. Сложность реализации многопоточной обработки — нет встроенных инструментов, нужно самостоятельно писать потокобезопасные обвязки. Но, даже если по какой‑то причине Groovy‑скриптов недостаточно и нужен самописный процессор, то желательно придерживаться правила: процессор должен решать одну конкретную задачу, в дальнейшем это сильно упростит дебаг.
Бывают ситуации, при которых несколько процессоров можно заменить одним небольшим скриптом — в большой процесс‑группе это сильно упрощает понимание всего процесса.
Итоги первого этапа.
Итак, данная реализация имеет явные ограничения:
Кэширование данных слишком частое; иначе при перезапуске NiFi с пустым кэшем задачи не запустятся.
Невозможность отделить механизм триггера от выполнения.
Невозможность запуска по триггеру из внешних сервисов.
Из преимуществ — простота реализации, которая служит фундаментом для дальнейшей модификации.
Отдельно стоит упомянуть про то, что логи здесь собираются из любого уровня вложенности через Output Port
и прокидываются на самый верхний уровень, где обрабатываются соответствующим процессором.
Второй этап. Использование месседж‑брокера.
Мы можем сделать менее связный процесс с помощью месседж‑брокеров, разберём на примере Kafka.

В процесс‑группу state_manager добавляем топик с параметром cleanup.policy=compact
, который будет содержать всего один ключ и обновляться раз в час.

Task trigger теперь вместо отправки задач в Output Port пишет в топик очереди задач.

Task executor забирает задачу из топика и так же распределяет по процесс‑группам.
Уже выглядит лучше. Этот подход позволяет нам:
плавно переехать на соседний NiFi при апгрейде версии.
Задачи можно триггерить из любого сервиса, запушив нужный JSON в топик.
хранение динамически меняющейся метаинформации о потребителях в Kafka с доступом из других сервисов и синхронизация кэша с топиком.
Использование брокера сообщений решает нашу задачу, но это не подходит под требование минимального влияния на инфраструктуру.
Третий этап. Финальная реализация
Вместо месседж‑брокера мы можем использовать:
ListenHTTP processor
. Плюсы: простота. Минусы — нужно открывать дополнительные порты в контейнере, что нарушает требование о влиянии на инфраструктуру.Remote Process Group + Remote Connections Ports
. Плюсы: минимальное влияние на инфраструктуру. Минусы: оверхед, но в рамках допустимого. На первый взгляд этот вариант нам подходит.
Менеджер состояния.

Для получения state из внешних сервисов реализуем асинхронный запрос на входной порт и отдельный запрос для получения ответа. На Input Port отправляется триггер FlowFile, по которому забираются данные из кэша и отправляется на Output Port, который слушает наш сервис. Все Input/Output ports, что смотрят наружу, должны иметь настройку Receive from: Remote connections (site-to-site)
.
Исполнитель задач.

В нашем случае NiFi будет отправлять очередь задач сам в себя и иметь открытый порт для удалённых соединений (не физический, а абстрактный), поэтому принимающая процесс‑группа -- task_executor, должна иметь Input Port c возможностью удаленных подключений. Теперь мы можем отправлять триггер на запуск задачи из разных инстансов NiFi или внешних сервисов.
Триггер задач

В процесс‑группе триггера задач добавляем Remote Process Group
(RPG) и соединяем с ним выходной FlowFile.
RPG позволяет обмениваться очередями между Input/Output портами разных инстансов/кластеров NiFi. В самом RPG прописываем URL вида https://host:port/nifi
.
Если вы у себя локально развернули NiFi в Docker‑контейнере, то у вас, скорее всего, будет что‑то вроде: https://7865f4237219:8443/nifi
. При обращении к localhost
NiFi не находит соединение. Имя пода тоже выдаёт ошибку о том, что CN сертификата содержит доменные имена localhost
и альтернативный вариант, вот он нам и нужен, — в моём случае это 7865f4237219
. При желании можно выпустить свои сертификаты с именем пода в CN и подложить их в NiFi.
Готово, теперь наш флоу полностью собран. Опираясь на требования, давайте определим, какие задачи мы решили:
сам NiFi отвечает за запуск и распределение задач;
скрипт проверки совпадения крона с текущим временем обеспечивает динамическое планирование;
дебаг стал проще в силу прозрачности процесса выполнения на UI;
распределение нагрузки по нодам;
state_manager отвечает за сбор и хранение описания потребителей;
возможность запускать задачи из внешних сервисов через Remote Process Group;
минимальное влияние на инфраструктуру — мы не использовали дополнительных инструментов, всё реализовано в NiFi без привлечения DevOps.
Реализация имеет допущения, которыми мы пренебрегли в нашем процессе, это:
необходимость писать обёртку под NiFi для внешних сервисов;
небольшой оверхед в процесс‑группах по сравнению с архитектурой через месседж‑брокер.
Будем считать это платой за ограничения в требованиях.
Итоги
Вот такое получилось исследование. В NiFi есть много настроек, ручек, за которые можно подёргать и сделать нужный процесс. Но в нём есть два явных небольших минуса — устаревший интерфейс и зависание браузера, когда UI открыт долго в соседней вкладке (по крайней мере в версии 1.24; хотя уже есть 2.x, и там, возможно, это поправили), и один ощутимый — разработка и деплой кастомных процессоров.
Использовать NiFi или нет — зависит от конкретных условий/задач, но даже в нём можно сделать достаточно много.
Все 3 примера я выложил на свой GitHub, буду рад, если они послужат отправной точкой для ваших задач.
Если статья сэкономила вам время, натолкнула на своё решение или вы сталкивались с похожей задачей, напишите свой фидбэк в комментариях, будет интересно почитать.
Также буду признателен за рейт статьи и если вы посетите мой Журнал, в котором я выкладываю только то, что попадает мне в закладки.