
Вы слышали о Kafka, MQTT, S3, Iceberg, Trino, PostgreSQL, Redis и Flink? А насколько хорошо вы знаете эти технологии? По каждой из них написаны огромные книги («Kafka: The Definitive Guide», около 800 страниц), и каждый день выходят новые публикации про тонкости.
Эта статья про другое.
Вместо внутренностей движков и законов распределённых систем посмотрим на эти технологии как на кубики LEGO: какую роль каждая из них играет в архитектуре и как они стыкуются друг с другом. Это будет практический туториал: начнём с минимальной конфигурации и постепенно соберём сложную систему. Статью можно просто читать как обзор архитектуры, а можно запускать каждую конфигурацию и изучать её в деталях. Для этого достаточно Git, Git LFS и Docker Compose. Всё запускается в контейнерах. Даже примеры на Java собираются через Docker multi-stage build.
Итак, представим, что по всему земному шару распределены датчики, которые с некоторой периодичностью отправляют свои координаты и температуру. Будем собирать, хранить и анализировать эти показания в потоковом режиме.
Платформа: Kafka, Kafka Connect, Schema Registry, Kafka UI
Для туториала нужен конструктор https://github.com/IoTDataPlatform/constructor. Перед клонированием репозитория установите Git LFS.
git clone https://github.com/IoTDataPlatform/constructor.git cd constructor
Сейчас файлов немного:
tree ├── bricks │ ├── docker-compose.yml │ ├── Dockerfile │ ├── filter_tags.py │ └── fs.tar.gz └── README.md
В директории bricks лежат «кубики» системы и конструктор на Python. В архив fs.tar.gz упакованы исходники, некоторые Java-библиотеки из репозиториев Confluent и JAR-файлы коннекторов. Прошу прощения за размер архива, но так сборка примеров для туториала работает очень быстро. Да и вообще, некоторые JAR невозможно скачать напрямую из РФ из-за взаимных блокировок. Запускаем конструктор:
(cd bricks && docker compose run --rm --user "$(id -u):$(id -g)" lg)
Как работает конструктор
Код конструктора находится в bricks, включая его docker-compose.yml. Чтобы не прыгать между директориями, запускаем (cd bricks && ...). Так после выполнения снова окажемся в директории constructor. docker compose run --rm запускает одноразовый контейнер для сервиса lg (от слова LEGO). В контейнере запустится скрипт filter_tags.py, который прочитает параметры командной строки и добавит нужные для заданной конфигурации директории и файлы. А чтобы эти файлы принадлежали вам, задаём --user.
Можно сделать это через Makefile, но лично я люблю видеть, что запускаю.
Dockerfile и docker-compose.yml конструктора простые.
FROM python:3.12-alpine WORKDIR /app COPY filter_tags.py /app/filter_tags.py ADD fs.tar.gz /app/fs ENTRYPOINT ["python", "filter_tags.py"]
services: lg: build: context: . volumes: - ..:/work
Конструктор добавил новые файлы и директории — это и есть код системы:
tree ├── docker-compose.yml ├── kafka-connect │ ├── connectors-pack.tar.gz │ ├── Dockerfile │ ├── init │ │ └── init.sh │ └── sources
Центральный файл docker-compose.yml описывает запуск компонентов:
Kafka, распределённая потоковая платформа;
Kafka Connect, сервис переноса данных между Kafka и другими системами;
Confluent Schema Registry, реестр схем сообщений;
Kafka UI, веб-интерфейс для работы со всеми этими компонентами.
Назначение остальных файлов изучим по ходу дела. А пока запустим:
docker compose up -d --build
После старта всех сервисов по ссылке http://localhost:8080 откроется такой интерфейс Kafka UI:

Ingest: пуллинг через Kafka Connect

