Привет! Меня зовут Ильсаф, я инженер данных в MAGNIT OMNI — бизнес-группе ритейлера «Магнит», которая отвечает за развитие омниканального опыта для клиентов. В этой статье я собрал свои практические наблюдения по работе Kafka Connect и Debezium с PostgreSQL: от настройки репликации до мониторинга и бэкфиллинга.
Ещё больше практических материалов по Kafka Connect, Debezium и построению надёжных data-пайплайнов публикую в своём Telegram-канале, а на сайте собираю статьи, разборы и полезные шпаргалки в одном месте.
Kafka Connect
Kafka Connect — open-source инструмент, который позволяет переносить данные между Kafka1 и другими системами в формате real-streaming2.
В эпоху BigData real-streaming стал очень востребованным для аналитики, благодаря чему появились Kafka и Kafka Connect и стали набирать популярность. Сейчас крайне редко можно увидеть вакансию специалиста, работающего с данными, где среди требуемых технологий не будет Kafka.
Kafka Connect состоит из двух видов коннекторов:
Source Connector — коннектор, который пишет в Kafka из внешнего источника данных.
Sink Connector — коннектор, который пишет во внешний источник данных из Kafka.

Debezium

Debezium (в разговоре «дебез») — коннекторы, позволяющие читать изменения данных (CDC)3 в режиме real-stream. Как правило, это источники под протоколом JDBC4.
PostgreSQL
Пункты ниже будут касаться работы СУБД PostgreSQL с Debezium. Решения были выявлены опытным путем и необязательно являются стопроцентными.
Настройка источника перед репликацией
Перед началом репликации из PostgreSQL нужно выполнить следующие действия:
max_slot_wal_keep_sizeзадаёт лимит WAL, который держится для репликационных слотов (в МБ). Если лимит превышен, слот могут выключить, а старые WAL удалятся. Значение -1 — без лимита, и при отсутствии читателя WAL может разрастись до заполнения диска. Отдельно проверьтеmax_wal_size: это порог для чекпойнта, а не жёсткий лимит WAL. Он не остановит рост WAL при сильном отставании слота, но влияет на частоту чекпойнтов и переработку сегментов.Создать публикацию и добавить туда необходимые таблицы.
CREATE PUBLICATION <publication_name> FOR TABLE <table1>, <table2>, … <tableN>;
(опционально) Настроить
Replica Identity— оно отвечает за то, какие события и насколько глубоко отправлять .
Replica Identity:
DEFAULT — отправляет create, update, delete, но в UPDATE только новое значение (без old state).
FULL — как DEFAULT, но включает предыдущее состояние (старые значения строк).
INDEX — использует уникальный индекс, указанный для репликации (минимальный набор ключей).
NOTHING — в UPDATE/DELETE не передает ключи строки (обычно не подходит для CDC).
ALTER TABLE <table> REPLICA IDENTITY FULL;
• Создать heartbeat таблицу.
CREATE TABLE IF NOT EXISTS debezium.heartbeat( id INT, last_update TIMESTAMPTZ DEFAULT current_timestamp, CHECK (id=1), PRIMARY KEY (id) );
В этом случае всегда одна запись, у которой будет меняться поле last_update.
• Создать сигнальную таблицу.
CREATE TABLE IF NOT EXISTS debezium.signal( id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL );
Потеря сетевой связности
Это проблема номер один: Debezium-коннектор чаще всего падает из-за этого. Чтобы это предотвратить, достаточно перезагрузить коннектор при условии, что сетевая связность восстановилась. По умолчанию коннектор не перезапускается после падения. Чтобы он пытался перезапускаться, нужно настроить следующие параметры:
{ "errors.log.enable": true, "errors.log.include.messages": true, "errors.max.retries": 15, "errors.retry.delay.initial.ms": 1000, "errors.retry.delay.max.ms": 10000, "errors.retry.timeout": -1, "errors.tolerance": "none" }
TOAST-столбец
Если вы работали с репликацией таблиц, в которых есть столбец с очень большим значением (чаще всего JSON), то скорее всего, вы уже встречались со значением -__debezium_unavailable_value.
Если коротко, то решается просто:
ALTER TABLE <schema>.<table> REPLICA IDENTITY FULL
Но в этом случае размер WAL увеличится в два раза, перед этим проверьте параметры max_slot_wal_keep_size и max_wal_size.
Если что, решение из этой статьи у меня не взлетело — оно привело к росту WAL и потере репликационного слота.
Гарантия корректной работы коннектора
Чтобы гарантировать корректную работу коннектора, статуса «Running» недостаточно, потому что может быть так: статус зеленый, но репликация не работает. Гарантом здесь выступает heartbeat-топик, который N раз в минуту пишет в топик, то есть нужно проверять свежесть сообщений (можно использовать Grafana).
Параметры, которые отвечают за настройку heartbeat:
{ "heartbeat.action.query": "INSERT INTO debezium.heartbeat(id) VALUES(1) ON CONFLICT(id) DO UPDATE SET last_update=now();", "heartbeat.interval.ms": 10000 }
В этом случае каждые 10 секунд в heartbeat-топик будет записываться сообщение, при отсутствии сообщений больше чем 5 минут — можно бить тревогу.
Бэкфиллинг
Часто бывает, что появилась возможность перезагрузить таблицу или какой-то интервал. Самый легкий и быстрый способ — использовать сигналы. Чтобы использовать их, нужно создать таблицу (DDL выше) и указать следующие параметры:
{ "signal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username='secret' password='secret';", "signal.consumer.sasl.mechanism": "SCRAM-SHA-512", "signal.consumer.security.protocol": "SASL_SSL", "signal.consumer.ssl.truststore.location": "path_to_certificate", "signal.consumer.ssl.truststore.type": "PEM", "signal.data.collection": "debezium.signal", "signal.enabled.channels": "source,kafka", "signal.kafka.bootstrap.servers": "server1:9090,server2:9090,server3:9090", "signal.kafka.topic": "<service_name>.signal_topic" }
После этого в сигнальный топик <service_name>.signal_topic отправить сообщение:
{ "type": "execute-snapshot", "data": { "data-collections": [ "public.table1", "public.table2" ], "type": "INCREMENTAL", "additional-conditions": [ { "data-collection": "public.table1", "filter": "updated_at BETWEEN '2026-03-20 16:00:00' AND '2026-03-20 18:00:00'" } ] } }
БОНУС: CLI для управления коннекторами
В качестве крутой находки хочу поделиться инструментом, с которым можно легко управлять коннекторами Kafka Connect — kcctl.

В kcctl можно менять конфиги сразу в нескольких коннекторах с помощью регулярных выражений в названии коннектора, смотреть статусы коннекторов, создавать коннекторы. Мне кажется, это удобнее, чем использовать REST API или UI, если это касается каких-то манипуляций, а не просмотра.
Сноски
Kafka — open-source распределенная платформа, предназначенная для обмена сообщениями в реальном времени. ↩
real-streaming — потоковая передача данных в реальном времени. ↩
CDC — Change Data Capture — захват изменений данных, в каждом источнике реализован по-разному, например, PostgreSQL пишет изменения данных в WAL-лог. Основные плюсы CDC — чтение не напрямую из таблицы, что сильно снижает нагрузку на базу и повышает консистентность, так как в качестве изменения пишутся все операции CRUD (create, update, delete). ↩
JDBC — Java Database Connectivity — единый интерфейс для взаимодействия с источниками данных, такими как MySQL, PostgreSQL и т. д. ↩