Для тех, кому хочется сразу посмотреть код: репозиторий сервиса — в конце текста.


Откуда задача

Нужен сервис, который централизованно выполняет исходящие HTTP-запросы для экосистемы микросервисов и интеграций. Постановка на уровне требований:

  1. Два режима входа — и синхронный (ответ нужен вызывающей стороне), и асинхронный (достаточно принять задачу и отдать результат «куда-то ещё»).

  2. Два канала постановки — удобно и через HTTP API, и напрямую в Kafka (без лишнего hop через HTTP).

  3. Rate limit — защита квот и предсказуемое поведение при 429 со стороны внешних API.

  4. Кеш ответов — снижение нагрузки на внешние системы и наши же ресурсы.

  5. Строгий порядок там, где он важен — если указан ключ партиции в «ordered»-режиме, сообщения по этому ключу не должны перемешиваться.

  6. Масштаб по числу ключей — сотни тысяч и миллионы логических партиций при ограниченном числе воркеров; одна «тяжёлая» партиция не должна блокировать остальные.

  7. Ретраи и отложенные сообщения — экспоненциальные паузы, очередь «на завтра», планирование на произвольный горизонт.

  8. Единая точка наблюдаемости по действиям — желательно, чтобы «что произошло с задачей» оставалось в журнале событий (логе), а не только в памяти воркера.

Ни одна «одна технология» не закрывает это без слоёв. Сначала — почему в стеке именно Kafka, PostgreSQL и Redis; дальше — как мы спроектировали сервис Requester: контекст, контракты запроса/ответа, движение данных, внутренние воркеры, graceful shutdown, детали rate limit / retry / cache / отложенных задач, wake-up, тестирование и узкое место с большими payload в Redis.


Выбор стека

Асинхронный режим — это в первую очередь очередь: задачи копятся и обрабатываются не в момент вызова API, а позже, конкурируя за пул воркеров. Притом нам нужен строгий порядок там, где бизнес этого требует (по логическому ключу), и при этом журналируемый вход/выход для интеграций. Для такой модели Kafka — естественный выбор: распределённый append-only лог, топики, партиции, consumer groups, долговременное хранение, высокая пропускная способность. Входные и выходные события сервиса мы ведём через Kafka, чтобы единая цепочка «приняли задачу → обработали → отдали ответ» оставалась в брокере.

Отложенные сообщения и ретраи «на часы и дни» нельзя свести только к retention и «переливанию» в рамках одного брокера без надёжного учёта на стороне приложения. Нужна долговечная, транзакционно предсказуемая таблица фактов: когда задача должна снова стать готовой, какая у неё полезная нагрузка, как избежать дублей при гонке воркеров. Для этого оптимальна PostgreSQL: диск, SKIP LOCKED, индексы по scheduled, бэкапы, привычные операции для outbox-шаблона.

Остаётся зазор: ни Kafka, ни PostgreSQL сами по себе удобно не закрывают сценарий «миллионы логических партиций (ключей) + общий пул воркеров + справедливая конкуренция + строгий порядок внутри ключа + откладывание на секунды + ретраи с блокировкой партиции». В consumer group Kafka партиции привязаны к консьюмерам: это не тот же паттерн, что динамическая task-очередь, где воркер в следующий тик берёт следующую готовую задачу из любой логической партиции. Заводить отдельную физическую сущность на каждый бизнес-ключ в Kafka или в RabbitMQ — нереалистично. Outbox в PostgreSQL отлично хранит «запланировано на завтра», но как планировщик следующего тика для сотен тысяч ключей с разным scheduled и приоритетом он не заменяет лёгкий in-memory движок с Lua-атомарностью.

Оптимальный третий слой — Redis и очередь задач на нём: один инстанс (или кластер по мере роста) хранит состояние планировщика — ZSET, блокировки ordered-партиций, идемпотентность, heartbeat воркеров — без миллионов отдельных «очередей» в смысле брокера. Как устроена модель партиция = ordered-порядок / общий пул / reject и отложенные сообщения, rate limit как cooldown партиции, зачем Lua и какие компромиссы по памяти и durability — мы подробно разобрали в отдельной статье: «Очередь на Redis с Lua: порядок в партициях, общий пул воркеров и отложенные сообщения».