Сейчас в нашей системе нет данных. Будем это исправлять, добавляя компоненты. По зоне ответственности компоненты можно разделить на четыре группы. Ingest доставляет данные из внешнего мира в систему. Storage хранит их персистентно. Transform преобразует данные в удобные производные. А Serve делает их доступными для внешних систем.
Подтянем данные в нашу систему методом polling, как будто сама система периодически опрашивает датчики температуры. Реальных сенсоров у нас нет, поэтому будем генерировать синтетические показатели. Откатим, пересоберём конструктор и запустим систему:
docker compose down -v git clean -fd (cd bricks && docker compose run --rm --user "$(id -u):$(id -g)" lg --tags SOURCE) docker compose up -d --build
Немного деталей
docker compose down -v гасит все запущенные контейнеры и удаляет volumes, так новая конфигурация стартует без старых данных.
git clean -fd возвращает директорию constructor в состояние, как будто проект только что склонировали.
Третью команду уже разбирали, только сейчас добавился тег SOURCE, так мы сообщаем конструктору добавить в конфигурацию источник данных.
Четвёртой командой запускаем новую конфигурацию.
Сразу же идём на Kafka UI http://localhost:8080 и переходим в раздел Topics:

Думая о данных, мы обычно представляем таблицы и связи между ними. Таблицы отражают состояние объектов. В потоковых системах фокус смещается с состояния на события — моменты времени, когда что-то произошло или изменилось.
Так что же у нас вместо таблиц? В Kafka базовая абстракция хранения и обработки данных называется топиком. Через топики приложения обмениваются данными. Вместо строк или записей в классических базах данных топики содержат сообщения. Новые сообщения дописываются в конец лога и после этого уже не изменяются. Топики, названия которых начинаются с двух подчёркиваний, являются внутренними системными. Их мы изучать не будем. Сейчас нас интересует топик earth-temp. Заходим в него и раскрываем первое сообщение в списке:

Key — идентификатор датчика. Value выглядит как обычный JSON.
{ "sensorId": {"string": "zzsvd7rr3rdo2p8ybjwguvqex"}, "latitude": {"double": 16.17987628543722}, "longitude": {"double": 25.77224771279677}, "timestamp": {"long": "2026-04-12T03:35:27.035Z"}, "temperatureF": {"double": 63.6} }
Однако обратите внимание, что размер value всего лишь 66 байт. Но длина в символах явно больше! Это потому, что сообщение закодировано в Avro, эффективном бинарном формате. Kafka безразлично, что именно лежит внутри сообщения. Для неё что ключ, что значение всего лишь последовательность байтов, которые могут содержать JSON, CSV, HTML, бинарные данные и что угодно ещё. Именно взаимодействующие приложения должны договориться о формате (контракте). Но если это просто массив байтов, который даже не содержит названий полей, то как Kafka UI отображает сообщения в читаемом виде? Чтобы декодировать Avro, нужна схема, с помощью которой это сообщение было сериализовано, так называемая writer schema. А эта схема должна где-то храниться. Вспоминаем про Schema Registry и открываем его на UI:

Там лежит схема сообщений для топика earth-temp. Сообщения хранят schema ID, что позволяет приложениям правильно декодировать данные из массива байтов. Обратите внимание также на свойство Compatibility BACKWARD. Реестр не просто хранит схемы сообщений, но и проверяет их на совместимость. Сейчас в эту тему углубляться не будем.
Мы посмотрели на сообщения, посмотрели на их схему, но откуда сообщения появляются? Заходим в раздел Kafka Connect:

Приложения, которые пишут данные в Kafka, называются producer'ами. Сейчас таким продюсером является коннектор типа SOURCE. По легенде он опрашивает датчики и записывает их показания. По мере развития системы появятся и другие коннекторы, а через этот UI можно за ними наблюдать, менять конфигурацию и перезапускать:

Чтобы всё это заработало, конструктор добавил такие файлы:
tree ├── docker-compose.yml └── kafka-connect ├── connectors-pack.tar.gz ├── Dockerfile ├── init │ ├── earth-temp-connect-source.json │ └── init.sh └── sources └── temp-source-connector ├── build.gradle ├── settings.gradle └── src └── main ├── java │ └── dev │ └── miron │ └── connect │ └── temp │ ├── TempSourceConnector.java │ └── TempSourceTask.java └── resources └── META-INF └── services └── org.apache.kafka.connect.connector.Connector
Плагин для Kafka Connect, который генерирует температуру и координаты, реализован на Java в директории sources. Сборка происходит в Docker в multi-stage build:
FROM gradle:8.7-jdk17 AS builder WORKDIR /src COPY sources/temp-source-connector/ ./ RUN gradle --no-daemon clean jar FROM confluentinc/cp-kafka-connect-base:8.1.0 ADD connectors-pack.tar.gz /usr/share/confluent-hub-components/ COPY --from=builder /src/build/libs/temp-source-connector-1.0.0.jar /usr/share/confluent-hub-components/temp-source-connector/
Кроме этого коннектора во время сборки устанавливается и набор стандартных из архива connectors-pack.tar.gz. Их мы будем использовать в следующих разделах статьи. init — самая интересная директория, которая будет пополняться файлами по мере развития системы. Здесь собраны конфигурации всех коннекторов, а скрипт init.sh запускает их через REST API Kafka Connect:
#!/bin/sh set -eu for file in /init/*.json; do [ -e "$file" ] || exit 0 name="$(basename "$file" .json)" echo "Applying connector: $name" curl -fsS -X PUT \ "$CONNECT_URL/connectors/$name/config" \ -H "Content-Type: application/json" \ --data @"$file" echo done echo "Init done"
Сейчас здесь только одна конфигурация:
{ "connector.class": "dev.miron.connect.temp.TempSourceConnector", "tasks.max": "1", "topic": "earth-temp", "sensors": "5", "messages.per.sensor.per.second": "1" }
В этом файле задано, сколько событий генерировать и в какой топик записывать. При старте по земному шару распределяются случайным образом датчики, а потом с заданной периодичностью генерируются показания температуры с учётом удалённости от экватора и времени дня. В docker-compose.yml описан сервис kafka-connect и one-shot connect-init:
kafka-connect: build: context: kafka-connect image: my-cp-kafka-connect:8.1.0 container_name: kafka-connect hostname: kafka-connect connect-init: image: alpine/curl:latest depends_on: kafka-connect: condition: service_healthy volumes: - ./kafka-connect/init:/init:ro environment: CONNECT_URL: http://kafka-connect:8083 entrypoint: ["/bin/sh", "/init/init.sh"] restart: "no"
Storage: реляционная база данных PostgreSQL

Kafka сама может хранить данные с настраиваемым периодом хранения. Но для аналитических запросов она подходит плохо. Добавим PostgreSQL:
docker compose down -v git clean -fd (cd bricks && docker compose run --rm --user "$(id -u):$(id -g)" lg --tags SOURCE,POSTGRES) docker compose up -d --build
Список коннекторов http://localhost:8080/ui/clusters/local/kafka-connect/connectors?fts=false:

Появился коннектор типа SINK. Его задача — подхватывать сообщения из топика earth-temp и отправлять в базу данных на постоянное хранение. Ещё одно важное изменение можно увидеть на UI в разделе Consumers http://localhost:8080/ui/clusters/local/consumer-groups/connect-earth-temp-postgres-sink:

Consumer — приложение, которое читает сообщения из топиков. Топик в Kafka — это именно лог, а не очередь. Когда consumer прочитывает сообщение, оно из-за этого не удаляется. Другой consumer сможет прочитать то же сообщение ещё раз позднее. Consumer group — это один или несколько процессов, которые делят между собой обработку сообщений в топике. На один топик может существовать несколько таких consumer group, которые выполняют разные задачи. Kafka хранит позицию чтения — offset. Поэтому если какой-нибудь читающий процесс упадёт, после перезагрузки он сможет продолжить с первого необработанного сообщения. Offset фиксируется в результате commit, когда consumer записывает свой прогресс. На странице выше можно отслеживать отставание (Consumer Lag) от конца топика.
Теперь структура такая:
tree ├── kafka-connect │ ├── connectors-pack.tar.gz │ ├── Dockerfile │ ├── init │ │ ├── earth-temp-connect-source.json │ │ ├── earth-temp-postgres-sink.json │ │ └── init.sh ├── postgres │ └── init │ └── init.sql
В конфигурации коннектора задано, из какого топика и в какую таблицу какой базы писать:
{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "earth-temp", "connection.url": "jdbc:postgresql://postgres:5432/weather", "connection.user": "postgres", "connection.password": "postgres", "table.name.format": "earth_temp", "insert.mode": "insert", "auto.create": "false", "auto.evolve": "false", "quote.sql.identifiers": "always" }
Скрипт создания таблицы:
create table if not exists earth_temp ( id bigserial primary key, "sensorId" text, "latitude" double precision, "longitude" double precision, "timestamp" timestamptz, "temperatureF" double precision, "created_at" timestamptz not null default NOW() );
И сервис в docker-compose.yml:
postgres: image: postgres:16-alpine container_name: postgres hostname: postgres environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: weather ports: - "5432:5432" volumes: - postgres-data:/var/lib/postgresql/data - ./postgres/init:/docker-entrypoint-initdb.d:ro
PostgreSQL здесь играет не только роль Storage, но и роль Serve. Приложения могут запрашивать данные через SQL. Выполним запрос через DataGrip:


Storage: Lakehouse на S3 + Iceberg

Есть и другой распространённый подход к хранению аналитических данных: Data Lake, или озеро данных. В объектном хранилище типа S3 лежат файлы произвольного формата, например, JSON, Parquet, CSV, Avro. А могут быть и вовсе неструктурированными. Такой подход позволяет хранить огромный объём данных. Он также позволяет отделить хранение от обработки. Однако, без контроля Data Lake легко превращается в Data Swamp, или болото данных. Поэтому часто вводят дополнительный уровень со структурой хранения и схемами. Такой подход называется Lakehouse и представляет собой нечто среднее между Data Lake и Data Warehouse. Данные по-прежнему лежат в объектном хранилище, но уже в упорядоченном виде. Вычисления отделены от хранения. Apache Iceberg — открытый табличный формат для такого подхода. Заменяем PostgreSQL на Lakehouse:
docker compose down -v git clean -fd (cd bricks && docker compose run --rm --user "$(id -u):$(id -g)" lg --tags SOURCE,ICEBERG) docker compose up -d --build
Коннектор теперь другой http://localhost:8080/ui/clusters/local/kafka-connect/connectors?fts=false:

В системе появился сервис MinIO с интерфейсом http://localhost:9001/login:

Это S3-совместимое хранилище, и визуально оно похоже на файловую систему. Объекты хранятся в бакетах — условно логических дисках:

Спускаемся earth/temp_raw/data/timestamp_day=2026-04-12/ и видим Parquet-файлы:

Колоночный формат Parquet хорошо подходит для аналитических данных. Можно хранить широкие денормализированные таблицы, а потом быстро считывать только нужные для аналитического запроса колонки. Такая оптимизация называется projection. Формат значительно отличается от B-деревьев, которые используют в реляционных базах данных, которые лучше подходят для транзакционной нагрузки.
Файлы лежат в директории timestamp_day=2026-04-12. Формально это не совсем корректно. Хранилища типа S3 хранят данные как пары ключ-значение. Корректнее говорить не о пути к файлу, а о префиксе ключа объекта. Не будем жертвовать понятностью ради формальной точности. Можно сказать, что данные партиционированы по дате события. Так, если нам нужна аналитика за определённый период, мы быстро выберем нужные данные. Такая оптимизация называется partition pruning.

