Микросервисная архитектура позволяет разрабатывать высоконагруженные, распределенные и гибкие приложения. Но цена разработки таких систем очень высока, и решая выше указанные проблемы, разработчики сталкиваются с другими проблемами, которых либо нет в монолитных приложениях, либо они не так сильно в них проявляются.
Основные проблемы микросервесных приложений:
Сложный обмен данными между сервисами.
Сбор и отдача данных из разных сервисов с агрегацией, фильтрацией, пагинацией и прочей обработкой.
Консистентность данных между сервисами.
Инвалидация кэша в bff-сервисах, при изменении данных в core-сервисах.
Интеграция с внешними системами в части обмена большого количества данных.
Решать выше перечисленные проблемы можно разными способами. В своей работе в компании Greensight в качестве senior backend developer при разработке заказных проектов на базе Open Source платформы Ensi, я с коллегами перепробовал множество решений.
Данная статья описывает практическое использование Kafka в микросервисных приложениях для решения этих проблем.
Кейс 0. Шина данных

Самый распространенный способ взаимодействия между сервисами в части передачи данных это REST API. Для того чтобы из сервиса A (например, PIM (Product Information Management System) – сервис хранения и обработки информации о товарах) передать информацию (например, данные по товару) в сервис B (например, сервис Offers – сервис формирования торговых предложений) и сервис C (например, сервис Feed – сервис для выгрузки и передачи данных в нужном формате и семантике для сторонних сервисов (рекламных систем, маркетплейсов)) сервису A нужно вызывать разные методы API этих сервисов.
Так, например, при создании товара в сервисе A (PIM) нужно вызвать методы POST /offers
в сервисе B (Offers), чтобы создать соответствующее товару торговое предложение, и POST /products
в сервисе C (Feed), чтобы сохранить в БД Feed информацию по товару для последующей выгрузки во внешние системы. При изменении данных по товару в сервисе A (PIM), чтобы изменения применить и в других сервисах, нужно вызвать метод PATCH /products/{id}
в сервисе C (Feed) (а до этого по хорошему нужно как-то узнать id товара в сервисе Feed, либо реализовать в сервисе Feed метод обновления данных по товару по id из сервиса PIM). При удалении товара в сервисе A (PIM) для удаления связанных с ним данных в других сервисах нужно вызвать методы DELETE /offers/{id}
в сервисе B (Offers) (тут тоже придется заморочиться с id оффера) и DELETE /products/{id}
в сервисе C (Feed) (и тут опять проблема с id товара в сервисе Feed).
Получается необходимо реализовать непростое взаимодействие между сервисами A, B и C для каждого случая, тем самым сделав сильную связанность между сервисами. А хорошее микросервисное приложение наоборот должно обладать слабой связанностью между сервисами, обеспечивая таким образом следующие преимущества:
Гибкость и масштабируемость. Простота добавления новых и удаление уже существующих сервисов без влияния на работу всей системы.
Надежность и отказоустойчивость всей системы. Исключение падения и деградации других сервисов системы по цепочке между собой в случае нестабильной работы одного из её сервисов.
Независимая разработка и развертывание сервисов. Каждый сервис может разрабатываться, тестироваться и развертываться своей командой, не мешая работе других команд.
Более простая поддержка и эволюция. Изменения в одном сервисе проще вносить, так как они не требуют переделки и тестирования всей системы.
Улучшенная читаемость и сопровождаемость. Код сервисов становится более понятным и удобным для поддержки, так как он не зависит от других сервисов.
В качестве решения данной проблемы, если сервису A не нужны результаты обработки данных от сервисов B и C (т.е. взаимодействие между сервисами A, B и C может быть асинхронным), можно использовать Kafka как шину данных, через которую и будет происходить все взаимодействие.
Для этого в Kafka для каждой сущности создается топик (далее топик сущности), в который записываются все события создания, изменения и удаления экземпляров сущности. Структура сообщения топика может быть следующая:
{
"dirty": [
"field_1",
"field_2"
],
"original": {
"field_1": "field_1_value",
"field_2": "field_2_value"
},
"attributes": {
"id": 141504,
"field_1": "field_1_value",
"field_2": "field_2_value",
"field_3": "field_3_value",
...,
"field_N": "field_N_value"
},
"event": "create|update|delete"
}
Описание полей сообщения:
dirty
– массив полей сущности, которые изменились; пустой при создании и удалении сущности;original
– массив значений полей сущности, которые изменились; пустой при создании и удалении сущности;attributes
– массив всех полей сущности;-
event
– событие сущности:create
– создание;update
– изменение;delete
– удаление.
Все события, связанные с сущностью, регистрируются в топике Kafka сервисом-источником: в нашем случае это сервис A (PIM). Остальные сервисы (сервисы B (Offers) и С (Feed)), которым нужна информация о событиях сущности подписываются на сообщения из этого топика, и по своему усмотрению обрабатывают их. Сервис A ничего не знает от том, какие сервисы обрабатывают события из Kafka, не знает их внутреннее устройство и т.д. Таким образом обеспечивается слабая связанность между сервисами и решается проблема передачи данных между ними. В терминах паттернов программирования в данном случае реализуется паттерн Наблюдатель.

Все последующие кейсы по-факту являются частными случаями использования Kafka как шины данных. Но они выделены в отдельные кейсы в разрезе проблем, которые они решают.
Кейс 1. Денормализация данных
Одним из преимуществ микросервисных приложений является распределенное хранение данных между сервисами. Каждый сервисы отвечает за хранение данных из своей независимой предметной области. Отдача этих сгруппированных данных по одной предметной области очень хорошо ложится на структуру разделов административной панели системы, где 1 раздел ~ 1 сервис, и на одной странице админки обычно управляются данные по одной сущности этого сервиса.
Но это же преимущество с другой стороны является и проблемой. Для пользовательской части приложения (например, витрины интернет-магазина или маркетплейса) или для BI-систем необходимо на одной странице отдавать данные из разных предметных областей, а соответственно и сервисов. Задача усложняется, если требуется предварительная обработка этих данных в совокупности.
Например, приложение состоит из следующих сервисов:
PIM (Product Information Management System) – core-сервис хранения и обработки информации о товарах;
Offers – core-сервис формирования торговых предложений;
BU (Business Units) – core-сервис для хранения и управления информацией о складах и организациях;
CMS (Content Management System) – сервис для централизованного управления информационным контентом, SEO-оптимизации, управления меню и навигацией;
Feed – сервис для выгрузки и передачи данных в нужном формате и семантике для сторонних сервисов (рекламных систем, маркетплейсов);
Catalog Cache – об этом сервисе будет рассказано чуть позже;
Customer API Gateway – bff-сервис для витрины интернет-магазина;
Customer GUI Frontend – фронтенд-сервис витрины.

В этом приложении необходимо решить следующие задачи:
Вывести каталог товаров на витрину.
Сгенерировать sitemap с данными по товарам для поисковых систем.
Сформировать фиды для отдачи данных по товарам на маркетплейсы.
Каталог товаров – это набор торговых предложений (товар + бренд + категория + цена + сток + магазин/склад), которые для простоты дальше мы будем называть просто товарами. На витрину, в sitemap и фиды должны попадать только активные товары, у которых есть сток и цена. Также на витрине покупатель может отфильтровать товары по категории и бренду, и данные по товарам выводятся не все сразу, а постранично. В sitemap и фиды же необходимо отдать сразу все данные по товарам.
В классической схеме для решение поставленных задач сервисам Customer API Gateway, CMS и Feed пришлось бы отправить следующую цепочку запросов и выполнить определенный набор действий:
Из Offers получить все офферы, у которых есть стоки и цены.
Из PIM для полученных офферов получить данные по активным товарам, их категориям и брендам. Если каталог товаров выводится на витрине, и покупатель хочет получить товары только определённой категории или/и бренда, то необходимо офильтровать товары по ним.
Из BU по id найденных магазинов запросить подробную информация по ним (название, адрес и т.д.).
Если каталог выводится на витрине, то необходимо отдать только необходимое количество офферов для текущей страницы.
В случае каталога с сотнями тысяч или миллионами товаров, задача становится еще сложнее: реализовать фильтрацию данных по данным из разных сервисов без денормализации данных невозможно. Для хранения таких денормализованных данных по каталогу товаров и нужен сервис Catalog Cache. По-факту это своего рода монолит, в котором хранятся заранее собранные и подготовленные данные из core-сервисов. И Customer API Gateway вместо отправки нескольких запросов в core-сервисы Offers, PIM и BU, отправляется 1 запрос в сервис Catalog Cache. Но как же наполнить и поддерживать БД сервиса Catalog Cache в актуальном состоянии?
Для первичного наполнения БД Catalog Cache данными из БД core-сервисов в сервисе Catalog Cache реализуется миграции: под капотом они просто собирают все необходимые данные из core-сервисов по REST API. Для последующего поддержания данных БД Catalog Cache в актуальном состоянии, на помощь приходит Kafka. По кейсу 0 Kafka используется как шина данных, и с помощью неё новые, измененные и удаленные данные отражаются в БД Catalog Cache.
Аналогично через первичную миграцию и Kafka нужные данные из core-сервисов попадают с сервисы CMS и Feed. А затем дальше используются для генерации sitemap и фидов.
Кейс 2. Консистентность данных
Проблемы консистентности данных очень важна при разработке приложений. Консистентность данных (или согласованность данных) – это свойство данных, которое означает, что данные во всех системах и в разные моменты времени остаются непротиворечивыми и согласованными друг с другом. Другими словами, данные должны быть актуальными, точными и соответствовать друг другу, чтобы избежать ошибок и непредсказуемого поведения. Более подробно, консистентность данных включает в себя:
Целостность данных. Данные не содержат ошибок и неполноты.
Непротиворечивость данных. Данные во всех системах и в разные моменты времени соответствуют и не противоречат друг другу.
Актуальность данных. Данные отражают текущее состояние системы и не являются устаревшими.
При разработке монолитных приложений проблема консистентности данных решается на уровне БД с помощью первичных и внешних ключей, уникальных индексов и constraints. У микросервисных приложений дела обстоят намного сложнее: у каждого сервиса своя БД, причем БД могут быть разными по СУБД, а также абсолютно разными по типу (реляционная, иерархическая, колоночная и т.д.). Поэтому переложить все ответственность за консистентность данных на БД не получится.
Самым простым способом обеспечение консистентности данных в микросервисных приложениях является REST API. Когда в сервисе-источнике, например PIM, создается, изменяется или удаляется сущность (например, товар), из этого сервиса вызывается нужные метод в другом сервисе, например, Offers. И в зависимости от события создается изменяется или удаляется связанная с ним сущность (например, торговое предложение). Но при таком подходе обеспечение консистентности данных в разных сервисах становится очень трудозатратной, а также опять нарушается главный принцип слабой связанности между сервисами.
Использование Kafka позволяет решить и проблему консистентности данных. Через топики сущностей все изменения по данным из core-сервисов разливаются по сервисам-потребителям, список которых не известен сервису-источнику. И каждый сервис-потребитель сам решает каким образом обеспечить консистентность данных внутри себя.

Кейс 3. Инвалидация кэша
Для реализации высоконагруженных приложений используются разные подходы и технологии. Одним из таких подходов является кэширование данных. Кэширование данных – это процесс сохранения копий данных в быстром хранилище, называемом кэшем, для обеспечения более быстрого доступа к ним в будущем. Кэш находится ближе к месту использования данных, чем их исходное хранилище, что сокращает время доступа.
Вместо того, чтобы каждый раз получать данные из исходного источника (например, базы данных или удаленного сервера), приложение или система обращается к кэшу. Если нужные данные есть в кэше (попадание кэша), они возвращаются немедленно, что значительно быстрее, чем получение из исходного источника. Если данных в кэше нет (промах кэша), они загружаются из исходного источника и сохраняются в кэше для последующего использования.
Но кэш не панацея, и как любое решение помимо достоинств имеет и недостатки. Главным недостатком кэша является его актуальность: если данные изменились в сервисе-источнике, то кэш должен быть сброшен (инвалидирован). Самая простая реализация сброса кэша это установка TTL (Time to Life, время жизни) кэша: по истечению указанного времени кэш удаляется, и создается заново с актуальными данными. Но не всегда решение с TTL может подойти бизнесу: для важных данных, например, остатки или цены для товара, сброс кэша должен происходить моментально.
Опять самым простым, но костыльным, способом для сброса кэша, является использование REST API. Так, например, есть сервис CMS с данными по баннерам, контентным страницам, меню, акциями и т.д. Эти данные через Customer API Gateway отдаются на витрину, предварительно сохраняясь в кэш, и извлекаясь оттуда при повторном запросе. Если исходные данные меняется в сервисе CMS, он может отправить запрос по api в сервис Customer API Gateway на endpoint по сбросу кэша. Но таким образом опять же усложняется разработка обоих сервисов и нарушается принцип слабой связанности.
Задачу инвалидации кэша тоже можно решить через Kafka. Либо как в случае кейсов 1 и 2 использовать топики сущностей. Либо сделать отдельный единый топик content-cache с хранением информации о необходимости сброса кэша для всех необходимых сущностей. Подход с единым топиком content-cache хорош тем, что сервису-потребителю не нужно слушать множество топиков, а каждый слушатель в kuber это отдельный под, который потребляет ресурсы сервера. Да и для сброса кэша сервису не нужны все данные о сущности: достаточно типа сущности, ее id и события, которое с ней произошло (создание, изменение или удалением), чтобы сбросить только нужные ключи кэша для сущности. Обычно для сущности кэшируются все ее постранички, а также все её деталки (для них и нужен id сущности).

