Привет, Хабр!
В этой статье хочу поделиться нашим опытом интеграции с Kafka.
В МегаФоне несколько десятков сервисов являются потребителями данных, публикуемых в кластерах Kafka. Все они разрабатывались под узкоспециализированные задачи.
В какой-то момент в нашем КХД также появилась необходимость интеграции с Kafka.
При разработке первой интеграции мы пошли традиционным путем и использовали Kafka Connect для Confluent 6.0.1. Сообщения, читаемые коннектором, перекладывались в Hadoop. Далее в PySpark выполнялся парсинг нужных данных, и полученные пачки выгружались в Oracle Exadata.
Но на этапе опытно-промышленной эксплуатации у нас возникли проблемы с производительностью из-за большого объема читаемых данных: ~100-110 млн сообщений в час (поток со звонками абонентов). Также было требование от бизнеса - данные в конечной витрине должны появляться с задержкой не более часа. Оптимизация интеграции затянулась еще на пару месяцев.
В итоге решение, которое мы внедрили в пром, не в полной мере устроило нас. Сложная реализация подразумевала необходимость привлекать на его дальнейшую доработку дефицитных экспертов.
Тем временем, перед нами встала задача разработки еще нескольких интеграций с Kafka.
Было очевидно, что требуется какое-то решение, которое не только ускоряло бы внедрение, исключая рутинную разработку, но и позволяло реализовать стандартную для таких интеграций батчевую выгрузку считанных сообщений в разные БД (Oracle/Hive/ClickHouse и в перспективе в Greenplum). И кроме того, умело выполнять предварительную обработку данных на лету (парсинг и трансформацию значений заданных атрибутов).
Мы обратились за помощью к разработчикам из смежного подразделения с соответствующей экспертизой и опытом разработки высоконагруженных сервисов. Чуть более чем через месяц в репозиторий был выложен готовый образ коллектора, соответствующего нашим требованиям. А по ходу дальнейшего тестирования функциональность его была значительно расширена.
Ниже подробно описываю реализованное в итоге решение и путь, который мы прошли при его тестировании и внедрении.
Основные возможности коллектора Kafkareader

Главная задача коллектора - возможность чтения топиков Kafka, парсинг и преобразование вычитанных сообщений с помощью встроенных функций или подключаемых пользовательских функций на java, формирование и выгрузка пачек данных в формате AVRO/JSON/CSV/TSV в Oracle/Greenplum/ClickHouse/Hive.
Также он позволяет:
Выполнять запись сообщений в локальные файлы (для исключения потерь данных в случае ошибок приёмника) и далее запись в таблицы Oracle/Greenplum/ClickHouse/Hive из локальных файлов.
Читать пароли (для подключений к БД) из PAM.
Выполнять логирование процесса вставки данных в БД (логирование оффсетов).
Настраивать мониторинг посредством отправки уведомлений с детализацией ошибок сервиса.
Настраивать мониторинг через Endpoint Heartbeat.
Настраивать Endpoint управления оффсетами.
Выполнять резервирование/георезервирование сервиса.
Коллектор работает в два этапа.
На первом этапе он вычитывает сообщения из топиков и выполняет парсинг данных согласно предварительно заданным правилам.
На втором этапе идет вставка пачки данных в БД.
Опционально перед вставкой в БД возможна промежуточная загрузка распарсенных данных локально в файлы. В этом случае коллектор сканирует содержимое каталога, объединяет файлы в батчи, делит на потоки и отправляет в БД.
Поточность в сохранении файлов регулируется числом консьюмеров, размеры файлов - настройками консьюмеров. Поточность вставки в БД регулируется настройками коллектора. Размерами вставляемых батчей также можно управлять. Commit в Kafka происходит только после сохранения данных в файл.
Удаление файла происходит только после успешного commit в БД в основную таблицу и в таблицу логирования (вставок). Запись в таблицу с данными и таблицу лога происходит в одной транзакции.
В случае ошибок вставки файлы помечаются и обрабатываются отдельной очередью (как правило, очередью меньшего размера, чтобы не влиять на основной поток). Т.е. в случае недоступности БД файлы будут пытаться обработаться отдельной очередью, пока БД не восстановится.
В случае прямой вставки сообщений в БД (без сохранения в файл) объединять получаемые консьюмером данные в пачки нельзя. Commit в Kafka происходит после успешной вставки батча в БД.
Коллектор не использует автокоммиты.
Особенности работы с топиками. Партиции и резервирование
Основным параметром, влияющим на производительность коллектора, является количество консьюмеров.
Рекомендуется создавать консьюмеры по числу партиций топика. Это обеспечивает максимальную скорость чтения данных из Kafka. При большем числе избыточные консьюмеры будут простаивать как резерв.