По пути к данным мы пропустили "директорию" metadata. Именно метаданные делают Iceberg табличным форматом. Зачем вообще понадобился этот слой? Мы могли бы просто хранить Parquet-файлы в бакетах. Можно было бы дописывать данные в эти файлы или просто добавлять новые. Но если задуматься над тем, как организовать удаление записей, частичные обновления, что делать во время поломки сети с незавершёнными записями, то всё становится сложнее. А если учесть, что нам надо поддержать конкурентную запись разными системами, то необходимость табличного слоя становится очевидной. Iceberg определяет то, как множество файлов объединяется в одну логическую таблицу.

Serve: федеративный движок запросов Trino

Для работы с данными и метаданными Iceberg есть специальные библиотеки. Но не будем же мы писать низкоуровневый код на Java для каждого аналитического запроса? С PostgreSQL было явно удобнее. А если запросы окажутся тяжёлыми? Нам придётся писать код для распределённого исполнения? К счастью, для Lakehouse уже есть распределённые SQL-движки. Помните, мы говорили про разделение storage и compute. В нашей конфигурации Trino и будет compute-частью этого разделения. Он федеративный, потому что может выполнять запросы не только к Iceberg, но и ко многим другим системам хранения, даже в рамках одного запроса.
docker compose down -v git clean -fd (cd bricks && docker compose run --rm --user "$(id -u):$(id -g)" lg --tags SOURCE,ICEBERG,TRINO) docker compose up -d --build
Подключимся к нему из DataGrip и выполним запрос:


В итоге схема такая: данные лежат в объектном хранилище в формате Parquet, Iceberg держит над ними метаданные табличного слоя, Trino читает таблицу, а снаружи всё выглядит как обычный SQL.
Код системы организован сейчас так:
tree ├── docker-compose.yml ├── kafka-connect │ ├── connectors-pack.tar.gz │ ├── Dockerfile │ ├── init │ │ ├── earth-temp-connect-source.json │ │ ├── earth-temp-iceberg-sink.json │ │ └── init.sh │ └── sources │ └── temp-source-connector │ ├── ... └── trino └── catalog └── iceberg.properties
Ничего особо нового. Всё те же коннекторы и настройки Trino.
Transform: обогащение сообщений с помощью Kafka Streams

До этого момента мы просто передавали данные между системами. В потоковой обработке (stream processing) сообщения обрабатываются по мере их поступления. Накопления, как в batch processing, здесь не происходит. Потоковую обработку можно разделить на два вида: без состояния (stateless) и с состоянием (stateful).
В нашем примере для аналитики полезно иметь в сообщениях температуру в градусах Цельсия, часовой пояс, локальное время и индекс H3. Для такого обогащения явно не нужно никакое состояние, так как каждое событие не зависит от других. Это трансформация типа map. Реализовать её можно несколькими способами. Например, можно написать программу на Java и через низкоуровневый Kafka API: в цикле вычитывать сообщения из топика, формировать новые расширенные сообщения и отправлять их в другой топик. Но удобнее взять более высокоуровневую библиотеку Kafka Streams. С её помощью такие программы можно писать в относительно декларативном стиле.
docker compose down -v git clean -fd (cd bricks && docker compose run --rm --user "$(id -u):$(id -g)" lg --tags SOURCE,ICEBERG,TRINO,STREAMS)
Появилась директория с кодом:
tree ├── streams │ ├── Dockerfile │ ├── repo-confluent.tar.gz │ ├── sources │ │ └── temp-enricher │ │ ├── build.gradle │ │ ├── settings.gradle │ │ └── src │ │ └── main │ │ └── java │ │ └── dev │ │ └── miron │ │ └── streams │ │ └── temp │ │ └── TempEnricherApp.java │ └── temp-enricher.properties
Отрывок кода в относительно декларативном стиле:
StreamsBuilder builder = new StreamsBuilder(); builder.stream(inputTopic, Consumed.with(keySerde, valueSerde)) .mapValues(value -> enrich(value, h3Resolution)) .to(outputTopic, Produced.with(keySerde, valueSerde));
И ещё примерно 300 строк написаны уже не в декларативном стиле.
Запустим:
docker compose up -d --build
И получим ещё один топик earth-temp-enriched http://localhost:8080/ui/clusters/local/all-topics/earth-temp-enriched/messages?valueSerde=SchemaRegistry&limit=100&mode=LATEST