Роль smart-redis-queue

В Requester эту роль играет библиотека smart-redis-queue: ordered-партиции с префиксом !, приоритеты, prefetch, Reject/RejectWithDelay, heartbeat и возврат задач при смерти воркера — то есть всё, что в предыдущей статье описано на уровне структур ключей и Lua-скриптов. Requester не дублирует эту механику — он использует очередь как движок планирования: что лежит в payload задачи (полный JSON или ссылка на offset в Kafka) — слой контракта сервиса.


Контекст системы (уровень «ящиков»)

Внешние сервисы могут:

  • вызывать HTTP API Requester;

  • публиковать задачи в топик Kafka in.

Requester выполняет HTTP к целевым URI (или специальный режим без реального HTTP — см. wake up), учитывает лимиты, кеш, порядок и ретраи. Все ответы попадают в топик Kafka out. Дальше потребители забирают результаты сами, либо стоит Redpanda Connect (или аналог), который читает out и по ключам / заголовкам / полям proxyData раскидывает сообщения в нужные топики или очереди целевых сервисов.

Зачем такая «лестница» из компонентов. Kafka даёт append-only журнал: что задача вошла в систему и что из неё вышло — остаётся в топиках. Redis-очередь решает партиционирование и планирование с общим пулом воркеров без заведения миллиона физических очередей. PostgreSQL — долговременный outbox для задач «далеко в будущем» и длинных бэкоффов, чтобы не держать гигантские ZSET и не упираться в модель отложенных сообщений только в Redis.


C4: контейнеры (кратко)

Контейнер

Роль

Requester (процесс)

HTTP API, consumer in, пул воркеров Redis, пул воркеров PostgreSQL→Kafka, consumer out для sync, Kafka producer.

Kafka

Топик in — вход; топик out — выход; единый лог и точка интеграции для Connect.

Redis

Rate limiter, кеш ответов, очередь задач (smart-redis-queue).

PostgreSQL

Таблица outbox: задачи с scheduled в будущем, ретраи с задержкой > 1 с (для не-ordered сценариев).


Контракты: запрос и ответ

Единая форма задачи в HTTP (POST /request) и в Kafka (топик in, value — JSON) совпадает по смыслу: тело = объект с полями request и meta. Ответ, который забирают из топика out (и в режиме sync: true — ещё и в HTTP), — объект response + meta.

request — что выполнить

Поле

Тип / формат

За что отвечает

uri

строка

Целевой URL исходящего HTTP-вызова. Пустая строка — режим wake up: реальный HTTP не делаем, body/headers уходят в ответ «как есть» (см. ниже).

method

строка

HTTP-метод (GET, POST, …).

body

строка

Тело запроса (как правило JSON в виде строки; сервис не парсит схему — это контракт с целевым API).

headers

объект строк

Заголовки к целевому запросу.

meta — политика обработки и идентификаторы

Поле делает сервис гибким, позволяя настраивать стратегии работы сервиса на клиенте который шлет запросы.

Поле

Тип / формат

За что отвечает

requestId

строка, обязателен

Сквозной идентификатор задачи: попадает в out, в ключ Kafka, в meta ответа; по нему маршрутизируют downstream и в sync ждут результат в Hub.

traceparent

строка, опционально

W3C Trace Context: сервис принимает от клиента и продлевает в трейсинг; при пустом значении может заполняться на входе HTTP.

partition

строка, опционально

Логическая партиция очереди в Redis. Если имя начинается с !, включён ordered-режим: строгий порядок внутри ключа, Reject/rate limit ведут себя иначе, чем у обычных партиций.

sync

bool, опционально

true — дождаться результата в HTTP (пока воркер не положит ответ в out, out-consumer доставит в Hub). false или отсутствует — 202 Accepted, результат только в out.

scheduled

время (RFC3339), опционально

Когда первый раз считать задачу готовой к исполнению. Влияет на маршрут Redis (близкое будущее) vs PostgreSQL (дальше ~1 с).

proxyData

произвольный JSON

Прозрачный конверт: Requester не интерпретирует — копирует в meta ответа, чтобы Connect / потребители могли маршрутизировать по своим полям (целевой сервис, тенант, тип события и т.д.).

