Debezium — популярный фреймворк для Change Data Capture (CDC), позволяющий отслеживать изменения в источниках данных (таких как базы данных) и передавать их в потоковые платформы вроде Apache Kafka. Одним из компонентов Debezium является JDBC Sink Connector, предназначенный для записи данных из Kafka в реляционные базы данных посредством интерфейса Java Database Connectivity (JDBC). 

Debezium JDBC Sink Connector может решать множество задач: от репликации данных между БД и синхронизации обновлений между микросервисами до создания резервных копий данных для целей тестирования или разработки. Мы в VK Tech используем Debezium JDBC sink connector, чтобы строить перформанс-интеграции. Но в нагрузочных тестах столкнулись с проблемой производительности, которая не решалась никакими обходными путями. Поэтому нам пришлось детально погрузиться в нюансы обработки событий в Debezium JDBC connector.

Привет, Хабр. Меня зовут Артём Дубинин. Я старший разработчик Backend в команде Tarantool CDC — решения для репликации данных в реальном времени между системами управления базами данных (СУБД). В этой статье я изложу свою интерпретацию создания Debezium, расскажу о том, как работает Debezium JDBC connector, а также о нашем варианте оптимизации перформанса, который попал в Open-source версию.

Начнем с контекста: Kafka и Kafka Connect

Начиная говорить про Debezium, необходимо пояснить контекст, из-за которого родился Debezium 

Kafka — популярный распределенный брокер сообщений, разработанный для эффективного сбора, хранения и передачи потоков данных в реальном времени. Он отличается высокой производительностью, надежностью и способностью легко масштабироваться.

Одним из ключевых инструментов для взаимодействия Kafka с внешними системами является Kafka Connect. Это встроенный механизм Kafka, облегчающий подключение и передачу данных между Kafka и различными сервисами, такими как базы данных, файловые системы и аналитические инструменты. 

Kafka Connect состоит из двух типов коннекторов:

  • Source connectors извлекают данные из внешних систем и загружают их в Kafka.

  • Sink connectors получают данные из Kafka и направляют их в другие системы.

При чем здесь Debezium

По умолчанию при работе с Kafka Connect для каждой конкретной базы данных необходимо разрабатывать собственный source connector, что усложняет процессы интеграции и требует значительных усилий разработчиков.

Чтобы упростить интеграцию с различными базами данных и решить проблему отсутствия универсального подхода, команда энтузиастов разработала множество коннекторов, которые были объединены в единое название Debezium («DBs-ium»читай как набор инструментов для баз данных). Этот инструмент (набор коннекторов) был объединен двумя связанными универсальными идеями: паттерном получения данных из баз данных Change Data Capture (CDC) и единым форматом сообщения события. Идея использовать один паттерн для получения данных CDC, а конкретнее Log-Based CDC, вводит понятие «событие базы данных» и позволяет дедуплицировать написание кода коннекторов для различных баз данных. Идея единого формата данных же позволяет использовать несколько коннекторов одновременно для единых цепочек данных.

Поскольку Log-Based CDC легко интегрируется в большинство БД, Debezium позволил получить унифицированный способ сбора изменений из любых поддерживаемых баз данных и доставки их в Kafka.

Но в Kafka Connect присутствует еще и возможность написания Sink-коннектора, то есть коннектора, который забирает данные из Kafka и отдает в базу данных. Здесь Debezium придумали использовать другую универсальную идею — использовать JDBC.

Все выглядит неплохо: благодаря Kafka Connect мы можем передавать данные из любого источника в любой приемник. Однако если речь идет о построении перформанс-интеграций, возникает ряд проблем именно на стороне sink, несмотря на то что источник функционирует безупречно. Основная сложность связана с использованием JDBC-коннектора.

Детальнее о проблеме и способах ее преодоления

Чтобы понять суть проблемы, немного упростим задачу и представим, что нам нужно работать не с разными базами данных, а только с PostgreSQL, причем переливать данные из одного PostgresSQL в другой PostgreSQL, получая некую PostrgeSQL федерацию: из PostgreSQL Stand-in инстанса в PostgreSQL Stand-by инстанс. Для организации такой реализации достаточно трех коннекторов:

  • Kafka Connector;

  • PG Connector;

  • JDBC Connector.

И в случае с Source-частью этого пайплайна все довольно просто. Мы используем технологию CDC, которая фиксирует все изменения в базе данных: вставки, обновления, удаления и другие операции. Коннектор формирует пакет изменений (батч), который целиком записывается в Kafka. Kafka устроен таким образом, что он просто добавляет новые записи в конец журнала (append-only log), поэтому нам неважно, какого рода событие произошло.

Но с Sink-частью все несколько сложнее, поскольку база данных это не просто append-only log (хотя база данных и может таковым быть), но в базе данных мы оперируем уже событиями — событиями INSERT, UPDATE, DELETE и т. д.

Здесь можно придумать несколько способов, как обрабатывать пачку событий, пришедшую из Kafka. 

Naive way. Наивный путь подразумевает обработку по одной записи, то есть по одному событию в целевую БД (в нашем случае это PostgreSQL Stand-by). При этом, поскольку JDBC синхронный, здесь возникают вынужденные ожидания: вставляем запись, ждем, удаляем запись, ждем и так далее. Из-за этого перформанс значительно проседает.

Batch processing. В JDBC есть батчевый API, который позволяет делать prepareStatement конкретным событием. То есть можно объединять несколько событий одного типа в батчи.  

Выглядит это следующим образом: сначала создается подготовленное выражение (PreparedStatement) для определенной таблицы, затем последовательно привязываются значения всех записей из пакета (батча), и только после этого выполняется команда (executeBatch()):

session.doWork(conn -> {
prepareStatement = conn.preparedStatement(“
INSERT INTO
TBL_NAME (field1, …, fieldN) VALUES (?, …, ?)
“);

// Привязка параметров и добавление в батч
// prepareStatement.addBatch(...); // 100 вставок

prepareStatement.executeBatch();

});

Такой же подход применяется и для удаления записей (DELETE):

session.doWork(conn -> {
prepareStatement = conn.preparedStatement(“
DELETE FROM
TBL_NAME WHERE KEY_FIELD=?
“);
// ------//------------

});

То есть мы можем иметь несколько буферов для событий разных типов (например, insert и delete), что потенциально должно улучшить перформанс за счет сокращения ожиданий между передачей батчей.

Но и здесь есть нюансы, поскольку в процессе передачи событий может быть нарушена последовательность.

Возможные последствия и наше решение

Из-за батчинга события delete и insert, относящиеся к одной записи, могут прийти в неправильном порядке. Например, в источнике первым событием было delete, а вторым — insert, однако после объединения событий в батч порядок может измениться, и в результате в целевой базе данных запись будет ошибочно удалена, хотя в изначальной базе данных запись существует.

Подобные ситуации отрабатывает и Debezium. Но он применяет не совсем оптимальный подход. Так, Debezium выбрал простое решение проблемы конфликтов: при обработке события Debezium проверяет, есть ли события во втором буфере, и если есть, то сбрасывает конкурирующий по событию буфер. То есть если события приходят в таком порядке: insert, insert, delete, — то вначале накопится буфер insert’ов из двух событий и при обработке delete этот буфер insert’ов будет сброшен.

Это работает, консистентность сохраняется.

Но в случаях, когда конкурирующие события часто чередуются, все равно будет много ожидания; в худшем случае, когда каждое новое событие будет конкурирующим, мы будем ждать, как на изначальной картинке.

При этом Debezium не проверяет ключ события — он проверяет только его тип, что неоптимально. Можно сделать так, чтобы события конкурирующего буфера сбрасывались только тогда, когда хотя бы у одного события в конкурирующем буфере есть ключ, равный ключу обрабатываемого сейчас события. 

Но тогда, если мы хотим сохранять еще как-либо границы транзакции, у нас могут возникнуть проблемы. Теоретически данный функционал можно было сделать опциональным.

Изучая дальше оптимизации Debezium, можно узнать про такую опцию, как reduction buffer, которая позволяет не сбрасывать старые события по одному ключу в рамках одного батча.

Например, к нам пришло несколько INSERT по одному ключу и события INSERT мы воспринимаем как UPSERT.

Тогда, убрав все события, кроме последнего, мы все так же получим консистентное состояние БД, как если все события были сброшены в базу данных.

За данный функционал и отвечает опция use.reduction.buffer.

Но была одна проблема: редуцирование буфера работает только в рамках одного типа буфера. И если у нас все так же часто чередуются DELETE и INSERT, мы будем получать такую же проблему ожиданий.

Именно этот аспект мы и решили доработать. Так, мы усовершенствовали подход reduction buffer, начав учитывать тип события и его ключ в рамках обоих буферов. Теперь, если у нас имеются события с одним и тем же ключом, мы можем исключить устаревшие события прямо на этапе предварительной обработки, не отправляя их дальше в базу данных. 

Например, если за событием удаления (delete) сразу следует вставка (insert) той же самой строки, событие удаления становится ненужным и удаляется заранее.

Причем мы сделали так, что теперь события конкурирующего буфера не нужно сбрасывать никогда, так как не осталось конкуренции из-за того, что конкурирующие события попросту удаляются из буферов.

Благодаря этому подход позволяет уменьшить количество избыточных операций в базе данных, сократить число необходимых write-вызовов и значительно повысить производительность. Наибольший эффект достигается в ситуациях с большим количеством одновременных конкурентных событий.

Полученные результаты и выводы

При тестировании граничного случая для наших задач использование патча позволило повысить производительность обработки данных почти в 30 раз.

Примечание: на графике на самом деле указан разброс от 0 MB/s до 30 MB/s. В графане неправильно был настроен лейбл вертикальной оси

При этом предложенная нами реализация под Debezium JDBC sink connector явля��тся опцией — по умолчанию коннектор может работать как и прежде. Таким образом, просто расширяется количество сценариев его применения.

Важно, что наш патч не остался внутренним решением: 9 июля 2025 года он попал в финальный релиз Debezium 3.2.0 Final Released и уже доступен всем пользователям фреймворка.

Таким образом, наш кейс показывает, что Debezium и JDBC sink connector — не только функциональные, но и гибкие инструменты, которые при необходимости можно успешно дорабатывать под свои задачи, делая результат подобных оптимизаций общедоступным в рамках Open-source версий.

А вы уже тестировали измененную производительность reduction buffer? Делитесь отзывами и обратной связью — будет полезно.

Комментарии (1)


  1. SerRozan
    19.11.2025 12:47

    Похоже, ребята наконец сделали штуку, которую не страшно отдавать в прод - вот это уже прогресс.