
Меня зовут Андрей Серебрянский, и я люблю Apache Kafka. И гарантии доставки exactly once. И рассказывать обо всём этом на конференциях. Пять лет я строил платформы потоковой обработки данных в финтехе, а теперь вместе с командой работаю над YDB Topics: частью YDB (СУБД Яндекса), которая заменяет Apache Kafka в роли брокера сообщений. СУБД Яндекса уже некоторое время поддерживает Apache Kafka API. Недавно мы расширили этот API, добавив поддержку Kafka-транзакций.
Но наличия транзакций в брокере сообщений недостаточно для получения гарантий exactly once. Чтобы неудачно зависший или перезагрузившийся сервер не привёл к дублированию или потере сообщений, нужно хорошо понимать, как именно работают транзакции в брокерах сообщений вообще и в Apache Kafka в частности.
Эта статья будет полезна начинающим разработчикам и тем, кто хочет освежить знания или разобраться в тонкостях exactly once обработки данных с помощью YDB Topics или других брокеров сообщений.
Гарантии доставки данных при потоковой обработке
Я много раз слышал, что Apache Kafka, созданная LinkedIn для обработки аналитической информации, является одной из самых важных разработок 2010-х годов. Благодаря универсальности применения она стала одним из основных строительных блоков современного IT.
Её используют в разнообразных сценариях: для обмена сообщениями между микросервисами, сбора логов, метрик и трейсов, загрузки данных в DWH и во многих других.
И во всём этом многообразии использования постоянно приходится отвечать на одни и те же вопросы. Что будет с данными в случае сбоя в работе? Что, если зависнет или перезагрузится продюсер (записывающий сообщения сервис)? Или консьюмер (читающий сообщения сервис)? Или сама Apache Kafka? За годы сформировались три вида гарантий доставки:
At most once: данные можно терять, но нельзя дублировать. Такой подход часто используется в сборе событий с мобильных устройств, когда нужно минимизировать задержку и не ждать ответа от сервера.
At least once: данные нельзя терять, но можно дублировать. Самый популярный подход, при котором в случае отказов системы данные отправляются повторно до тех пор, пока не будет успешно получено подтверждение.
Exactly once: данные нельзя ни терять, ни дублировать. Такая архитектура применяется, например, при обработке платежей в банках.
YDB изначально создавалась для сценариев, где хранение терабайт данных и обработка тысяч финансовых транзакций в секунду должны выполняться с гарантиями ACID и exactly once.
Система может самостоятельно обеспечить гарантии exactly once только в том случае, если обработка данных целиком проходит внутри неё. Например, YDB может перемещать сообщения между топиками и таблицами с гарантиями exactly once в рамках одной базы данных, а Apache Kafka — между топиками в одном кластере.
Если же для обработки данных выполняется пользовательский код, то при записи клиентом двух одинаковых сообщений система не может определить, является ли такое дублирование ошибкой или нет. На практике обеспечение гарантий exactly once — это всегда комбинация кода, который пишут разработчики, и тех возможностей, которые предлагают SDK брокеров сообщений и СУБД.
В бизнес-критичных сценариях, например при обработке банковских транзакций, гарантии exactly once должны соблюдаться при отказе оборудования. Архитектура подобных решений чаще всего строится вокруг типовых сценариев работы с данными:
Stateless перемещение сообщений между топиками
Stateful перемещение сообщений между топиками
Загрузка сообщений в топик
Выгрузка сообщений из топика
Stateless перемещение сообщений между топиками
Один из самых популярных сценариев, которые я видел, — это перемещение сообщений между двумя топиками с гарантиями exactly once. Такой сценарий используется, например, при решардировании, фильтрации или в качестве индикатора выполнения задачи: сервис берёт задачу из входящего топика, выполняет её и помещает в исходящий топик информацию о том, что задача выполнена.

Гарантии exactly once для такого сценария требуют транзакций и означают, что при любых проблемах с серверами брокера сообщений и приложения сообщение должно быть обработано и помещено в исходящий топик ровно один раз. Что может пойти не так?
Проблема №1. Дублирование сообщений при ребалансировке
До версии Apache Kafka 2.6 (а старые версии до сих пор много где используются) была возможна ситуация, когда из-за зависшего и потом восстановившего работоспособность продюсера сообщения в исходящем топике могли быть задублированы.
Входящий топик разбит на партиции, и его читает некоторое количество консьюмеров, объединённых в одну группу. Для обеспечения согласованности Apache Kafka устанавливает ограничение: два консьюмера из группы не могут одновременно читать одну и ту же партицию топика.
Консьюмер 1 читает данные из входящего топика, пишет в исходящий, начинает коммит транзакции и перестаёт отвечать.
Через некоторое время Apache Kafka выполняет ребалансировку и входящий топик начинает читать консьюмер 2.
Консьюмер 2 читает те же данные, что и консьюмер 1, обрабатывает их, записывает в исходящий топик и выполняет коммит.
Консьюмер 1 восстанавливает работоспособность. Он ничего не знает о том, что данные уже обработаны и консьюмер 2 выполнил коммит. Консьюмер 1 уже прочитал данные из входящего топика и начал выполнять коммит до потери работоспособности, поэтому ограничение Apache Kafka на одновременное чтение топика нарушено не будет. Его идентификатор
transactional.idвсё еще уникален в системе, поэтому Apache Kafka понимает, что это тот же клиент, а не новый. Он завершит коммит без ошибки, и в исходящем топике окажутся задублированные данные.