limiter

объект, опционально

Rate limit до HTTP (и до кеша). См. вложенные поля.

retry

объект, опционально

Политика повторов при 429/5xx/сети для не-ordered; для ordered — используется иначе (reject с задержкой, см. раздел про retry).

cache

объект, опционально

Кеш ответов: ключ и TTL; кладём только успешные 2xx.

meta.limiter:

Поле

За что отвечает

key

Ключ в Redis для счётчиков/окон (общий квотный «ведро»-идентификатор).

algorithm

Имя стратегии: token-bucket, leaky-bucket, fixed-window-counter, sliding-window-log, sliding-window-counter.

rates

Список пар окно + лимит: каждый элемент — duration (строка длительности, по правилам Go time.ParseDuration: 1h, 3s, 500ms, …) и value (целое число разрешённых срабатываний за окно).

meta.retry:

Поле

За что отвечает

max

Максимальное число попыток (с учётом ретраев после первого запуска).

delay / maxDelay

Стартовая и максимальная пауза между попытками (строка длительности).

multiplier

Множитель экспоненциального бэкоффа.

jitter

Доля случайного разброса вокруг задержки (снижает «столбики» нагрузки).

meta.cache:

Поле

За что отвечает

key

Ключ записи в Redis (префикс кеша добавляется на стороне сервиса).

ttl

Время жизни кешированного ответа.

В Kafka доработанная копия задачи (после consumer/processor) также содержит служебные поля верхнего уровня: tryCount (число уже выполненных/запланированных попыток), createdAt (время постановки) — в HTTP при первой отправке клиент их не передаёт, сервис при необходимости проставляет при сериализации Task.

response + meta (топик out и тело sync-ответа)

Поле

Раздел

За что отвечает

response.status

response

HTTP-статус целевого ответа (или синтетический 200 при wake up, 5xx при внутренней ошибке).

response.body / response.headers

response

Тело и заголовки ответа.

meta.requestId

meta

Тот же requestId, что во входе — связка «запрос–ответ».

meta.tryCount

meta

Сколько раз по сути «доходили» до исхода (учёт ретраев).

meta.time

meta

Время обработки на стороне Requester (длительность).

meta.cached

meta

true, если ответ взят из кеша, а не сходил в сеть.

meta.waitTime

meta

Ожидание, связанное с лимитером/паузой (для sync при 429 — в т.ч. для заголовка Retry-After).

meta.proxyData

meta

Эхо из входа: тот же произвольный JSON для downstream.

Сериализация длительностей: в моделях ответа поля meta.time и meta.waitTime имеют тип time.Duration; стандартный encoding/json в Go сериализует их целым числом наносекунд. Если понадобится строка вида 150ms на wire — это уже смена контракта (кастомный MarshalJSON).


Движение данных: от сообщения до ответа

Ключевая идея: в Kafka уже лежит полное тело задачи (JSON). Consumer in не обязан таскать этот JSON в Redis целиком.

Ветвление у consumer in

  • Если задача готова или наступит в пределах ~1 секунды — в Redis уходит лёгкая запись: ссылка на позицию в логе Kafka (topic, partition, offset) — десятки байт JSON.

  • Если отложена дальше — полный payload пишется в PostgreSQL (outbox). Когда наступает время, воркеры PostgreSQL снова отправляют задачу в Kafka in, откуда она обрабатывается обычным путём.

Зачем из PostgreSQL снова в Kafka, а не «сразу в Redis»

  1. Единый путь — вся жизнь задачи снова проходит через in: проще рассуждать, проще трассировка, один формат сообщения.

  2. Журнал — в in остаётся запись о том, что отложенная задача «проснулась»; это не теряется внутри закрытого цикла БД→приложение.

  3. Порядок относительно других событий — новая запись в in упорядочивается так же, как и остальной поток (при одной партиции топика in — глобально FIFO на уровне топика; детальная упорядоченность по бизнес-ключу обеспечивается уже Redis-очередью для ordered-партиций).

Порядок и шаринг воркеров между логическими партициями на этом пути обеспечивает слой Redis; зачем он нужен в связке с Kafka и PostgreSQL — в разделе «Выбор стека» выше.


Логика приложения: из чего состоит процесс