На схеме выше показан пример настройки контейнеров коллектора (Consumer Group A), развернутых на трех серверах.
Каждый контейнер имеет свой конфиг.
Конфиги контейнеров Kafkareader 1, Kafkareader 2 и Kafkareader 3 настроены идентично на чтение консьюмерами топиков Topic 1 и Topic 2 без указания партиций. Количество консьюмеров в каждом контейнере соответствует 6 партициям топика Topic 1 и 3 партициям топика Topic 2.
Стрелками показан один из возможных вариантов распределения брокером Kafka партиций между консьюмерами:
Topic 1. Partition 1 –> Kafkareader 1. Consumer 1
Topic 1. Partition 2 –> Kafkareader 1. Consumer 2
Topic 1. Partition 3 –> Kafkareader 1. Consumer 3
Topic 1. Partition 4 –> Kafkareader 1. Consumer 4
Topic 1. Partition 5 –> Kafkareader 2. Consumer 1
Topic 1. Partition 6 –> Kafkareader 2. Consumer 2
Topic 2. Partition 1 –> Kafkareader 1. Consumer 7
Topic 2. Partition 2 –> Kafkareader 2. Consumer 7
Topic 2. Partition 3 –> Kafkareader 3. Consumer 7
Консьюмеры ниже будут простаивать как резерв:
Kafkareader 1. Consumer 5
Kafkareader 1. Consumer 6
Kafkareader 1. Consumer 8
Kafkareader 1. Consumer 9
Kafkareader 2. Consumer 3
Kafkareader 2. Consumer 4
Kafkareader 2. Consumer 5
Kafkareader 2. Consumer 6
Kafkareader 2. Consumer 8
Kafkareader 2. Consumer 9
Kafkareader 3. Consumer 1
Kafkareader 3. Consumer 2
Kafkareader 3. Consumer 3
Kafkareader 3. Consumer 4
Kafkareader 3. Consumer 5
Kafkareader 3. Consumer 6
Kafkareader 3. Consumer 8
Kafkareader 3. Consumer 9
Также на схеме показано чтение консьюмерами группы Consumer Group B партиций топика Topic 2. Эти консьюмеры создаются другим потребителем.
Возможные варианты настройки поточности:
Один консьюмер на одну партицию без указания партиции. Таким образом будет обеспечиваться максимальная поточность в чтении. Можно указывать и меньшее число потоков - Kafka будет присваивать обработку нескольких партиций потоку. Можно указывать и большее число потоков - часть из них будет резервными.
Один консьюмер на одну партицию с указанием партиции, т.е. перечисление всех партиций топика. В этом случае назначением партиций Kafka не занимается. Доступна установка офсетов.
При нехватке ресурсов коллектора можно запускать несколько контейнеров на разных серверах.
При нехватке ресурсов БД также можно запускать чтение нескольких партиций в одном консьюмере.
Чтение всех партиций топика одним консьюмером в процессе. Коллектор работает в один поток, нагрузка на БД минимальна, вставка долгая. Можно использовать для теста, отладки маппингов.
Для каждой новой интеграции поднимается свой контейнер коллектора и там прописываются свои настройки. Для резервирования сервиса коллектор можно поднять на резервном сервере с идентичными настройками. Рекомендуется, чтобы консьюмеры создавались без указания конкретных партиций и балансировкой занималась Kafka.
Оффсеты
Указать оффсеты можно, только если консьюмер получает заранее определенные партиции при конфигурации коллектора. Если партиции назначаются балансировкой, механизм установки оффсетов не работает.
Основные параметры коллектора
В конфиге коллектора задаются параметры инициализации коллектора, настройки очередей и настройки записи в БД.
Кроме общих параметров можно также задавать следующие:
Количество одновременно обрабатываемых батчей при вставке в БД. Лучше делать его равным количеству партиций или количеству одновременных соединений к БД.
Количество одновременных повторно обрабатываемых батчей.
Время, которое будет выжидаться перед созданием нового консьюмера, если старый закрыт из-за ошибки.
Максимальное количество сообщений в пачке, которую получит консьюмер.
Минимальное количество файлов в пачке для вставки в БД.
Таймаут (в сек), после которого начинается обработка следующей группы файлов (эта и предыдущая настройки позволяют накапливать достаточно большой объем данных, большие батчи важны для производительности некоторых БД, например, ClickHouse).
Таймаут (в сек), после которого выполняется попытка вставки записи в БД, если предыдущая вставка завершается ошибкой.
Настройки очередей и пачек данных, вставляемых в БД
Число консьюмеров, т.е. поточность (скорость) чтения из Kafka. В идеале число консьюмеров равно числу партиций в топике.
Размер первичных файлов. Параметр определяет максимальное число сообщений, которые консьюмер получит за раз и которые будут сохранены в файл. Фактически консьюмер будет получать меньшее число сообщений, если текущий поток небольшой. Обычно параметр начинает работать только после простоя чтения, когда разбирается накопленная очередь. Рекомендуется устанавливать параметр примерно в 2-3 раза больше числа сообщений, получаемых консьюмером при нормальной работе (нужно определить опытным путём).
-
Размер вставляемой пачки в БД. Часто Kafka отдаёт сообщения более мелкими порциями, чем необходимо для вставки в БД. Для Oracle рекомендуемый размер батча 500-3000 строк, для CickHouse 100000-500000. Размер батча (пачки) в меньшей степени влияет на Oracle, но очень важен для ClickHouse. Размеры батчей влияют на потребляемую коллектором память. При нехватке ресурсов коллектора можно запускать несколько коллекторов на разных машинах и делить консьюмеры между ними.
Максимальный размер пачки в режиме разбора очереди = максимальное количество сообщений в пачке, которую получит консьюмер * минимальное количество файлов в пачке для вставки в БД. При нормальной работе значение параметра не достигается, и чтобы накопить достаточное количество файлов для вставки, коллектор попытается достичь минимального количества файлов в пачке (склеит в одну пачку сообщения из указанного количества файлов) или будет ждать указанное в таймауте количество сек и потом склеит все файлы какие есть.
Если опция записи в файлы отключена, то максимальный размер вставляемой пачки в БД = максимальному количеству сообщений в пачке, которую получит консьюмер.
Поточность вставки – сколько одновременно склеенных из нескольких файлов больших пачек будет уходить в БД. Параметр сильно влияет на нагрузку БД. Для CkickHouse рекомендуется делать равным количеству шардов, для остальных БД устанавливать опытным путём (смотрим, сколько необходимо для разбора потока при нормальной работе, и умножаем на 2 или 3, чтобы была возможность разбирать скопившиеся после простоя очереди).
-
Размер очереди вставки. Если количество одновременно обрабатываемых батчей при вставке в БД определяет число активных потоков в БД, то данный параметр - это число подготовленных пачек в очереди. Пока данные вставляются в БД, коллектор будет готовить к вставке новые пачки, чтобы сократить время между вставками. Подготовленные пачки занимают место в памяти коллектора - обычно рекомендуется делать параметр равным количеству одновременно обрабатываемых батчей при вставке в БД.
Для обработки ошибок существует отдельная очередь. У нее также есть количество активных потоков вставки (рекомендуется ставить вдвое меньшим чем количество одновременно обрабатываемых батчей при вставке в БД, чтобы не влиять на основной поток).
Если файлы сохраняются на диск, но не успевают вставляться в БД и копятся, то нужно проверять размер вставляемой пачки и количество потоков вставки в БД.
Если растет лаг в Kafka, то нужно увеличивать число консьюмеров.
Параметры парсинга сообщений
Подразумевают собой параметры маппинга ключей сообщений на столбцы таблиц БД. Одной строке из маппинга в конфиге соответствует один столбец в таблице БД.
Значения, записываемые в столбцы таблицы, могут быть как вычисляемыми, так и извлекаемыми из JSON/AVRO/.. .
Коллектор умеет также приводить значения к указанному типу JDBC перед отправкой в БД.
Для списков объектов коллектор позволяет накапливать значения и сохранять строкой через запятую в случае Oracle или типизированным массивом в случае ClickHouse. Для Oracle также доступна сложная обработка через JSON_TABLE.
Также коллектор позволяет делать сложные преобразования нескольких полей через ссылки.
Метрики, мониторинг
Для мониторинга состояния сервера используется endpoint heartbeat. Для контроля доступности сервиса heartbeat используется Blackbox exporter.
Если прописать в конфиге настройки почтового сервера, то можно получать уведомления об ошибках из файла лога.
Также можно настроить мониторинг по таблице логов по одному из полей батча и alert на отставание от реального времени. При этом каждую партицию топика лучше мониторить отдельно.
Можно настроить мониторинг лагов в Grafana через Kafka Exporter.
В коллекторе также есть возможность получать основные значимые метрики для настройки алертов (количество файлов в обработке, нет файлов в течение N минут, нет свежих файлов моложе N минут, файлов скопилось больше N и т.д.). Дополнительно можно включить метрику, возвращающую максимальный лаг из всех партиций в отдельности (коллектор читает лаги из всех партиций топика и возвращает один самый большой).
Коллектор умеет отдавать нужные метрики по http в текстовом формате, который поддерживается Prometheus. Если условия срабатывают, то Prometheus отправляет алерт в AlertManager, который смотрит в свои настройки и решает куда отправить уведомление - на почту, смс или и то и другое. В Prometheus также смотрит Графана и рисует графики для визуального контроля. Отправка sms происходит через sms-шлюз по протоколу smpp.
Проблемы, с которыми мы столкнулись при запуске коллектора
Частой проблемой в наших интеграциях с Kafka было расхождение между заявленной спецификацией формата сообщений и данными, которые реально приходили от поставщика.
Например, на этапе парсинга JSON мы заметили, что в сообщениях поставщик стал передавать список объектов, а не один объект: [{"…], как было согласовано ранее с владельцем данных. Поэтому на этапе внедрения пришлось дорабатывать коллектор – мы добавили в него расширенную диагностику, позволяющую распознавать такие проблемы.
Другой важной проблемой, которая влияла на производительность интеграции в целом, для нас стала постобработка данных на приемнике.
В Oracle через JSON_TABLE мы пытались реализовать сложную логику для извлечения и преобразования значений из необходимых нам атрибутов. Для этого нужно было передавать в БД большие куски JSON. Объем передаваемых данных сильно снижал производительность коллектора из-за роста потребления памяти сервера, а также влиял на скорость передачи трафика по сети.
В итоге разработчики коллектора добавили возможность предварительной обработки данных перед вставкой в БД с помощью подключаемых пользовательских функций на Java. И это сработало. На коллектор мы вынесли большую часть по трансформации данных, параллельно получив дополнительный бонус – снизили нагрузку на БД. Потенциально эта функциональность коллектора решает также проблему, при которой на приемнике в принципе отсутствует возможность реализации сложной логики для постобработки данных.
При старте коллектор компилирует файл .java, загружает полученные классы, и далее на функции этих классов можно ссылаться в конфиге в соответствующих маппингах.
Например, таким образом мы преобразовываем номера телефонов к нужному формату, вычисляем балансы, обрабатывая многомерные списки и т.д. Также можем передавать в функцию, например, массив элементов и взять из него только первый элемент.
Еще одной проблемой стала включенная локальная буферизация данных (предварительная запись в файлы).
Мы решили перенести на коллектор Kafkareader интеграцию, реализованную ранее на Kafka Connect (с объемами ~100-110 млн сообщений в час).
При нагрузочном тестировании выгрузка таких объемов данных в файлы (перед вставкой в БД) замедлила скорость чтения топиков. Проблема решилась отключением опции буферизации данных и развертыванием контейнеров коллектора на резервных серверах. В таких случаях также можно попытаться поиграться с настройкой количества консьюмеров. На каких-то топиках, где формируется меньше всего файлов и которые успевают разбираться коллектором, например, вместо 8 сделать по 4. И соответственно уменьшить количество активных потоков, пишущих в БД.
Все наши последующие интеграции с Kafka имели свои особенности, но в итоге мы решили нашу главную проблему, сократив сроки развертывания интеграций в проме, и привлекая на эти задачи дата инженеров без опыта работы в PySpark и Java.