Этот топик мы тоже через коннектор перегоняем в таблицу Iceberg. Запрос к ней:

Transform: аналитический процессинг на Flink

В прошлой части подход получился не таким уж декларативным. А если бы пришлось реализовывать более сложную трансформацию с сохранением состояния, учётом течения времени и запаздывающих событий, то было бы ещё сложнее. Flink позволяет реализовывать сложные трансформации на языке SQL. Более декларативный подход придумать трудно.
docker compose down -v git clean -fd (cd bricks && docker compose run --rm --user "$(id -u):$(id -g)" lg --tags SOURCE,FLINK)
В коде системы появились сервисы для Flink и скрипт трансформации:
tree ├── flink │ ├── Dockerfile │ ├── flink-libs.tar.gz │ └── init │ └── temp-avg-1min.sql
В этой трансформации вычисляются средние значения по минутным окнам. А код в полностью декларативном стиле описывает это: подключение к топику, отбор сообщений с полной информацией, расчёт средних по минутным окнам и запись результата во второй топик:
CREATE TABLE temp_raw ( sensorId STRING NULL, `timestamp` TIMESTAMP(3) NULL, latitude DOUBLE NULL, longitude DOUBLE NULL, temperatureF DOUBLE NULL, WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'earth-temp', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'flink-temp', 'scan.startup.mode' = 'group-offsets', 'properties.auto.offset.reset' = 'earliest', 'format' = 'avro-confluent', 'avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:8081' ); CREATE VIEW temp_strict AS SELECT sensorId, `timestamp`, latitude, longitude, temperatureF FROM temp_raw WHERE sensorId IS NOT NULL AND `timestamp` IS NOT NULL AND latitude IS NOT NULL AND longitude IS NOT NULL AND temperatureF IS NOT NULL; CREATE TABLE temp_avg ( sensorId STRING NOT NULL, window_start TIMESTAMP(3) NOT NULL, window_end TIMESTAMP(3) NOT NULL, latitude DOUBLE NOT NULL, longitude DOUBLE NOT NULL, temperatureF DOUBLE NOT NULL ) WITH ( 'connector' = 'kafka', 'topic' = 'earth-temp-avg', 'properties.bootstrap.servers' = 'kafka:9092', 'key.format' = 'raw', 'key.fields' = 'sensorId', 'format' = 'avro-confluent', 'avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:8081' ); INSERT INTO temp_avg SELECT sensorId, window_start, window_end, AVG(latitude) AS latitude, AVG(longitude) AS longitude, AVG(temperatureF) AS temperatureF FROM TABLE( TUMBLE(TABLE temp_strict, DESCRIPTOR(`timestamp`), INTERVAL '1' MINUTE) ) GROUP BY sensorId, window_start, window_end;
Запустим:
docker compose up -d --build
И через некоторое время появится топик earth-temp-avg с такими сообщениями:

{ "sensorId": "ca67911wocgu23dp169a7z9ta", "window_start": "2026-04-12T07:06:00Z", "window_end": "2026-04-12T07:07:00Z", "latitude": 0.7856400886251921, "longitude": 113.54167794725345, "temperatureF": 83.80999999999999 }
Кластером Flink можно управлять здесь http://localhost:8180:

В отличие от приложения на Kafka Streams, Flink является полноценной распределённой системой для обработки потоков данных.
Serve: кэш в памяти на Redis

