Привет, Хабр!
Я Евгений Прочан, в платформенной команде Magnit OMNI развиваю инфраструктуру DWH. Расскажу здесь, почему нам понадобилось перейти от батчинга к CDC и как мы это делали. Причин перехода было две: потребность бизнеса в расширении возможностей инфраструктуры и нестабильность нашего старого процесса репликации.
Мы используем в основном базы данных PostgreSQL. Оттуда пакетами раз в час передаём данные в S3, ClickHouse и таблицы Iceberg. Наша потоковая нагрузка достигает примерно полутора терабайта данных, 6000 операций в секунду (около 1500 в самой нагруженной базе данных).
Как было до Debezium
На заре нашего DWH мы запускали скрипты по cron-у, и это было очень неприятно. У нас был выделен SSH-сервер, которому регулярно требовалось добавлять память.
Потом мы перешли на Airflow. Стало легче, но не намного. Ни о какой работе в реальном времени речи не шло. Требовалось найти более подходящий под наши условия инструмент.
Мы используем фреймворк Kafka Connect, написанный на Java.

У Kafka Connect есть координатор, source-коннектор и target-коннектор. Debezium — это source-коннектор для фреймворка Kafka Connect.
С какими проблемами мы столкнулись
После внедрения Debezium нам пришлось обходить ряд трудностей:
Потерю данных.
Непрозрачность мониторинга. Debezium достаточно капризный, некоторое время нам не удавалось настроить мониторинг и оповещения для надёжного контроля потока данных.
Неоптимальный конвертер для данных, которые приходят в Kafka. Он отвечает за преобразование информации, поступающей из Debezium. Потратили время на поиск более эффективного конвертера.
Проблемы инициализирующих снимков с таймаутами и блокировками. Хотя Debezium предназначен для потоковой передачи, в нём есть механизм снятия инициализирующих снимков с таблиц до начала пересылки. То есть мы формируем полное состояние таблицы и поддерживаем его постоянно в актуальном состоянии, как в источнике.
Прочие оптимизации, о которых я расскажу ниже.
Потеря данных
В PostgreSQL есть механизм WAL (журнал предзаписи) — это стандартный метод обеспечения целостности данных, который позволяет восстановить БД после падений. WAL можно читать через протокол репликации, используя слоты репликации. Debezium выдаёт себя за одну из таких реплик, поэтому может возникнуть следующая ситуация.
Допустим, в базе данных есть несколько таблиц, и мы добавляем ещё одну. Предыдущие таблицы живут своей жизнью, в них что-то происходит, а в новой нет активности в течение какого-то времени. WAL — один на всю базу. Debezium ничего не пишет в БД, WAL постепенно накапливается. По мере его накопления либо переполнится директория, в которой он лежит — и тогда PostgreSQL упадёт, — либо WAL достигнет предельного заданного нами размера (мы установили лимит в 30 Гб), за это отвечает переменная max_slot_wal_keep_size
. В результате часть данных может пропасть.

Мы боремся с этим добавлением heartbeat-топика. Так можно назвать саму таблицу.

В Debezium есть механизм постоянного внесения обновлений в эту таблицу. Прослушивая её, мы записываем данные в heartbeat-топик. Так мы избегаем ситуации, когда по каким-либо таблицам отсутствует активность, и больше не теряем данные.
# Интервал отправки heartbeat-сообщений (в миллисекундах)
heartbeat.
interval.ms
= 10000
# SQL-запрос, который будет выполняться для heartbeat
heartbeat.action.query = 'INSERT INTO debezium. heartbeat (id) \
VALUES (1) \
ON CONFLICT (id) DO UPDATE SET last_update = now(); '
# Добавить схему и heartbeat таблицы в вычитку
schema. include. list = 'debezium'
table include list = 'debezium.heartbeat'
Чем нагруженней база данных, тем чаще надо будет обновлять heartbeat.
Можно пойти ещё дальше и добавить в топик мета-информацию. Поскольку у нас регулярно добавляются новые базы, которые нужно прослушивать с помощью Debezium, мы добавили в код версионирование. Иногда это помогает при отладке.
# Интервал отправки heartbeat-сообщений (в миллисекундах)
heartbeat.
interval.ms
= 10000
# SQL-запрос, который будет выполняться для heartbeat
heartbeat.action.query = 'INSERT INTO debezium. heartbeat (id, db_version) \
VALUES (1, version()) \
ON CONFLICT (id) DO UPDATE SET last_update = now(); '
# Добавить схему и heartbeat таблицы в вычитку
schema. include. list = 'debezium'
table include list = 'debezium.heartbeat'
Мониторинг
До того как добавить heartbeat-топик, мы мониторили всё через Airflow. Предположим, раз в 5 минут мы опрашивали все наши Debezium-коннекторы. Если статус был running, то мы считали, что всё хорошо. Но иногда коннекторы под капотом оказывались в состоянии up for retry, хотя наружу отдавали статус running. И к следующей проверке мы могли потерять слот репликации, потому что WAL достигал лимита в 30 гигабайтов.
Для решения этой проблемы мы начали отслеживать состояние коннекторов с помощью того же heartbeat-топика. Для этого в GraphQL написали такой код:
sum(
increase(
kafka_server_BrokerTopicMetrics_MessagesIn{
topic=~"cdc.*.__debezium- heartbeat.*"
}[5m]
)
) by (topic) == 0
Здесь мы смотрим, сколько сообщений приходит в топик, и отбираем те, которые являются «пульсом». Для этого присваиваем им единообразные названия. И проверяем, сколько изменений произошло за последние пять минут. Если нисколько, значит, Debezium сломался.
Также в AlertManager выполняется такой код:
- alert: DebeziumHeartbeatStopped
expr: sum(increase(kafka_server_BrokerTopicMetrics_MessagesIn{
topic=~"cdc.*.__debezium-heartbeat.*"}[5m])) by (topic) == 0
for: 10m
labels:
sendto: dwh-only
annotations:
title: "[DWH Team] Heartbeat топик у источника {{ $labels.topic }}перестал наполняться"
summary: "3a {{ $activeAt }} в топике {{ $labels.topic }} кол-восообщений: {{ $value }}"
url: "
https://grafana.platform.corp/var-topic=
{{ $labels.topic}}"
Мониторинг и оповещения работают в связке, не нужно постоянно обращаться к AirFlow и API Kafka Connector.
Heartbeat можно рассматривать как сквозной интеграционный тест. Если он изменяется со временем, то это означает, что:
БД доступна, потому что Debezium наполняет heartbeat только при активном подключении к источнику данных.
Kafka Connector работает, иначе процесс коннектора не мог бы отправлять сообщения.
Kafka доступен и принимает сообщения, потому что Kafka Connector записывает в heartbeat.
Работает сеть между всеми звеньями, от БД до брокеров Kafka.
Конвертер для сообщений
Существуют такие конвертеры:
StringConverter. Преобразует в обычную строку. Мы его не используем.
JsonConverter. Применяется по умолчанию. Сначала мы пользовались им.
JsonSchemaConverter. Более продвинутая версия, с типизацией и другими дополнительными возможностями. Однако размер сообщений получается больше.
AvroConverter и ProtobufConverter. Преобразуют в бинарный формат, требующий сериализации и десериализации. Мы выбрали вариант Protobuf, потому что в нашей компании этот формат используется для межсервисного взаимодействия.
Если вы выберете JsonSchemaConverter, то обязательно добавьте реестр схем данных.

Как это работает? В начало каждого сообщения добавляется идентификатор схемы, которая физических хранится в реестре. При этом она кешируется в памяти приложения, чтобы не нужно было постоянно обращаться за ней в реестр.
Поскольку мы контролируем схемы сообщений, то благодаря бинарному формату получаем меньший объём данных в Kafka. Для сравнения, насколько сжимаются данные при использовании разных форматов:

Сначала мы использовали JSON со схемой, которую сохраняли в сообщениях. То есть в каждом сообщении у нас передавалась и полезная нагрузка, и описание самого сообщения. Это плохо влияло на размер. Добавив Protobuf и схему Reduce, мы почти вдвое уменьшили трафик.

Инициализирующие снимки таблиц
Напомню, что штатно Debezium работает так: запоминает место в WAL, на котором мы начинаем снимок, потом считывает всю таблицу целиком и начинает передавать данные с запомненного места.
С инициализирующими снимками возникали проблемы.
Во-первых, Debezium пытался просто применить к таблице select*
. У него это не всегда получалось из-за размеров таблиц — возникали блокировки.
Во-вторых, мы не могли управлять этим процессом. При этом иногда Debezium мог делать снимок больше дня, и если происходил сбой — например, падала сеть, — то приходилось начинать сначала.
В-третьих, при создании инициализирующего снимка резко возрастала нагрузка на базу данных.
Наконец, не всегда удавалось сделать снимки с больших таблиц: если в БД прописан таймаут на select
меньше, чем требуется времени для создания снимка. Эта проблема особенно актуальна, если величиной таймаута управляешь не ты, а неподконтрольный сервис.
В Debezium 1.6 появились инкрементальные снимки, и мы полностью перешли на них. В чём их преимущества:
Меньше нагрузка на БД. Вся база делится на фрагменты по батчам. Размер фрагмента можно указать в конфигурации в параметре
LimitOffset
.Можно отменить создание снимка или поставить на паузу. Также можно возобновить его в любом месте.
Не нужно создавать новый коннектор. Допустим, мы загружаем десять таблиц, и нужно добавить ещё пять. Раньше в таких ситуациях при создании инициализирующего снимка нам приходилось поднимать отдельный коннектор и создавать снимок дополнительных таблиц, со всеми вышеописанными проблемами.
Хотя инкрементальные снимки удобнее, инициализирующие тоже бывают полезны:

Во-первых, инициализирующие снимки работают быстрее, потому что живут в отдельном процессе, а у инкрементального один процесс и делает снимок, и отправляет его.
Во-вторых, инкрементальные снимки делаются последовательно, а инициализирующие можно распараллелить.
В-третьих, до Debezium 2.2 инкрементальные снимки можно было делать только по первичному ключу, а потом разрешили использовать суррогатный ключ (главное, не забудьте добавить индекс на это поле).
Настройка сигналов
Для управления инкрементальными снимками приходится активно использовать так называемые сигналы:
execute-snapshot
: запускает инкрементальный снимок для указанных таблиц;stop-snapshot
: прерывает текущий снимок, в том числе удаляет часть таблиц из текущего запроса на снятие снимка;pause-snapshot
: ставит на паузу текущий снимок;resume-snapshot
: возобновляет ранее остановленный снимок;custom
: отправляет произвольные данные для пользовательской обработки. Например, для автоматического управления или реализации HTTP-интерфейса.
Документация у Debezium хорошая, но процедура запуска сигналов через Kafka описана неудовлетворительно. Мы пользуемся таким способом:
{
"type": "execute-snapshot",
"data": {
"data-collections": [
"schemal. table1",
"schema1. table2"
]
"type": "INCREMENTAL"
}
У этого сигнального топика будет отдельный consumer, который нужно расписать, прежде чем запустить сигнал. В документации об этом не сказано.
Кроме того, здесь мы указываем, какой топик использовать в качестве интерфейса для вставки сигнальных сообщений. В channels
нужно указать и source
, и kafka
. Source
— это БД, через неё будет проходить сигнал. А все остальные сигнальные интерфейсы — это просто абстракция над базой данных.
После запуска сигнала мы формируем сообщение и требуем запустить снимок, этого достаточно. При желании можно добавить предикаты для таблицы, чтобы читать её не полностью. Также в конфигурации можно добавить суррогатный ключ.
Прочие оптимизации
Однажды нам нужно было сделать снимок таблицы, занимавшей 20 Гб. Мы выбрали инициализирующий снимок, потому что объём был не слишком велик и время поджимало. Но retention на топике оказался меньше снимка, и Debezium молча перестал работать. Пришлось отлаживать.
При переключении с реплики на мастер нужно перенести состояние слота репликации. Необходимо запомнить, в каком месте WAL вы находитесь, чтобы продолжить чтение оттуда. До PostgreSQL 17 нужно было делать мультипликации команды и перезапускать PostgreSQL, а потом это начало выполняться автоматически.
Последняя оптимизация связана с организацией инфраструктуры. Когда у нас было примерно 10 баз, нам хватало REST API для создания коннекторов и топиков. Но с увеличением количества баз мы начали часто путаться, поэтому теперь запускаем всё это через Terraform.
Кроме того, для работы сигнальных и heartbeat-таблиц их нужно создать в базе. Вначале мы делали это довольно медленно. Теперь всё это включено в миграции, то есть мы быстро добавляем каждую новую базу.
Резюме
Когда мы только начали внедрять Debezium, то ожидали, что сейчас закатаем рукава, почитаем документацию и всё быстро реализуем. В итоге столкнулись с вышеописанными проблемами. Потом подключили DevOps. Система работает в Kubernetes, мы вместе изучали журналы и документацию. И только подключив DBA к работе тех баз, откуда мы хотели читать, нам удалось победить все трудности. Сначала DBA сопротивлялись и не позволяли писать heartbeat в их базах. Пришлось подробно объяснять, зачем это нужно и какую пользу им принесёт. Добились согласия, и теперь новая система радует всех причастных своей надёжной работой.
Комментарии (3)
manyakRus
20.08.2025 19:53Debezium must Die
Debezium какчает разные таблицы с разной скоростью и получается неконсистентность,
когда например есть document_id=100500, а документа нет с таким id=100500
поэтому нельзя использовать Debezium.
mr_stepik
20.08.2025 19:53У меня в проекте был как-то дебезиум.
Заменил на систему логической репликации, все в 100 раз лучше и надёжней. Нет ни проблем с транзакциями, ни с зависшими данными как в статье описывается.
На дибезиуме это кстати решается заменой плагина pg-logical
budnikovsergey
log ведь, а не lock