Для тех, кому хочется сразу посмотреть код: репозиторий сервиса — в конце текста.
Откуда задача
Нужен сервис, который централизованно выполняет исходящие HTTP-запросы для экосистемы микросервисов и интеграций. Постановка на уровне требований:
Два режима входа — и синхронный (ответ нужен вызывающей стороне), и асинхронный (достаточно принять задачу и отдать результат «куда-то ещё»).
Два канала постановки — удобно и через HTTP API, и напрямую в Kafka (без лишнего hop через HTTP).
Rate limit — защита квот и предсказуемое поведение при 429 со стороны внешних API.
Кеш ответов — снижение нагрузки на внешние системы и наши же ресурсы.
Строгий порядок там, где он важен — если указан ключ партиции в «ordered»-режиме, сообщения по этому ключу не должны перемешиваться.
Масштаб по числу ключей — сотни тысяч и миллионы логических партиций при ограниченном числе воркеров; одна «тяжёлая» партиция не должна блокировать остальные.
Ретраи и отложенные сообщения — экспоненциальные паузы, очередь «на завтра», планирование на произвольный горизонт.
Единая точка наблюдаемости по действиям — желательно, чтобы «что произошло с задачей» оставалось в журнале событий (логе), а не только в памяти воркера.
Ни одна «одна технология» не закрывает это без слоёв. Сначала — почему в стеке именно Kafka, PostgreSQL и Redis; дальше — как мы спроектировали сервис Requester: контекст, контракты запроса/ответа, движение данных, внутренние воркеры, graceful shutdown, детали rate limit / retry / cache / отложенных задач, wake-up, тестирование и узкое место с большими payload в Redis.
Выбор стека
Асинхронный режим — это в первую очередь очередь: задачи копятся и обрабатываются не в момент вызова API, а позже, конкурируя за пул воркеров. Притом нам нужен строгий порядок там, где бизнес этого требует (по логическому ключу), и при этом журналируемый вход/выход для интеграций. Для такой модели Kafka — естественный выбор: распределённый append-only лог, топики, партиции, consumer groups, долговременное хранение, высокая пропускная способность. Входные и выходные события сервиса мы ведём через Kafka, чтобы единая цепочка «приняли задачу → обработали → отдали ответ» оставалась в брокере.
Отложенные сообщения и ретраи «на часы и дни» нельзя свести только к retention и «переливанию» в рамках одного брокера без надёжного учёта на стороне приложения. Нужна долговечная, транзакционно предсказуемая таблица фактов: когда задача должна снова стать готовой, какая у неё полезная нагрузка, как избежать дублей при гонке воркеров. Для этого оптимальна PostgreSQL: диск, SKIP LOCKED, индексы по scheduled, бэкапы, привычные операции для outbox-шаблона.
Остаётся зазор: ни Kafka, ни PostgreSQL сами по себе удобно не закрывают сценарий «миллионы логических партиций (ключей) + общий пул воркеров + справедливая конкуренция + строгий порядок внутри ключа + откладывание на секунды + ретраи с блокировкой партиции». В consumer group Kafka партиции привязаны к консьюмерам: это не тот же паттерн, что динамическая task-очередь, где воркер в следующий тик берёт следующую готовую задачу из любой логической партиции. Заводить отдельную физическую сущность на каждый бизнес-ключ в Kafka или в RabbitMQ — нереалистично. Outbox в PostgreSQL отлично хранит «запланировано на завтра», но как планировщик следующего тика для сотен тысяч ключей с разным scheduled и приоритетом он не заменяет лёгкий in-memory движок с Lua-атомарностью.
Оптимальный третий слой — Redis и очередь задач на нём: один инстанс (или кластер по мере роста) хранит состояние планировщика — ZSET, блокировки ordered-партиций, идемпотентность, heartbeat воркеров — без миллионов отдельных «очередей» в смысле брокера. Как устроена модель партиция = ordered-порядок / общий пул / reject и отложенные сообщения, rate limit как cooldown партиции, зачем Lua и какие компромиссы по памяти и durability — мы подробно разобрали в отдельной статье: «Очередь на Redis с Lua: порядок в партициях, общий пул воркеров и отложенные сообщения».
Роль smart-redis-queue
В Requester эту роль играет библиотека smart-redis-queue: ordered-партиции с префиксом !, приоритеты, prefetch, Reject/RejectWithDelay, heartbeat и возврат задач при смерти воркера — то есть всё, что в предыдущей статье описано на уровне структур ключей и Lua-скриптов. Requester не дублирует эту механику — он использует очередь как движок планирования: что лежит в payload задачи (полный JSON или ссылка на offset в Kafka) — слой контракта сервиса.
Контекст системы (уровень «ящиков»)
Внешние сервисы могут:
вызывать HTTP API Requester;
публиковать задачи в топик Kafka
in.
Requester выполняет HTTP к целевым URI (или специальный режим без реального HTTP — см. wake up), учитывает лимиты, кеш, порядок и ретраи. Все ответы попадают в топик Kafka out. Дальше потребители забирают результаты сами, либо стоит Redpanda Connect (или аналог), который читает out и по ключам / заголовкам / полям proxyData раскидывает сообщения в нужные топики или очереди целевых сервисов.

Зачем такая «лестница» из компонентов. Kafka даёт append-only журнал: что задача вошла в систему и что из неё вышло — остаётся в топиках. Redis-очередь решает партиционирование и планирование с общим пулом воркеров без заведения миллиона физических очередей. PostgreSQL — долговременный outbox для задач «далеко в будущем» и длинных бэкоффов, чтобы не держать гигантские ZSET и не упираться в модель отложенных сообщений только в Redis.
C4: контейнеры (кратко)
Контейнер |
Роль |
|---|---|
Requester (процесс) |
HTTP API, consumer |
Kafka |
Топик |
Redis |
Rate limiter, кеш ответов, очередь задач (smart-redis-queue). |
PostgreSQL |
Таблица outbox: задачи с |
Контракты: запрос и ответ
Единая форма задачи в HTTP (POST /request) и в Kafka (топик in, value — JSON) совпадает по смыслу: тело = объект с полями request и meta. Ответ, который забирают из топика out (и в режиме sync: true — ещё и в HTTP), — объект response + meta.
request — что выполнить
Поле |
Тип / формат |
За что отвечает |
|---|---|---|
|
строка |
Целевой URL исходящего HTTP-вызова. Пустая строка — режим wake up: реальный HTTP не делаем, |
|
строка |
HTTP-метод ( |
|
строка |
Тело запроса (как правило JSON в виде строки; сервис не парсит схему — это контракт с целевым API). |
|
объект строк |
Заголовки к целевому запросу. |
meta — политика обработки и идентификаторы
Поле делает сервис гибким, позволяя настраивать стратегии работы сервиса на клиенте который шлет запросы.
Поле |
Тип / формат |
За что отвечает |
|---|---|---|
|
строка, обязателен |
Сквозной идентификатор задачи: попадает в |
|
строка, опционально |
W3C Trace Context: сервис принимает от клиента и продлевает в трейсинг; при пустом значении может заполняться на входе HTTP. |
|
строка, опционально |
Логическая партиция очереди в Redis. Если имя начинается с |
|
bool, опционально |
|
|
время (RFC3339), опционально |
Когда первый раз считать задачу готовой к исполнению. Влияет на маршрут Redis (близкое будущее) vs PostgreSQL (дальше ~1 с). |
|
произвольный JSON |
Прозрачный конверт: Requester не интерпретирует — копирует в |
|
объект, опционально |
Rate limit до HTTP (и до кеша). См. вложенные поля. |
|
объект, опционально |
Политика повторов при 429/5xx/сети для не-ordered; для ordered — используется иначе (reject с задержкой, см. раздел про retry). |
|
объект, опционально |
Кеш ответов: ключ и TTL; кладём только успешные 2xx. |
meta.limiter:
Поле |
За что отвечает |
|---|---|
|
Ключ в Redis для счётчиков/окон (общий квотный «ведро»-идентификатор). |
|
Имя стратегии: |
|
Список пар окно + лимит: каждый элемент — |
meta.retry:
Поле |
За что отвечает |
|---|---|
|
Максимальное число попыток (с учётом ретраев после первого запуска). |
|
Стартовая и максимальная пауза между попытками (строка длительности). |
|
Множитель экспоненциального бэкоффа. |
|
Доля случайного разброса вокруг задержки (снижает «столбики» нагрузки). |
meta.cache:
Поле |
За что отвечает |
|---|---|
|
Ключ записи в Redis (префикс кеша добавляется на стороне сервиса). |
|
Время жизни кешированного ответа. |
В Kafka доработанная копия задачи (после consumer/processor) также содержит служебные поля верхнего уровня: tryCount (число уже выполненных/запланированных попыток), createdAt (время постановки) — в HTTP при первой отправке клиент их не передаёт, сервис при необходимости проставляет при сериализации Task.
response + meta (топик out и тело sync-ответа)
Поле |
Раздел |
За что отвечает |
|---|---|---|
|
|
HTTP-статус целевого ответа (или синтетический 200 при wake up, 5xx при внутренней ошибке). |
|
|
Тело и заголовки ответа. |
|
|
Тот же |
|
|
Сколько раз по сути «доходили» до исхода (учёт ретраев). |
|
|
Время обработки на стороне Requester (длительность). |
|
|
|
|
|
Ожидание, связанное с лимитером/паузой (для sync при 429 — в т.ч. для заголовка |
|
|
Эхо из входа: тот же произвольный JSON для downstream. |
Сериализация длительностей: в моделях ответа поля meta.time и meta.waitTime имеют тип time.Duration; стандартный encoding/json в Go сериализует их целым числом наносекунд. Если понадобится строка вида 150ms на wire — это уже смена контракта (кастомный MarshalJSON).
Движение данных: от сообщения до ответа
Ключевая идея: в Kafka уже лежит полное тело задачи (JSON). Consumer in не обязан таскать этот JSON в Redis целиком.
Ветвление у consumer in
Если задача готова или наступит в пределах ~1 секунды — в Redis уходит лёгкая запись: ссылка на позицию в логе Kafka (
topic,partition,offset) — десятки байт JSON.Если отложена дальше — полный payload пишется в PostgreSQL (outbox). Когда наступает время, воркеры PostgreSQL снова отправляют задачу в Kafka
in, откуда она обрабатывается обычным путём.
Зачем из PostgreSQL снова в Kafka, а не «сразу в Redis»
Единый путь — вся жизнь задачи снова проходит через
in: проще рассуждать, проще трассировка, один формат сообщения.Журнал — в
inостаётся запись о том, что отложенная задача «проснулась»; это не теряется внутри закрытого цикла БД→приложение.Порядок относительно других событий — новая запись в
inупорядочивается так же, как и остальной поток (при одной партиции топикаin— глобально FIFO на уровне топика; детальная упорядоченность по бизнес-ключу обеспечивается уже Redis-очередью для ordered-партиций).

Порядок и шаринг воркеров между логическими партициями на этом пути обеспечивает слой Redis; зачем он нужен в связке с Kafka и PostgreSQL — в разделе «Выбор стека» выше.
Логика приложения: из чего состоит процесс
Компоненты
Один consumer группы на топик
in
Достаточно для «перекладывания» сообщений в Redis или PostgreSQL. Топикinв эксплуатации разумно держать с одной партицией, чтобы не плодить параллельные ветки на этом раннем этапе: партиционирование по бизнес-ключу целиком отдаём Redis, чтобы не усложнять систему вторым уровнем партиций в Kafka.Пул воркеров PostgreSQL (
SKIP LOCKED, несколько горутин)
Забирают готовые строки, отправляют задачу обратно вin.Пул воркеров Redis (число =
MAX_WORKERS)
Берут задачи из очереди, разрешают payload (чтение из Kafka по offset при необходимости), вызывают processor.Пул Kafka Fetcher’ов
По одному соединению на воркер: иначе один mutex на чтение по offset стал бы узким местом при сотнях воркеров.HTTP-сервер
POST /request— постановка задачи вin; режимsync: trueждёт результат через локальный Hub, куда попадает ответ из consumerout.Out consumer
Отдельная уникальная consumer group на инстанс (hostname+ pid), старт сOffsetNewest, чтобы не читать историю при рестарте — нужен только мост в Hub для синхронных клиентов.
Запуск и graceful shutdown
Порядок остановки осмысленный для операторов:
SIGINT / SIGTERM — начинаем shutdown.
Фаза 1: HTTP —
Server.Shutdown: новые запросы не принимаем, висящие соединения могут дописать ответ.Фаза 2: воркеры — отмена контекста,
WaitGroupдожидается завершения consumerin, PG movers, пула Redis, consumerout.Фаза 3: закрытие ресурсов — defer на producer’ы, Redis, пул fetcher’ов, БД.
Таймаут shutdown — до порядка минуты, чтобы не обрывать задачи в полёте без шанса на корректное завершение.
Rate limit, retry, cache, отложенные сообщения
Rate limit
Перед кешом и HTTP выполняется проверка лимитера (Redis + стратегии из семейства token/leaky/sliding/fixed window). При отказе:
для ordered-партиций (
meta.partitionначинается с!) — возвращается reject с задержкой (RejectWithDelay): очередь сама блокирует партицию на TTL, не крутим tight loop;для обычных партиций — задача перепланируется на
RetryAfterв Redis (если задержка короткая) или в PostgreSQL (если длинная).
Retry
На 429 и 5xx (и на ошибки сети) при наличии meta.retry и неисчерпанном max:
ordered — снова через reject + backoff в Redis (порядок и блокировка партиции сохраняются);
не ordered — задача сериализуется и уходит в Redis (≤ 1 с до следующей попытки) или в PostgreSQL (> 1 с), processor делает Ack текущей Redis-задачи, потому что retry уже «новая» постановка.
Бэкофф — экспоненциальный с потолком maxDelay и джиттером.
Кеш
При meta.cache ответы 2xx кладутся в Redis (обёртка go-redis/cache). Параллельные промахи по одному ключу схлопываются через singleflight: один реальный HTTP, остальные ждут.
Отложенные сообщения
Поле meta.scheduled: consumer in сам решает — сейчас в Redis (как правило, с offset-ref) или в PostgreSQL для дальнего горизонта.
Wake up (инициация без HTTP)
Если uri пустой, HTTP-вызов не выполняется: тело и заголовки из заявки попадают в ответ как «успешный» 200. В связке с scheduled и proxyData это удобный будильник для других сервисов: в нужный момент в out уходит сообщение, которое downstream может трактовать как событие или команду. Redpanda Connect дальше маршрутизирует по ключу или по полям payload.
Как тестировали
WireMock — контролируемый mock HTTP: 500, 429, задержки, детерминированные ответы для сценариев retry и rate limit.
k6 — нагрузочные скрипты в репозитории: отдельно limiter, retry, cache, ordered-партиции, max RPS; есть сценарий прямой записи в Kafka
inи замера обработки черезout(custom k6 с xk6-kafka).Prometheus + Grafana — метрики инфраструктуры и сервиса (в docker-compose заготовки есть).
OpenTelemetry — опционально (
OTEL_ENABLED=1): полный трейс отhttp.request→kafka.in.consume→kafka.fetch→processor.handle→http.client→kafka.out→kafka.out.consume, визуализация в Jaeger. Удобно ловить цикл PG → Kafka in → Redis → out.
Узкое место: большие сообщения в Redis
На практике всплыло узкое место: при больших телах задач пропускная способность заметно проседала, потому что крупный JSON гонялся через Redis (память, сеть, сериализация, время Lua/round-trip). Redis в этой архитектуре — координатор очереди, а не хранилище «толстых» документов.
Решение: в Redis кладём только ссылку на offset в Kafka (OffsetRef: topic, partition, offset). Воркер перед обработкой достаёт оригинальное сообщение из Kafka через выделенный fetcher. Пропускная способность и стабильность выросли; память Redis перестала быть линейной от размера body запросов.
Компромисс: пока сообщение не закоммичено и не вычитано, нужна согласованность retention в Kafka с горизонтом отложенных задач в Redis; для «дальних» отложенных задач по-прежнему используется PostgreSQL с полным payload.
Оптимизация через Kafka fetcher: компромиссы
Ссылка на offset вынуждает воркер перечитать запись в Kafka по конкретной позиции. Это не тот же паттерн, что последовательное чтение «хвоста» consumer group-ом: с точки зрения диска возможны скачкообразные (random) обращения к сегментам, выше доля промахов по page cache брокера по сравнению с идеальным стриминговым read.
Для порядка величины ~2500 RPS на нашем контуре это не стало практическим узким местом: у современных SSD (NVMe) произвольный ввод-вывод по-прежнему выдерживает такой профиль лучше, чем механическим дискам прошлого поколения; узким местом ранее оставались именно объём данных через Redis, а не IOPS брокера.
Retention. Если retention в топике in короткий, а воркеры отстают, теоретически сообщение по offset может уже исчезнуть из лога, пока фасад Redis ещё ссылается на него. В нашем продуктовом допущении: отправлять наружу то, что «пролежало» в очереди дольше ~5 часов, не требуется — срок retention и эксплуатационные лимиты можно согласовать с этим горизонтом. Дляё отложенных на длительные сроки задач полный payload по-прежнему живёт в PostgreSQL, а не в цепочке offset-ref.
Куда развиваться, если профиль изменится: хранить тело в отдельном объектном хранилище (S3-совместимое, MinIO и т.д.) с ключом в Redis; либо снова грузить сжатый payload в Redis (snappy/zstd), пожертвовав частью выигрыша по RAM. Как вариант, комбинировать подходы - меньше N kb в redis, все остальное в хранилище. Или даже сделать ttl store в самом приложении и если воркеры успели, то брать из памяти. Нужно экспериментировать и подбирать оптимальный вариант. Это запасной фронт работ.
На текущем этапе offset + fetcher — осмысленный компромисс: Redis остаётся лёгким, RPS отправки (producer) и общая устойчивость сценария для нас достаточны; вместо преждевременной усложнёнки отдельным store мы фиксируем цифры в нагрузочных тестах и при росте нагрузки возвращаемся к вариантам выше.
Итог
Спроектирован Requester как сервис исходящих HTTP-запросов с API и Kafka, с единым выходом в топик
outдля интеграций и Connect.Kafka — журнал и точка входа/выхода; PostgreSQL — outbox для длинных задержек; Redis + smart-redis-queue — партиции, порядок, пул воркеров, rate limit и отложенность «вблизи».
Offset-ref и Kafka fetcher сняли давление с Redis; цена — random I/O и зависимость от retention; при ~2500 RPS и согласованных SLA по «свежести» задач это приемлемо; при необходимости — отдельный store или сжатие в Redis.
Graceful shutdown, k6, WireMock, Grafana и OpenTelemetry закрывают эксплуатационный цикл: от нагрузочного теста до трассировки полного пути сообщения.
Репозиторий: https://github.com/Rinsvent/requester.
Приложение: ссылки
Комментарии (6)

pkokoshnikov
26.04.2026 17:45А что с консистентностью вашей очереди? Или можно и не выполнять запрос если редис приложет? А если в режиме консистентности то какой объём данных и сколько будет подниматься в случае сбоя? Зачем вообще скорость ретрая если задача асинхронная? Вопросов очень много

rinsvent Автор
26.04.2026 17:45А что с консистентностью вашей очереди? Или можно и не выполнять запрос если редис приложет?
Вот тут хороший вопрос! Сервис проектирвоался как замена текущей системы - сейчас уже есть хранение в редис и в случае если упадет, то - да сообщения будут потеряны. На практике у нас редис кластер и такого еще ни разу не было, поэтому на первом этапе этот момент опустил, но действительно собирался этим заняться.
Ключевая идея, что у на есть лог событий в кафка и хотел добавить команду в приложение которая в случае сбоя заново перечитает кафку (топик in) с первого offset который был не обработан (тут нужно будет слдеить за хвостом)
Преечитав кафку (топик out) можно гарантирвоать что сообщения не будут обработаны джажды + потребуется вероятно на этот момент не стартовать воркеры.А если в режиме консистентности то какой объём данных и сколько будет подниматься в случае сбоя?
Как выше написал, пока не реализовано, но если делать, то будет зависеть от того как быстро среагируем на инцидент. Сейчас средняя скорость 300 с / сек (в пиках 1700)
Допустим среагировали через 10 часов. Т очто было ранее 5 часов отсеиваем как устаревшие - получается не смогли отправить. Сейчас у бизнеса такие условия. получается за 5 часов надо вкачать
при средней скорости 300 с / сек это 5 3600 * 300 = 5_400_000 сообщений которые нужно заново загрузить в систему.
Скорость перекидывания в базу согласно бенчмаркам очереди https://habr.com/ru/articles/1018194/ могут достигать 100_000 c / сек, то есть в базу все зайдет за минуту, а разгребаться уже будет со скоростью 2500 rps примерно минут 40.
Это худший случай, когда все сообщения надо переотпрравить, если часть будет дедублицирована из-за того что ранее обрабоатлась, то должно быть быстрее. Но это пока в теории, реальные цифра можно будет увидеть только после реализацииЗачем вообще скорость ретрая если задача асинхронная?
Не совсем понял вопрос, но если суть что можно было sleep поставить, то тогда воркеры будут тормозиться и перфоманс будет сильно проседать. Так что с возвратом в очередь получается намного эффективнее

KON88
26.04.2026 17:45Прочёл статью. Вопросов действительно много.
Главная цель использования такого количества инструментов это 2500 rps? Или возможность контроля событий как в saga?

rinsvent Автор
26.04.2026 17:45Главная цель использования такого количества инструментов это 2500 rps?
В приложении используется очреедь / база / in memory хранилище. Это обычно минимум для чуть менее сложной системы) Без postgresql невозможно было бы реализовать сообщения отсылаемые через год. Без редиса невозможно было бы реализвоать партицирование и гарантию порядка для ограниченного числа воркеров. Очередь must have для такой системы.
Все инстурцменты обоснованы и описаны в статье, что за что отвечает. rps был как маркер - выдержим ли мы текущие нагрузки. Ключевые факторы - неблокирующие партиции и порядок доставки в рамках портиции. Сами требования к сервису описаны в начале статьи)
omigo777
Какую прикладную задачу решали? Или это статья ради статьи: LLM спроектировала что-то, LLM это что-то описала?
rinsvent Автор
Прикладная задача - рассылка сообщений. Сервис был написан как заммена действующей системе. И ключевой фактор чтобы 1 клиент не блочил другого - именно для этого нужен редис и разделение по партициям! Сейчас почему-то у многих пунктик на LLM - сервис проектировали инженеры)) Но статью да - LLM помогала написать, что скрывать. Без нее сильно дольше бы писал, но суть не в буквах а в идеи и вызовах с которыми столкнулись, и хотелось поделиться этим. В целом если формат зайдет есть идея пройтись по архитектуре и написать ряд уже не прикладных статей. Не часто на хабре видел статьи по проектированию где разбирались бы моменты выбора стека, движенеи данных - считаю, что может быть интересным)