На прошлом шаге мы обошлись без персистентного хранилища. Из кубиков LEGO можно сложить и такую архитектуру. Но для завершённости не хватает компонента Serve, чтобы приложения могли получить актуальные данные о средних температурах. Для дашборда, например, нам не нужна история, а нужны данные, которые отражают ситуацию здесь и сейчас. Добавляем компонент Redis:
docker compose down -v git clean -fd (cd bricks && docker compose run --rm --user "$(id -u):$(id -g)" lg --tags SOURCE,FLINK,REDIS)
Redis будет хранить для датчиков их последние средние значения в памяти. На этом примере хорошо видна отличительная особенность потоковых систем. В классическом случае мы сохраняли бы все показания в БД, а при запросе пользователя отправляли бы туда SQL-запрос. В случае потоковой системы каждое поступающее значение сразу же участвует в расчёте, и как только минутный интервал заканчивается, результат вычисления отправляется дальше по потоку. В нашем случае — в Redis.
При такой архитектуре можно быстро менять SQL во Flink или добавлять новые расчёты и сразу получать в Redis новые данные, на которых можно строить обновлённые или новые отчёты. С персистентным хранилищем пришлось бы решать, что делать с уже накопленными данными: удалять их или пересчитывать их. При этом нельзя сказать, что история пропадает. Kafka всё равно сохраняет лог сообщений с настраиваемой глубиной хранения. И при пересчёте отчёта их можно учесть.
Реализовано всё так же. Дополнительный коннектор и сервис в docker-compose.yml. Запустим систему:
docker compose up -d --build
И подключимся к Redis через DataGrip:

Посмотрим доступные ключи и их значения:

Ingest: брокер сообщений для устройств MQTT

Опрос датчиков — сомнительный вариант ingest-модели pull. Обычно датчики сами отправляют свою телеметрию — это ingest-модель push. Теоретически датчик мог бы выступить как producer и отправлять сообщения напрямую в Kafka. Однако протокол Kafka слишком тяжёл для устройств и требователен к качеству связи. Для IoT чаще используют более простой протокол MQTT. Сервисы, которые реализуют этот протокол, называют MQTT-брокерами. В отличие от Kafka они не рассчитаны на надёжное хранение, распределённую обработку сообщений и возможность повторного чтения. Их задача проще: принять сообщение от устройства и доставить его подписчикам (модель publisher/subscriber). Зато с такими брокерами датчики могут работать в условиях плохой связи, малого объёма трафика и жёстких ограничений по энергии. Связка MQTT и Kafka получается устойчивой и практичной. Попросим конструктор добавить MQTT-брокер и эмулятор датчиков, а потом сразу всё запустим:
docker compose down -v git clean -fd (cd bricks && docker compose run --rm --user "$(id -u):$(id -g)" lg --tags MQTT,FLINK,REDIS) docker compose up -d --build
Конструктор добавил MQTT-коннектор и код эмулятора на языке Python:
tree ├── kafka-connect │ ├── connectors-pack.tar.gz │ ├── Dockerfile │ ├── init │ │ ├── earth-temp-avg-redis-sink.json │ │ ├── earth-temp-mqtt-source.json │ │ ├── earth-temp-redis-sink.json │ │ └── init.sh │ └── sources └── mqtt ├── mosquitto.conf └── temp-sensor-emulator ├── app.py ├── Dockerfile └── requirements.txt
В docker-compose.yml кроме MQTT-брокера eclipse-mosquitto и эмулятора датчиков был добавлен и UI, доступный по адресу http://localhost:8086:

Подключимся к брокеру:

Датчики отправляют сообщения в топики MQTT. Чтобы их получать, надо на них подписаться. В нашем случае каждый датчик отправляет сообщения в свой собственный топик с именем sensors/{sensorId}/telemetry. Подпишемся на все топики такого вида:

Теперь в UI видны все поступающие сообщения:

Всё вместе

А теперь запустим все системы:
docker compose down -v git clean -fd (cd bricks && docker compose run --rm --user "$(id -u):$(id -g)" lg --tags MQTT,SOURCE,STREAMS,POSTGRES,ICEBERG,TRINO,FLINK,REDIS) docker compose up -d --build
Вся интеграция завязана на коннекторы, поэтому кубики LEGO так легко переставлять местами:
