Добро пожаловать в четвёртую и заключительную часть серии о новом Flowable Async Executor. До этого момента путь был довольно насыщенным:
В первой части были представлены новый Async Executor и связанные с ним базовые концепции, необходимые для понимания последующих публикаций.
Вторая часть подробно рассматривала детали и возможности настройки различных компонентов Async Executor.
В третьей части были представлены и обсуждены результаты бенчмарков, которыми мы остались очень довольны.
Однако остаётся один важный вопрос: как мы пришли к текущей реализации? Что подтолкнуло нас к этим изменениям и почему? Как мы нашли узкие места и использовали эти данные для создания лучшего подхода? И, учитывая, что первая версия появилась более десяти лет назад, как Async Executor эволюционировал, сохраняя обратную совместимость?
Именно этому посвящена эта часть. Мы воспользуемся возможностью оглянуться назад и вспомнить различные реализации, которые появлялись за это время. Мы выделили четыре поколения Async Executor и кратко рассмотрим каждое из них. Поскольку Flowable является форком Activiti, история начинается с первой версии Activiti (5.0.0).
Первое поколение (версии 5.0.0 – 5.16.0)
Первая реализация называлась тогда “Job Executor”. Она была прямым воплощением требований, описанных во второй части: задания должны были сохраняться, выбираться, помещаться во внутреннюю очередь, выполняться, а при ошибках — повторяться.
В те годы все типы заданий — обычные асинхронные задания, таймеры, приостановленные задания и deadletter-задания — хранились в одной таблице базы данных. Различие между типами происходило на уровне SQL-запросов: например, deadletter-задания — это те, у которых не осталось попыток, таймеры — задания с определённым типом и временем исполнения и т.д.
Выделенный поток (job acquisition thread) отвечал за выборку любого типа задания (асинхронного или таймера) и передавал их во внутреннюю очередь, откуда потоки выполнения брали их для исполнения.

Стоит обратить внимание на алгоритм, который использовался в те годы, потому что многие базовые концепции сохранились и сегодня, даже спустя десятилетие (что неудивительно, ведь даже тогдашний код строился на опыте предыдущих систем).
Получить страницу исполняемых заданий. Исполняемыми считаются те задания, параметры которых соответствуют определённым состояниям: тип, срок исполнения, признак приостановки и так далее. Можно представить, что SQL-запрос был довольно сложным, чтобы охватить все варианты.
-
Попытаться заблокировать каждое задание по очереди.
Это означало обновление полей lock owner и lock expiration time в строке таблицы заданий с помощью оптимистической блокировки (что может сбивать с толку, ведь это другая блокировка, не та, что подразумевается под «lock owner»).
Если возникала ошибка оптимистической блокировки, это означало, что другой экземпляр уже заблокировал задание. Такое возможно в многонодовой конфигурации.
Если ошибка не возникала, задание передавалось во внутреннюю очередь.
Если логика задания выполнялась без ошибок, задание удалялось.
Если при выполнении возникала ошибка, число попыток уменьшалось, а lock owner и время истечения блокировки очищались. Теперь это задание мог забрать другой (или тот же) исполнитель в следующем цикле выборки.
(Это упрощённое описание. На самом деле алгоритм содержал гораздо больше деталей.)
Также важно отметить, что добавление понятий lock owner и времени истечения блокировки на шаге 2 означало, что эти состояния нужно было учитывать и на шаге 1. Ведь job executor не должен был выбирать задания, которые уже заблокированы другим экземпляром. Поэтому в запросе приходилось сравнивать дату с lock expiration time, чтобы определить, не «зависло» ли задание (например, из-за сбоя сервера).
Как видно из описания, Job Executor изначально проектировался для многонодовой работы: прежде чем задание могло быть выполнено, оно должно было быть «привязано» к определённому экземпляру. Это гарантировало, что ни одно задание не будет выполнено дважды. Помните об этом: концепция «блокировки» или «привязки» задания к конкретному экземпляру Async Executor стала ключевой для оптимизации в самых новых реализациях.
Второе поколение (версии 5.17.0 – 6.1.0)
Хотя первое поколение Async Executor постепенно дорабатывалось с каждым новым релизом, его фундаментальная архитектура оставалась практически неизменной.
Однако в определённый момент пользователи начали приближаться к пределам возможностей этой архитектуры. Эксперименты и анализ бенчмарков позволили выявить основную проблему: запросы на получение заданий и связанных с ними данных. Было обнаружено, что при высокой нагрузке ответы от базы данных начинают сильно замедляться, а в некоторых случаях база даже переходит к полному сканированию таблицы вместо использования индексов. Причина заключалась в том, что все данные о заданиях хранились в одной таблице, а с развитием продукта и появлением новых сценариев запросы становились всё более сложными.
Решением стало разделение данных о заданиях по разным таблицам, каждая из которых обслуживала свой конкретный сценарий. Цель заключалась в максимальном упрощении запросов, чтобы база данных могла возвращать данные в приемлемое время. В результате таблица заданий была разделена на четыре отдельные таблицы: асинхронные задания, таймеры, приостановленные задания и deadletter-задания. Эти таблицы используются и в текущей реализации.

Очевидно, как это упростило ситуацию:
Размещение приостановленных заданий в отдельной таблице означало, что больше не нужно было проверять их состояние при выборке. Хотя это усложнило сам процесс приостановки (так как данные нужно было переносить), эффект от ускорения запросов значительно перевесил первоначальные затраты времени.
Аналогичная история с deadletter-заданиями: когда при выборке не нужно было учитывать, что некоторые задания могли завершиться с ошибкой, запросы упростились и стали быстрее.
Таймеры принципиально отличаются от асинхронных заданий. Таймеры могут находиться в таблице долгое время (даже годами), тогда как асинхронные задания означают, что работа должна быть выполнена немедленно. Выделение таймеров в отдельную таблицу позволило упростить запросы к асинхронным заданиям, так как проверка временной метки срока исполнения (due date) могла быть полностью исключена. Это означало, что таймеры нужно было выбирать иначе, чем асинхронные задания. Для этого появились два потока выборки — по одному на каждый тип.
Выборка таймера стала принципиально иной: когда таймерное задание выбирается (то есть наступает срок его исполнения), оно фактически преобразуется в асинхронное задание, что означает его удаление из таблицы таймеров и добавление в таблицу асинхронных заданий (что указывает на готовность к выполнению).
Все вышеперечисленное означало, что любая новая запись в таблице асинхронных заданий указывала на задание, готовое к немедленному выполнению. Фильтрация не требовалась (кроме проверки, не забрал ли задание другой экземпляр), и запрос стал простым и быстрым. По сути, это было похоже на появление нового сообщения в очереди сообщений: когда оно приходит, оно готово к обработке.
Третье поколение (версии 6.1.0 – 6.6.0)
Архитектура второго поколения Async Executor успешно удовлетворяла потребности пользователей Flowable в течение многих лет, в том числе при работе с высокими нагрузками. Третье поколение не было переосмыслением архитектуры, а представляло собой рефакторинг Async Executor в действительно переиспользуемый и независимый компонент.
Это было вызвано необходимостью поддержки «асинхронной истории» и создания надёжной основы для индексации данных в Elasticsearch в enterprise-продуктах Flowable. Подробное объяснение этих тем выходит за рамки этой статьи, но если кратко, Async Executor был выбран как производительный способ перемещения данных процессов и кейсов в фоне для повышения эффективности транзакций в основной базе данных.
Больше технических деталей о «асинхронной истории» можно найти здесь и в этом посте.
Вторым важным фактором стало появление множества движков в open-source и enterprise-продуктах Flowable. Изначально Flowable был только про исполнение BPMN. Однако за эти годы ситуация сильно изменилась: появилось много новых движков как в open-source, так и в enterprise-решениях. Некоторые из этих движков (в первую очередь CMMN engine, но также, например, content engine) нуждались в таймерах или асинхронных задачах, аналогично процессному движку.
В результате Async Executor был переработан так, чтобы его можно было легко интегрировать в любой движок. В этот период Spring Boot стал стандартом де-факто, поэтому стало важно уметь интегрироваться с низкоуровневыми механизмами таких сред (например, использовать стандартный TaskExecutor из Spring Boot). Были проведены масштабные рефакторинги, Async Executor получил собственные изолированные сервисы и отдельные модули Maven.
В результате появилось то, что мы считаем третьим поколением: переиспользуемый, независимый компонент, который теперь применяется для множества разных задач как в open-source-движках Flowable, так и в enterprise-продуктах.
Четвёртое и текущее поколение
Третье поколение Async Executor развивалось вместе с ростом требований к продуктам Flowable. Мы продолжали улучшать и дорабатывать его, пока не столкнулись с очередным архитектурным ограничением.
Первый шаг при оптимизации существующей системы — точно определить, где находятся узкие места. Как и в любом инженерном процессе, сначала нужно собрать данные и метрики. Из open-source сообщества и от заказчиков мы получили информацию, что при большом количестве заданий в базе данных (сотни тысяч и более) резко возрастает число ошибок оптимистической блокировки. Сами по себе такие ошибки не являются проблемой — напротив, они свидетельствуют о нормальной работе системы и ожидаемы. Разные экземпляры конкурируют за получение заданий: одни получают их, другие — нет, что и приводит к появлению ошибок оптимистической блокировки в логах.

Однако если количество ошибок оптимистической блокировки продолжает расти, а число заданий в базе не уменьшается или даже увеличивается, это явный признак наличия узкого места. Используя проект для бенчмарков, который мы описывали в предыдущей части, мы провели множество экспериментов и прототипов, подключая профилировщик. Иногда эти реализации не приводили к лучшим результатам, но они давали понимание того, как взаимодействуют различные компоненты в реальной конфигурации.
Уже в первых сериях экспериментов быстро проявилась одна закономерность: при «перегрузке» Async Executor сотнями тысяч заданий, которые нужно выполнить одновременно, поток выборки не успевает обеспечивать потоки выполнения новыми заданиями, особенно если экземпляров Async Executor много. Мы пробовали различные комбинации настроек, о которых говорили во второй части: увеличивали размер выборки, настраивали размер очереди и так далее. Но закономерность сохранялась, хотя по ходу мы находили небольшие улучшения. В самых оптимальных настройках, которые нам удалось подобрать, производительность достигала 1200–1500 заданий в секунду (на аналогичном оборудовании, что и в бенчмарках). Это видно на следующем скриншоте профилировщика (жёлтый цвет означает ожидание):

Во время запуска тестов сервер базы данных был практически полностью загружен, что явно не является желаемой ситуацией:

Серверы, на которых работал Flowable Async Executor, напротив, использовали лишь четверть своей вычислительной мощности:

Мы также заметили, что количество ошибок оптимистической блокировки увеличивалось линейно при добавлении новых экземпляров Async Executor. Снова подчеркнём: это не проблема, так и было задумано в архитектуре третьего поколения. В некоторых случаях возникали и исключения deadlock, которые на самом деле были замаскированными ошибками оптимистической блокировки. Поскольку этот вопрос уже не раз обсуждался на форуме, стоит отметить следующее:
Термин «deadlock» здесь не совсем корректен. На самом деле база данных внутренне удерживает блокировки строк, когда несколько транзакций выполняются одновременно — так обеспечивается принцип ACID. Однако если количество блокировок строк становится слишком большим (этот параметр настраивается во многих СУБД), база данных случайным образом «убивает» одну из транзакций, надеясь, что это снизит нагрузку и остальные транзакции смогут завершиться. В таких случаях в некоторых СУБД выбрасывается исключение deadlock. Технически это не настоящий дедлок: если бы времени было достаточно, база могла бы сама всё разрешить, классической взаимной блокировки не происходит. Однако многие СУБД используют такие тайм-ауты для обеспечения приемлемого времени отклика.
Главный вывод, который мы сделали при анализе данных: узким местом стала конкуренция за таблицы. То, что база данных тратила много CPU и возникало множество (игнорируемых) исключений, указывало на то, что база не справляется с большим количеством параллельных запросов и изменений. Особенно сильно это проявлялось на этапе выборки заданий (acquire phase). Настройка размера страницы выборки и размера очереди помогала лишь частично, не настолько, как мы рассчитывали.
Мы уже сталкивались с этим раньше: когда переходили от первого к второму поколению Async Executor и разделяли таблицы. Когда нагрузка на одну таблицу росла, пропускная способность снижалась. Разделение на четыре разные таблицы тогда помогло снизить давление. Мы пришли к выводу, что снова нужно найти способ уменьшить нагрузку на таблицы.
Попытка шардирования
Классическим решением при столкновении с конкурирующими параллельными потребителями одних и тех же данных является шардирование. В данном контексте шардирование означает, что каждое задание получает ключ шарда, и запросы при выборке заданий учитывают этот ключ. Таким образом, шард — это подмножество всех данных, определяемое этим ключом. Мы пробовали и другие прототипы, но стоит отдельно рассмотреть шардирование, так как это естественная реакция на описанную выше проблему.
Идея: дать каждому экземпляру Async Executor уникальный ключ шарда и разделить задания между ними, по одному ключу на инстанс. Для хранения ключа шарда мы использовали поле category, которое уже существовало (и добавили индекс):
Разделить миллион заданий на 4 разных набора, каждому присвоить свою категорию. То есть 250 000 заданий с категорией A, 250 000 с категорией B, 250 000 с категорией C и 250 000 с категорией D.
Изменить acquire-запросы так, чтобы они учитывали категорию, которая была жёстко задана на каждом экземпляре.
Запустить по одному экземпляру Async Executor на каждый набор, всего четыре.
Результаты эксперимента оказались разочаровывающими. Такой подход действительно дал небольшое увеличение пропускной способности при добавлении новых экземпляров, но лишь на 10–20% на экземпляр, и с каждым новым инстансом прирост уменьшался. Из плюсов — при этом подходе не возникало ошибок оптимистической блокировки и deadlock’ов. Так что мы поняли, что движемся в правильном направлении, пытаясь оптимизировать логику выборки.
Ещё один минус шардирования — необходимость выбирать ключ шарда при создании задания. Это можно реализовать, например, по кругу (round-robin), но тогда каждый экземпляр Flowable должен знать, какие другие узлы сейчас онлайн и какие ключи используются. Кроме того, если экземпляр выходит из строя, существующие задания нужно перераспределять между другими ключами. Оба эти вопроса потребовали бы довольно сложных изменений. А, как известно, сложные алгоритмы обычно приводят к сложным багам. Этот факт, в сочетании с результатами по производительности, заставил нас оставить эту идею на уровне прототипа и не развивать её дальше.
Тем не менее, эксперимент показал главное: мы были на правильном пути, пытаясь снизить конкуренцию за таблицы заданий. Шардирование действительно упростило запросы и значительно сократило количество блокировок строк в базе, но объём работы с одной таблицей всё равно оставался слишком большим.
Глобальная блокировка выборки (Global Acquire Lock)
Мы экспериментировали с разными решениями и в итоге остановились на том, что теперь называем стратегией Global Acquire Lock.
Вместо того чтобы экземпляры Async Executor конкурировали друг с другом за выборку заданий, теперь каждый поток выборки сначала должен получить доступ к глобальной блокировке, прежде чем сможет забирать задания. Если блокировка уже занята другим экземпляром Async Executor, поток выборки делает короткую паузу. В результате в такой схеме только один экземпляр Async Executor в конкретный момент времени занимается выборкой. Главное преимущество — теперь логику выборки можно серьёзно оптимизировать, так как не нужно учитывать, что другие экземпляры тоже могут пытаться заблокировать задания одновременно. Побочный эффект — блокировку теперь можно делать пакетно, без необходимости использовать оптимистическую блокировку, как раньше. Это также означает, что к таблицам заданий доходит гораздо меньше запросов.
Хотя это может показаться нелогичным (ведь мы фактически вводим «контролёра»), на практике производительность значительно возрастает (что подтверждается нашими бенчмарками), потому что:
Несмотря на то, что только один узел занимается выборкой заданий, сама выборка происходит гораздо быстрее за счёт пакетной обработки.
Снижение конкуренции за таблицы базы данных приводит к тому, что запросы выборки выполняются быстрее, так как таблице не нужно обрабатывать множество параллельных запросов.
Теперь можно забирать большие пакеты заданий без помех, что раньше было проблемой. Это гарантирует, что потоки-исполнители всегда обеспечены работой. Даже если частота выборки ниже, объём работы, получаемый за раз, больше, поэтому потоки не простаивают. Именно поэтому теперь настройки по умолчанию изменены так, чтобы забирать больше заданий за один цикл.
Ещё один плюс: «тяжёлая» операция преобразования таймерного задания в асинхронное теперь может выполняться отдельным пулом потоков. Это значительно ускоряет обработку таймеров.