Apache Kafka использует идентификатор
transactional.id, чтобы уникально идентифицировать транзакционные продюсеры. Одновременно может существовать только один экземпляр продюсера с такимtransactional.id. Если появится новый продюсер с таким жеtransactional.id, то Apache Kafka перестанет принимать сообщения от предыдущего.
До версии Apache Kafka 2.6 эта проблема решалась правильным формированием transactional.id или использованием фреймворков вроде Apache Kafka Streams, Spring Kafka или Apache Flink, которые формировали идентификаторы с учётом описанной выше особенности Apache Kafka.
В YDB транзакции по умолчанию имеют уровень изоляции serializable, поэтому два консьюмера просто не могут параллельно выполнить коммит для смещений чтения — один из них будет первым, а второй получит ошибку, так как у него более старое поколение. Подробности в нашей статье на Хабре.
В Apache Kafka 2.6 проблему дублирования сообщений при ребалансировке починили. Каким образом? Запретив консьюмерам получать офсет (порядковый номер первого сообщения, для которого чтение ещё не подтверждено), пока есть незавершённая транзакция, в которой офсет изменён.
Проблема №2. Блокировка чтения новых транзакций до таймаута старых
Изменения в Apache Kafka 2.6 породили другую проблему: если при перезапуске приложения поменять transactional.id, то Apache Kafka не сможет понять, что начатые до перезапуска транзакции уже неактуальны и их нужно отменять.
В результате Apache Kafka не даст консьюмерам читать новые сообщения, пока таймауты незавершённых транзакций не истекут и такие транзакции не будут отменены. Поэтому для корректной работы с транзакциями в Apache Kafka рекомендуется выбирать отдельный transactional.id для каждого экземпляра продюсера и переиспользовать его при перезапусках этого экземпляра.

При использовании YDB проблемы с блокировкой чтения новых транзакций тоже нет. СУБД Яндекса хранит данные незавершённых транзакций в отдельной временной партиции. Поэтому консьюмерам нет необходимости ждать отмены транзакций для сохранения гарантий согласованности: они всегда читают данные из основной партиции, а не временной.
Проблема №3. Дублирование сообщений при сетевых задержках
До недавнего времени задержки сетевых пакетов могли приводить к дублированию сообщений. Если сервер Apache Kafka не получал TCP-пакет с данными транзакции до истечения таймаута, то клиент мог отменить транзакцию и заново отправить данные и коммит транзакции.
В редких случаях «задержавшийся» TCP-пакет мог быть получен сервером Apache Kafka до пакета с запросом на повторный коммит. Такая ситуация могла привести к дублированию данных в топике. Начиная с версии Apache Kafka 4.0 эту проблему решили так: у каждой транзакции появился уникальный идентификатор.
В нативных транзакциях YDB такой проблемы нет, потому что у транзакций всегда был уникальный идентификатор. В транзакциях Kafka-протокола мы поддерживаем Apache Kafka API 3.4 и вынуждены сохранять поведение Apache Kafka с возможным дублированием сообщений. С поддержкой протокола 4.0 об этой проблеме тоже можно будет забыть.
Проблема №4. Отменённые транзакции занимают место
Apache Kafka хранит неподтверждённые сообщения в тех же партициях, что и подтверждённые. Это означает, что, когда происходит успешный коммит транзакции, Apache Kafka добавляет специальное техническое сообщение об этом коммите. Клиентские SDK вычитывают все сообщения в партициях и, ориентируясь на служебные метки, выбирают и передают клиентским приложениям только сообщения из подтверждённых транзакций. Подробности в статье на Хабре.
Такая архитектура приводит не только к блокированию чтения новых транзакций до таймаута старых, но и к тому, что сообщения отменённых транзакций продолжают занимать место в топиках, а клиентские SDK — вычитывать их данные и тратить пропускную способность сети.
YDB Topics используют отдельные временные партиции для хранения сообщений в неподтверждённых транзакциях. При коммите сообщения копируются в основную партицию. А при откате транзакции временная партиция просто удаляется. Такая архитектура позволяет не только экономить сетевые чтения для клиентов, но и занимаемое сообщениями место на дисках.
Stateful перемещение сообщений между топиками
Ещё один популярный способ потоковой обработки — хранение состояния во внешней системе. Типичный пример такой архитектуры: сервис вычитывает сообщения из входящего топика, обрабатывает их с сохранением промежуточного результата и время от времени записывает сообщения в исходящий топик.
В простейшем случае это может быть подсчёт определённых сообщений, где счётчик сохраняется в отдельном хранилище, а количество сообщений с какой-то регулярностью записывается в исходящий топик.

Такая архитектура сложнее, чем перемещение между топиками, потому что транзакции нужно делать не в рамках одной системы, а между двумя разными. Если транзакция успешно завершилась в одной системе и неуспешно в другой, то отмена неуспешной транзакции никак не повлияет на успешно завершённую, что приведёт к потере согласованности.
Для такой архитектуры часто применяются фреймворки вроде Kafka Streams или Flink. Информация о транзакции сначала записывается в отдельное хранилище, и только после успешного коммита во все участвующие системы в этом отдельном хранилище делается пометка, что транзакция успешно выполнена.
YDB поддерживает Apache Kafka API, поэтому для сценариев stateful перемещений сообщений между топиками можно использовать как Kafka Streams, так и Flink. Если же хранить промежуточные данные в таблицах YDB, то можно воспользоваться нативным механизмом транзакций между таблицами и топиками — в таком случае можно не использовать внешние фреймворки и построить решение только на YDB.
Загрузка сообщений в топик
Типичный случай из моей практики: получаем данные по REST API и хотим, ничего не потеряв, записать их в Apache Kafka ровно один раз. Или похожий случай: нужно загрузить сообщения из базы данных в Apache Kafka.

В случае получения сообщений по REST API пригодится идемпотентный продюсер. Даже если несколько раз отправить в брокер одно и то же сообщение (например, в случае разрыва сетевого подключения и повторных попыток отправки), то благодаря такому продюсеру оно будет записано только один раз.
В Apache Kafka идемпотентность достигается путём получения для сессии producer.id и нумерации сообщений в рамках сессии. Такой подход работает в сценариях с разрывами сетевых подключений, но даёт сбой, если продюсер или брокер перезапускаются, продюсер получает новый producer.id и начинает нумерацию с нуля.
Если сообщения для загрузки в топик приходят по REST API, то единственное хорошее решение, которое я видел, — это создание отдельного приложения на Flink или Kafka Streams. Если сообщения можно уникально идентифицировать, например по идентификатору, то такое приложение может устранять дубли на заданном временном окне и записывать сообщения в целевой топик уже без дублирования.
В YDB Topics идемпотентный продюсер позволяет писать сообщения ровно один раз, если у них есть уникальный возрастающий числовой идентификатор. Сервер YDB сам отфильтрует все сообщения, у которых такой идентификатор,
SeqNo, меньше или равен тому, который уже был обработан ранее.
Загрузка из базы проще, чем из REST API. Данные можно загружать с помощью Kafka Connect: начиная с версии 3.3 можно использовать коннекторы с гарантиями exactly once (пример для debezium). YDB тоже можно использовать с Kafka Connect для CDC из базы: пример есть в документации.
Выгрузка сообщений из топика
Kafka Connect позволяет реализовать сценарий выгрузки сообщений с гарантиями exactly once. Благодаря принципу идемпотентности сообщения можно записывать ровно один раз, например в базу данных. Это требует наличия первичного ключа и поддержки UPSERT со стороны СУБД.
Также идемпотентная запись работает в Hadoop и S3. Для её поддержки нужно правильно настроить коннектор: как это сделать, написано в документации конкретных коннекторов.
Kafka Connect можно использовать для выгрузки сообщений из YDB Topics. Кроме этого, YDB Topics поддерживает нативные транзакции между топиками и таблицами, которые можно использовать для перемещения данных с ACID гарантиями.
По сравнению с рассмотренными ранее сценариями выгрузка сообщений из топика является наиболее простой, потому что при проектировании мы можем положиться на идемпотентные свойства системы, в которую выгружаются данные.
Обеспечить гарантии exactly once все ещё непросто
Завершая эту статью, я хочу ещё раз подчеркнуть, что гарантии exactly once в потоковой передаче данных — это в первую очередь архитектурная задача. Apache Kafka Transactions API, поддерживаемое YDB, даёт необходимые строительные блоки для разработки отказоустойчивых и надёжных систем. Но не освобождает нас, разработчиков, от дизайна архитектуры с учётом возможных отказов.
База данных YDB и её компонент YDB Topics доступны как опенсорс-проект и как коммерческая сборка с открытым ядром. Вы можете запустить её у себя или воспользоваться нашим managed-решением в Yandex Cloud.
Мы общаемся с нашими пользователями в Telegram и на Хабре. Если вы разрабатываете системы с использованием YDB, Apache Kafka или другого брокера сообщений, то в комментариях к этой статье я буду рад обсудить exactly once и другие архитектурные вопросы.
mazurovn
Спасибо очень интересная статья. Хотелось бы еще статью про репликацию данных через брокер сообщений .
asserebryanskiy Автор
Привет! Спасибо! Подскажи, про репликацию - это про то, как быстро и надежно отгрузить данные из БД через CDC и Kafka в DWH или в другие БД? Или про то, как реплицировать данные между несколькими кластерами kafka?