Компоненты

  1. Один consumer группы на топик in
    Достаточно для «перекладывания» сообщений в Redis или PostgreSQL. Топик in в эксплуатации разумно держать с одной партицией, чтобы не плодить параллельные ветки на этом раннем этапе: партиционирование по бизнес-ключу целиком отдаём Redis, чтобы не усложнять систему вторым уровнем партиций в Kafka.

  2. Пул воркеров PostgreSQL (SKIP LOCKED, несколько горутин)
    Забирают готовые строки, отправляют задачу обратно в in.

  3. Пул воркеров Redis (число = MAX_WORKERS)
    Берут задачи из очереди, разрешают payload (чтение из Kafka по offset при необходимости), вызывают processor.

  4. Пул Kafka Fetcher’ов
    По одному соединению на воркер: иначе один mutex на чтение по offset стал бы узким местом при сотнях воркеров.

  5. HTTP-сервер
    POST /request — постановка задачи в in; режим sync: true ждёт результат через локальный Hub, куда попадает ответ из consumer out.

  6. Out consumer
    Отдельная уникальная consumer group на инстанс (hostname + pid), старт с OffsetNewest, чтобы не читать историю при рестарте — нужен только мост в Hub для синхронных клиентов.

Запуск и graceful shutdown

Порядок остановки осмысленный для операторов:

  1. SIGINT / SIGTERM — начинаем shutdown.

  2. Фаза 1: HTTPServer.Shutdown: новые запросы не принимаем, висящие соединения могут дописать ответ.

  3. Фаза 2: воркеры — отмена контекста, WaitGroup дожидается завершения consumer in, PG movers, пула Redis, consumer out.

  4. Фаза 3: закрытие ресурсов — defer на producer’ы, Redis, пул fetcher’ов, БД.

Таймаут shutdown — до порядка минуты, чтобы не обрывать задачи в полёте без шанса на корректное завершение.


Rate limit, retry, cache, отложенные сообщения

Rate limit

Перед кешом и HTTP выполняется проверка лимитера (Redis + стратегии из семейства token/leaky/sliding/fixed window). При отказе:

  • для ordered-партиций (meta.partition начинается с !) — возвращается reject с задержкой (RejectWithDelay): очередь сама блокирует партицию на TTL, не крутим tight loop;

  • для обычных партиций — задача перепланируется на RetryAfter в Redis (если задержка короткая) или в PostgreSQL (если длинная).

Retry

На 429 и 5xx (и на ошибки сети) при наличии meta.retry и неисчерпанном max:

  • ordered — снова через reject + backoff в Redis (порядок и блокировка партиции сохраняются);

  • не ordered — задача сериализуется и уходит в Redis (≤ 1 с до следующей попытки) или в PostgreSQL (> 1 с), processor делает Ack текущей Redis-задачи, потому что retry уже «новая» постановка.

Бэкофф — экспоненциальный с потолком maxDelay и джиттером.

Кеш

При meta.cache ответы 2xx кладутся в Redis (обёртка go-redis/cache). Параллельные промахи по одному ключу схлопываются через singleflight: один реальный HTTP, остальные ждут.

Отложенные сообщения

Поле meta.scheduled: consumer in сам решает — сейчас в Redis (как правило, с offset-ref) или в PostgreSQL для дальнего горизонта.


Wake up (инициация без HTTP)

Если uri пустой, HTTP-вызов не выполняется: тело и заголовки из заявки попадают в ответ как «успешный» 200. В связке с scheduled и proxyData это удобный будильник для других сервисов: в нужный момент в out уходит сообщение, которое downstream может трактовать как событие или команду. Redpanda Connect дальше маршрутизирует по ключу или по полям payload.


Как тестировали

  • WireMock — контролируемый mock HTTP: 500, 429, задержки, детерминированные ответы для сценариев retry и rate limit.

  • k6 — нагрузочные скрипты в репозитории: отдельно limiter, retry, cache, ordered-партиции, max RPS; есть сценарий прямой записи в Kafka in и замера обработки через out (custom k6 с xk6-kafka).

  • Prometheus + Grafana — метрики инфраструктуры и сервиса (в docker-compose заготовки есть).

  • OpenTelemetry — опционально (OTEL_ENABLED=1): полный трейс от http.requestkafka.in.consumekafka.fetchprocessor.handlehttp.clientkafka.outkafka.out.consume, визуализация в Jaeger. Удобно ловить цикл PG → Kafka in → Redis → out.


Узкое место: большие сообщения в Redis

На практике всплыло узкое место: при больших телах задач пропускная способность заметно проседала, потому что крупный JSON гонялся через Redis (память, сеть, сериализация, время Lua/round-trip). Redis в этой архитектуре — координатор очереди, а не хранилище «толстых» документов.

Решение: в Redis кладём только ссылку на offset в Kafka (OffsetRef: topic, partition, offset). Воркер перед обработкой достаёт оригинальное сообщение из Kafka через выделенный fetcher. Пропускная способность и стабильность выросли; память Redis перестала быть линейной от размера body запросов.

Компромисс: пока сообщение не закоммичено и не вычитано, нужна согласованность retention в Kafka с горизонтом отложенных задач в Redis; для «дальних» отложенных задач по-прежнему используется PostgreSQL с полным payload.


Оптимизация через Kafka fetcher: компромиссы

Ссылка на offset вынуждает воркер перечитать запись в Kafka по конкретной позиции. Это не тот же паттерн, что последовательное чтение «хвоста» consumer group-ом: с точки зрения диска возможны скачкообразные (random) обращения к сегментам, выше доля промахов по page cache брокера по сравнению с идеальным стриминговым read.

Для порядка величины ~2500 RPS на нашем контуре это не стало практическим узким местом: у современных SSD (NVMe) произвольный ввод-вывод по-прежнему выдерживает такой профиль лучше, чем механическим дискам прошлого поколения; узким местом ранее оставались именно объём данных через Redis, а не IOPS брокера.

Retention. Если retention в топике in короткий, а воркеры отстают, теоретически сообщение по offset может уже исчезнуть из лога, пока фасад Redis ещё ссылается на него. В нашем продуктовом допущении: отправлять наружу то, что «пролежало» в очереди дольше ~5 часов, не требуется — срок retention и эксплуатационные лимиты можно согласовать с этим горизонтом. Дляё отложенных на длительные сроки задач полный payload по-прежнему живёт в PostgreSQL, а не в цепочке offset-ref.

Куда развиваться, если профиль изменится: хранить тело в отдельном объектном хранилище (S3-совместимое, MinIO и т.д.) с ключом в Redis; либо снова грузить сжатый payload в Redis (snappy/zstd), пожертвовав частью выигрыша по RAM. Как вариант, комбинировать подходы - меньше N kb в redis, все остальное в хранилище. Или даже сделать ttl store в самом приложении и если воркеры успели, то брать из памяти. Нужно экспериментировать и подбирать оптимальный вариант. Это запасной фронт работ.

На текущем этапе offset + fetcher — осмысленный компромисс: Redis остаётся лёгким, RPS отправки (producer) и общая устойчивость сценария для нас достаточны; вместо преждевременной усложнёнки отдельным store мы фиксируем цифры в нагрузочных тестах и при росте нагрузки возвращаемся к вариантам выше.


Итог

  • Спроектирован Requester как сервис исходящих HTTP-запросов с API и Kafka, с единым выходом в топик out для интеграций и Connect.

  • Kafka — журнал и точка входа/выхода; PostgreSQL — outbox для длинных задержек; Redis + smart-redis-queue — партиции, порядок, пул воркеров, rate limit и отложенность «вблизи».

  • Offset-ref и Kafka fetcher сняли давление с Redis; цена — random I/O и зависимость от retention; при ~2500 RPS и согласованных SLA по «свежести» задач это приемлемо; при необходимости — отдельный store или сжатие в Redis.

  • Graceful shutdown, k6, WireMock, Grafana и OpenTelemetry закрывают эксплуатационный цикл: от нагрузочного теста до трассировки полного пути сообщения.

Репозиторий: https://github.com/Rinsvent/requester.


Приложение: ссылки

Комментарии (6)


  1. omigo777
    26.04.2026 17:45

    Какую прикладную задачу решали? Или это статья ради статьи: LLM спроектировала что-то, LLM это что-то описала?


    1. rinsvent Автор
      26.04.2026 17:45

      Прикладная задача - рассылка сообщений. Сервис был написан как заммена действующей системе. И ключевой фактор чтобы 1 клиент не блочил другого - именно для этого нужен редис и разделение по партициям! Сейчас почему-то у многих пунктик на LLM - сервис проектировали инженеры)) Но статью да - LLM помогала написать, что скрывать. Без нее сильно дольше бы писал, но суть не в буквах а в идеи и вызовах с которыми столкнулись, и хотелось поделиться этим. В целом если формат зайдет есть идея пройтись по архитектуре и написать ряд уже не прикладных статей. Не часто на хабре видел статьи по проектированию где разбирались бы моменты выбора стека, движенеи данных - считаю, что может быть интересным)


  1. pkokoshnikov
    26.04.2026 17:45

    А что с консистентностью вашей очереди? Или можно и не выполнять запрос если редис приложет? А если в режиме консистентности то какой объём данных и сколько будет подниматься в случае сбоя? Зачем вообще скорость ретрая если задача асинхронная? Вопросов очень много


    1. rinsvent Автор
      26.04.2026 17:45

      А что с консистентностью вашей очереди? Или можно и не выполнять запрос если редис приложет? 

      Вот тут хороший вопрос! Сервис проектирвоался как замена текущей системы - сейчас уже есть хранение в редис и в случае если упадет, то - да сообщения будут потеряны. На практике у нас редис кластер и такого еще ни разу не было, поэтому на первом этапе этот момент опустил, но действительно собирался этим заняться.
      Ключевая идея, что у на есть лог событий в кафка и хотел добавить команду в приложение которая в случае сбоя заново перечитает кафку (топик in) с первого offset который был не обработан (тут нужно будет слдеить за хвостом)
      Преечитав кафку (топик out) можно гарантирвоать что сообщения не будут обработаны джажды + потребуется вероятно на этот момент не стартовать воркеры.

      А если в режиме консистентности то какой объём данных и сколько будет подниматься в случае сбоя?

      Как выше написал, пока не реализовано, но если делать, то будет зависеть от того как быстро среагируем на инцидент. Сейчас средняя скорость 300 с / сек (в пиках 1700)
      Допустим среагировали через 10 часов. Т очто было ранее 5 часов отсеиваем как устаревшие - получается не смогли отправить. Сейчас у бизнеса такие условия. получается за 5 часов надо вкачать

      при средней скорости 300 с / сек это 5 3600 * 300 = 5_400_000 сообщений которые нужно заново загрузить в систему.
      Скорость перекидывания в базу согласно бенчмаркам очереди https://habr.com/ru/articles/1018194/ могут достигать 100_000 c / сек, то есть в базу все зайдет за минуту, а разгребаться уже будет со скоростью 2500 rps примерно минут 40.
      Это худший случай, когда все сообщения надо переотпрравить, если часть будет дедублицирована из-за того что ранее обрабоатлась, то должно быть быстрее. Но это пока в теории, реальные цифра можно будет увидеть только после реализации

      Зачем вообще скорость ретрая если задача асинхронная?

      Не совсем понял вопрос, но если суть что можно было sleep поставить, то тогда воркеры будут тормозиться и перфоманс будет сильно проседать. Так что с возвратом в очередь получается намного эффективнее


  1. KON88
    26.04.2026 17:45

    Прочёл статью. Вопросов действительно много.

    Главная цель использования такого количества инструментов это 2500 rps? Или возможность контроля событий как в saga?


    1. rinsvent Автор
      26.04.2026 17:45

      Главная цель использования такого количества инструментов это 2500 rps?

      В приложении используется очреедь / база / in memory хранилище. Это обычно минимум для чуть менее сложной системы) Без postgresql невозможно было бы реализовать сообщения отсылаемые через год. Без редиса невозможно было бы реализвоать партицирование и гарантию порядка для ограниченного числа воркеров. Очередь must have для такой системы.
      Все инстурцменты обоснованы и описаны в статье, что за что отвечает. rps был как маркер - выдержим ли мы текущие нагрузки. Ключевые факторы - неблокирующие партиции и порядок доставки в рамках портиции. Сами требования к сервису описаны в начале статьи)