Есть ещё две дополнительные причины, почему мы выбрали именно такую реализацию.
Во-первых, реализация получилась простой и не требует более сложных алгоритмов, например, выборов лидера (leader-election), которые мы тоже рассматривали. Когда речь идёт о параллелизме, простая реализация легче в поддержке, оптимизации и отладке при возникновении проблем.
Во-вторых, все API остаются прежними, и эта архитектура полностью совместима с предыдущими поколениями. Не требуется никакой миграции данных. На самом деле, этот режим можно включать и выключать с помощью флага (и даже менять на лету, без перезапуска). Все остальные альтернативы, которые мы анализировали, требовали миграции данных.
Итоги
Рефакторинг сложного компонента и проведение бенчмарков — всегда непростая задача. В этой серии материалов мы надеемся показать, что новая эволюция Flowable Async Executor — это значительный шаг вперёд по сравнению со всеми предыдущими версиями. Как показано, он способен обрабатывать тысячи заданий и таймеров в секунду. При этом обеспечивается обратная совместимость как на уровне API, так и на уровне данных.
Значения по умолчанию, которые теперь поставляются с Flowable open source (начиная с версии 6.7.0) и Flowable enterprise (с версии 3.9.0), были изменены в соответствии с результатами проведённых экспериментов. Конечно, настройки можно и нужно адаптировать под реальную нагрузку системы. Надеемся, мы также показали, что Flowable Async Executor — это гибкий, производительный и масштабируемый компонент.
Как и с предыдущими поколениями Async Executor, мы ожидаем, что текущую архитектуру можно будет ещё дорабатывать и совершенствовать, исходя из пользовательских сценариев и новых бенчмарков. Например, перспективным направлением может стать динамическая настройка размера выборки для разных потоков выборки или потоков выполнения. Это может быть реализовано с помощью формулы, учитывающей оставшуюся ёмкость очереди, среднее время выполнения последних x заданий, нагрузку на потоки-исполнители и так далее.
Ещё одна возможная область для исследований — различные реализации механизма блокировки. Например, использование распределённого сетевого решения для глобальной блокировки выборки вместо таблицы в базе данных.
Можно с уверенностью сказать: Flowable всегда стремится к максимальной производительности, не жертвуя богатой историей и обратной совместимостью, которой придерживается уже много лет.
Об авторе:
Joram Barrez Principal Software Architect
Ключевой разработчик Flowable с более чем десятилетним опытом работы с open source и построения масштабируемых процессных движков. Сооснователь проекта Activiti (на базе которого создан Flowable), а до этого был участником команды JBoss jBPM.

BPM Developers — про бизнес-процессы: новости, гайды, полезная информация и юмор.