Кейс 4. Интеграция с внешними системами
Еще одной задачей при разработке микросервисных, да и монолитных приложений тоже, является интеграция с внешними системами. Обычно она решается как и все остальные кейсы с помощью REST API. Реже, но все равно встречаются способы интеграции с помощью файлов или общей БД (ну это совсем редкий и извращенный кейс).
Задача осложняется, когда объем данных для обмена очень большой. Необходимо обеспечить следующие аспекты интеграции:
Быстрота получения/отдачи данных. Время получение и отдачи данных должно быть приемлемым для обеих систем.
Масштабируемость объема данных. Способность обрабатывать возрастающие объемы данных и нагрузки.
Стабильность и гарантированность получения/отдачи данных. Обеспечение непрерывной работы системы, минимизация рисков сбоев, а также гарантия передачи данных от источника к приемнику.
Исключить деградацию нашей системы при деградации внешней системы. Тормоза, недоступность и прочие проблемы, которые могут возникнуть на стороне внешней системы, не должны отражаться на доступности и работоспособности нашей системы.
Безопасность данных. Защита данных от несанкционированного доступа.
Все указанные аспекты можно успешно решить с помощью REST API. Для этого для взаимодействия нашего приложения с внешней системой создается сервис-коннектор, который предоставляет api для внешней системе, либо сам обращается к внешней системе. Также на сервис-коннектор ложится ответственность за преобразование отдаваемых или получаемых данных в/из внешней системы, вопросы безопасности данных (например, аутентификация входящих запросов от внешней системы). Также вопросы диспетчеризация данных находятся в зоне ответственности сервиса-коннектора. И вот тут для взаимодействия с core-сервисами может либо также использоваться REST API, либо Kafka.
В случае, когда нужно отдать данные во внешнюю систему, можно обойтись только использованием REST API между core-сервисами и сервисом-коннектором. А вот в случае получения данных из внешней системы, для того чтобы не нагружать api core-сервисов запросами на создание/обновление/удаление данных, лучше использовать Kafka. Подход с использованием Kafka позволит не отбирать доступные воркеры сервиса, которые также используются для обработки пользовательских запросов. Также передача данных через Kafka повышает стабильность и гарантированность получения данных от внешних систем: данные копятся в топике(ах) Kafka и постепенно обрабатываются слушателями core-сервисов. Если core-сервис по какой-то причине не смог обработать сообщение из топика, он сделает это повторно необходимое количество раз.

Заключение
Kafka очень гибкий и мощный инструмент при разработке приложений, особенно микросервисных. С помощью неё можно решить множество фундаментальных проблем:
Сделать разработку проще, обеспечив слабую связанность между сервисами, но в то же время сохранив консистентность данных между ними.
Решить проблему денормализации данных, тем самым увеличив скорость работы приложения.
Обеспечить валидность кэша, решив проблему актуальности данных.
Реализовать стабильный и гарантированный обмен данными с внешними системами.
bereza_evgenij Автор
Друзья, это моя первая проба пера на этом сайте. Совсем забыл в заключении написать, что если вы знаете еще какие-то интересные и полезные кейсы по применению Kafka в микросервисной архитектуре, то поделитесь, пожалуйста, своим опытом :)