
Прошлая статья была обзорной — что такое redb.Route, зачем нам понадобился свой Apache Camel под .NET, как выглядит боевой маршрут. Если не читали, коротко: это fluent C# DSL для интеграции — 22 коннектора (~30 URI-схем, если считать https/wss/es-варианты), ~30 паттернов EIP нативно через 41 процессор, 8 in-process компонентов, компилируемый expression-движок. Сегодня заходим внутрь. Не список фич, а рабочий разбор.
Серия будет длинной, поэтому сразу скажу, что и в каком порядке:
Часть 1 (эта) — четыре in-process канала +
Exchange. Фундамент, на котором стоит всё остальное.Часть 2 —
Splitter+Aggregator: один exchange → много → один, с ограниченным параллелизмом и контрактом стратегии агрегации.Части 3–5 — кластеры EIP: роутинг, трансформация, надёжность.
Часть 6 — expression-движок: компилируемый мини-язык с Tokenizer → Parser → AST →
IL, ~22 встроенных функции, 17 предикатных строителей — это отдельная статья, потому что это отдельный язык.Части 7+ — по одному коннектору на статью, каждый пример из реального продакшна.
Начинаем с фундамента — намеренно. Каждый паттерн из части 3–5, каждое выражение из части 6 и каждый коннектор из части 7+ стоят на двух вещах: канале, который несёт сообщение между сегментами маршрута, и Exchange, который и есть это сообщение. Разберитесь с ними правильно — и остальная серия просто композиция. Ошибитесь — и будете час искать, почему транзакция тихо не откатилась.
Начнём с фундамента, на котором стоит вообще всё остальное в библиотеке. Любой маршрут — это From → [процессоры] → To. Между сегментами маршрута что-то «течёт», и течёт оно по каналам. Каналов внутри процесса четыре: direct, direct-vm, seda, vm. А то, что по ним течёт, — это Exchange. Разберёмся сначала с тем, что течёт, а потом с тем, по чему.
И сразу предупреждение, ради которого половина статьи и написана: я залез в исходники собственного движка и нашёл там пару мест, где XML-doc-комментарий отстал от кода. Причина понятная: redb.Route проектировался на основе собственного опыта работы с Camel — и модель Exchange, и doc-комментарии писались с той же картиной в голове. Позже Apache Camel в версии 3 эту модель пересмотрел (задепрекейтил getOut()/setOut(), схлопнул набор паттернов) — и часть наших формулировок просто не успела за этим переосмыслением. Так что дальше — не пересказ документации, а то, что реально происходит в рантайме.
Exchange — сердце, и оно непростое
Всё, что движется по маршруту, — это IExchange. Не byte[], не ваш DTO, а небольшой объект, внутри которого происходит больше, чем кажется по названию. Вот форма, которая важна:
public interface IExchange : IAsyncDisposable { IMessage In { get; set; } // основное сообщение — есть всегда IMessage? Out { get; set; } // ответ — ленивый, null пока не понадобится ExchangePattern Pattern { get; set; } // InOnly (default), InOut или OutOnly IDictionary<string, object?> Properties { get; } // метаданные уровня маршрута Exception? Exception { get; set; } // ошибка, in-band bool ExceptionHandled { get; set; } string ExchangeId { get; } // идентичность, переживает клонирование IExchange Clone(); // копия + НОВЫЙ DI-scope IServiceProvider? ServiceProvider { get; } // DI-scope на сообщение }
И сообщение, которое он несёт:
public interface IMessage { object? Body { get; set; } // ваш payload — любой объект или null string? ContentType { get; set; } IDictionary<string, object?> Headers { get; } // метаданные, которые УХОДЯТ в брокер T? GetHeader<T>(string key); IMessage Clone(); }
Пять вещей про этот объект определяют поведение всех каналов и всех паттернов. И ни одна из них не очевидна по сигнатуре типа.
1. In против Out и Pattern
In — сообщение, которое идёт по маршруту. Out — ответ, и он ленивый: для дефолтного паттерна InOnly он остаётся null и вообще не аллоцируется. Он появляется только когда процессор его явно выставил (request/reply или .Respond()).
Паттернов три, а не два — это модель Apache Camel 2.x, перенесённая как есть:
public enum ExchangePattern { InOnly = 0, // fire-and-forget. Результат пишется в In; Out остаётся null. (default) InOut = 1, // request/reply. Оригинал лежит в In, ответ пишется в Out. OutOnly = 2, // явный ответ через .Respond(); RPC-ответ берётся из Out. }
А вот часть, которую сигнатура прячет, и единственное место, где все ошибаются: HasOut не говорит вам, где лежит ответ. Даже на InOut-обмене процессор не обязан заполнять Out — если он просто мутировал In.Body, результат лежит в In. Поэтому сам фреймворк никогда не доверяет HasOut для поиска ответа. Он читает Out ?? In, каждый раз:
// ProducerTemplate.RequestBody — каноническое правило извлечения ответа exchange.Pattern = ExchangePattern.InOut; await producer.Process(exchange); return exchange.Out?.Body ?? exchange.In.Body; // Out если есть, иначе In
Это дословно то, как ProducerTemplate в Camel достаёт результат (getResultMessage: «есть Out → берём Out, иначе In»). Скопируйте это правило к себе в код — exchange.Out ?? exchange.In — и никогда не будете гоняться за ответом, который тихо остался в In. HasOut — это факт про аллокацию, а не про то, где данные; не ветвитесь по нему на ответ.
И честная ремарка для тех, кто пришёл с JVM: живое сообщение Out и паттерн OutOnly — это семантика Camel 2.x. В Camel 3+ getOut()/setOut() задепрекейтили и схлопнули набор паттернов в сторону InOnly/InOut — именно потому что отдельное Out-сообщение копировало заголовки и плодило тонкие баги. redb.Route намеренно держит более полную модель 2.x — но если вы из современного Camel, это первое, что бросится в глаза.
2. Properties против Headers — различие, через которое утекают баги
Оба — IDictionary<string, object?>. И они не взаимозаменяемы:
In.Headersуезжают вместе с сообщением в брокер. Положили сюдаcorrelationId— Kafka/RabbitMQ унесут его дальше по цепочке.exchange.Properties— это метаданные уровня маршрута:RouteId, маркеры транзакций, ваше рабочее состояние. Они не покидают процесс (XML-doc интерфейса так и говорит: «Does NOT travel to brokers — use In.Headers for that»). Сюда кладут хэндлDbContextили счётчик ретраев.
Положите значение не в тот словарь — и оно либо не доедет до консьюмера (использовали Properties), либо утечёт внутреннее состояние наружу, в брокер (использовали Headers). Компилятор это не поймает: оба — просто string-ключевые словари. Понимание, что есть что, — это половина умения пользоваться фреймворком.
Читайте через типизированные аксессоры, а не кастуя руками:
var attempt = exchange.GetProperty<int>("retryCount"); // уровень маршрута, остаётся в процессе var corr = exchange.In.GetHeader<string>("correlationId"); // уезжает в брокер
Какие ключи фреймворк пишет сам
Это та часть, которая делает раздел конкретным, и ответ на вопрос «а что вообще лежит в Properties?». Pipeline и процессоры по мере движения exchange заполняют набор зарезервированных ключей. Единого файла-констант ExchangeProperties нет — каждый ключ живёт рядом с процессором, который им владеет, — но вот реальный реестр, добытый из исходников.
exchange.Properties — уровень маршрута, не покидают процесс:
Ключ |
Константа в коде |
Кто пишет |
Смысл |
|---|---|---|---|
|
|
|
стек transacted-действий для синхронизации |
|
|
|
живой |
|
|
Idempotent Consumer |
|
|
|
Claim Check |
стек ключей сохранённого payload'а |
|
|
|
результат валидации текущего exchange |
|
— (streaming splitter) |
streaming Splitter |
текущий счётчик частей сплита |
|
— (префикс) |
DI-обвязка |
именованные дочерние DI-scope'ы, освобождаются |
Плюс RouteId поднят до first-class свойства на самом exchange (exchange.RouteId), а не просто записи в словаре — именно его логгер печатает как [rId:…].
In.Headers — уезжают вместе с сообщением:
Camel-совместимые заголовки сообщения — из того же мира, что ожидают пользователи Camel. Splitter, например, штампует каждую часть:
// SplitterProcessor — каждая часть сплита несёт свои координаты splitMessage.Headers["CamelSplitIndex"] = index; // позиция, с нуля splitMessage.Headers["CamelSplitSize"] = total; // размер пачки splitMessage.Headers["CamelSplitComplete"] = index == total - 1; // последняя?
а каждый транспорт добавляет свои namespaced-константы заголовков — KafkaHeaders (redbKafka.Topic, redbKafka.Partition, redbKafka.Offset, …), SqlHeaders (redbSql.rowCount, redbSql.generatedKeys, …), SignalRHeaders (redbSignalR.ConnectionId, …), TcpHeaders, WsHeaders, ElasticsearchHeaders. Каждый — это static class из public const string, чтобы биндиться на KafkaHeaders.Offset, а не на stringly-typed "redbKafka.Offset", в котором легко опечататься. Правило простое: всё с префиксом Camel* или redb<Transport>.* — это заголовок (уезжает в брокер); всё в Properties — ваше и только процесса.
3. Исключение едет in-band
Когда процессор бросает, исключение не просто раскручивает стек — оно ловится в exchange.Exception, рядом с флагом ExceptionHandled. Именно это позволяет dead-letter-маршруту ветвиться по тому, почему что-то упало (when e.Exception is TimeoutException → …), а не просто по тому, что упало. Ошибка становится данными, по которым можно маршрутизировать. На этом мы плотно стоим в выпуске про обработку ошибок.
4. ExchangeId переживает клонирование
Каждый exchange при создании получает ExchangeId на основе Guid. Неочевидная часть: Clone() его сохраняет. Сплит на 500 частей или seda-хоп, который клонирует exchange, держат тот же id — поэтому логи и трейсы сшиваются обратно в одно происхождение. Идентичность переживает копирование — и это сделано намеренно.
5. DI-scope — и четыре способа скопировать exchange
Вот часть, которая действительно непростая, и именно она объясняет, почему каналы ведут себя так, как ведут.
Exchange может владеть DI-scope — IServiceScope на сообщение. Процессоры резолвят scoped-сервисы (DbContext, IRedbService, …) из exchange.ServiceProvider и получают те же инстансы на всё время жизни этого exchange. TransactionScope живёт ровно в этом scope. Поэтому вопрос «эти два exchange в одной транзакции?» сводится к «они делят DI-scope?».
Способов скопировать exchange — четыре, и отличаются они только тем, что делают с этим scope:
Метод |
Body/Headers |
DI-scope |
Владеет scope? |
Зачем |
|---|---|---|---|---|
|
копируются |
новый scope |
да |
передача в другой поток ( |
|
копируются |
общий с родителем |
нет |
параллельный fan-out внутри транзакции родителя |
|
новое сообщение |
новый scope |
да |
производный exchange, независимая жизнь |
|
новое сообщение |
общий с родителем |
нет |
последовательные дети на одном соединении |
// из Exchange.Clone() — ветка, создающая scope if (_scopeFactory != null) { clone._scopeFactory = _scopeFactory; clone._scope = _scopeFactory.CreateScope(); // <-- совершенно новый scope }
// из Exchange.CloneLinked() — ветка, делящая scope _ownsScope = false, _scope = _scope, // <-- ТОТ ЖЕ scope, и мы его не диспозим _scopeFactory = _scopeFactory
Запомните эту таблицу. Вся история транзакций seda против direct, и Multicast против брокерного хопа, — это просто какая строка сработала. Clone() (новый scope) — новая транзакция; CloneLinked() (общий scope) — та же самая.
Есть ещё ReleaseScopes() — он диспозит DI-scope'ы не трогая Body, чтобы аггрегатор мог освободить соединения к БД раньше, всё ещё держа накопленные данные сообщения. А DisposeAsync() чистит и тело (стримы, stream-кэши), и scope'ы. Объект реализует IAsyncDisposable не просто так.
Грабли: Clone() НЕ делает глубокую копию Body
Читаем Message.Clone() буквально:
public IMessage Clone() { var clone = new Message(Body) { ContentType = ContentType }; // ссылка на Body копируется foreach (var kvp in _headers) clone._headers[kvp.Key] = kvp.Value; return clone; }
Заголовки получают свежий словарь. Body копируется по ссылке. После seda-хопа exchange продюсера и клон воркера имеют независимые заголовки, properties и DI-scope'ы — но указывают на тот же самый объект тела. Если это тело — мутабельный List<T> или POCO, и обе стороны в него пишут, у вас гонка данных, которую клонирование как будто предотвратило. XML-doc обещает «deep copy» — писали с моделью Camel в голове, где это звучало естественно; честная правда — «deep copy всего, кроме payload'а». Считайте тело иммутабельным, пока оно в полёте, или клонируйте его сами.
Два API намеренно
Последнее, что заметите в IntelliSense: у каждого члена есть C#-идиоматичная форма (In, Body, GetHeader<T>) и Java-style алиас (getIn(), setBody(), getHeader<T>()). Это default interface methods над одним и тем же состоянием, оставленные, чтобы модель читалась так же, как Apache Camel, для тех, кто пришёл с JVM. Пользуйтесь любым — это один и тот же объект.
Теперь — каналы, которые всё это несут.
Четыре канала — две оси
direct, direct-vm, seda, vm — это как сегменты маршрута общаются друг с другом внутри процесса. Выбор между ними — самое частое, что новички делают неправильно, и теперь у вас есть словарь для почему: всё сводится к потокам и scope. Они разделяются по двум осям — синхронный/асинхронный и один контекст/между контекстами:
Схема |
Sync/Async |
Scope |
Клонирует exchange? |
Та же транзакция? |
|---|---|---|---|---|
|
синхронный |
один контекст |
нет |
да — тот же поток, тот же scope |
|
синхронный |
между контекстами |
нет |
да |
|
асинхронный |
один контекст |
да ( |
нет — новый scope |
|
асинхронный |
между контекстами |
да |
нет |
direct — это контраст. А seda — где живёт реальная работа и реальные грабли, поэтому на нём и задержимся.
direct:// — вызов метода, надевший URI
direct — это не очередь. Ни потока, ни буфера. Продюсер, отправляя в direct-эндпоинт, вызывает процессор консьюмера синхронно, в том же потоке:
// весь DirectProducer.Process целиком var processor = _endpoint.ConsumerProcessor ?? throw new InvalidOperationException("No consumer registered for direct endpoint ..."); await processor.Process(exchange, ct);
Exchange не клонируется. Тот же объект, тот же поток, тот же DI-scope — прямиком к консьюмеру. Отсюда три следствия:
Исключения проходят обратно к вызывающему. Throw в
direct-консьюмере всплывает в маршруте продюсера, где его ловитOnException/DoTry. (Вспомните §3 — оно ещё и садится наexchange.Exception.)Это та же транзакция. Тот же scope из таблицы выше, поэтому
direct-хоп внутри блока.Transaction()коммитится и откатывается вместе со всем вокруг.Консьюмер должен быть запущен первым, иначе отправка бросит.
directразделяет ваши определения маршрутов на именованные суб-маршруты — но не ваши потоки.
Вот и вся натура direct: zero-cost внутритранзакционный вызов, которому можно дать URI и переиспользовать. У него нет параметров, потому что нет машинерии. Используйте, чтобы разбить большой маршрут на читаемые переиспользуемые куски.
seda:// — асинхронная очередь, в деталях
seda (Staged Event-Driven Architecture) — противоположность direct в каждой клетке таблицы. Это настоящая in-memory очередь на System.Threading.Channels. Продюсер кладёт и сразу возвращается; один или несколько фоновых воркеров вычерпывают очередь на своих потоках.
// SedaProducer.Process — целиком var copy = exchange.Clone(); // §5: новый scope. ловушка живёт здесь. await _endpoint.Queue.Writer.WriteAsync(copy, ct);
В эти две строки зашиты два факта, и всё остальное про seda из них следует: он клонирует (поэтому воркер и продюсер никогда не делят scope — это Clone(), строка 1 таблицы, новый scope) и возвращается до того, как работа сделана (поэтому поток продюсера и его транзакция едут дальше).
Параметры
seda принимает три, все в URI: seda://name?concurrentConsumers=4&size=1000&timeout=30000.
Параметр |
Default |
Что делает |
Когда менять |
|---|---|---|---|
|
|
Сколько воркер-петель вычерпывают очередь параллельно |
Поднять, когда низ медленнее притока и порядок не важен |
|
|
Макс. размер очереди. |
Ставить границу всегда, когда продюсер может обгонять консьюмера (в проде — почти всегда) |
|
|
Объявлен как ожидание постановки в ограниченную очередь — см. честную ремарку ниже |
— |
concurrentConsumers — пропускная способность ценой порядка
Один воркер — дефолт, держит строгий FIFO. Подъём раскручивает N независимых петель:
// SedaConsumer.RunAsync _workers = new Task[_options.ConcurrentConsumers]; for (var i = 0; i < _options.ConcurrentConsumers; i++) _workers[i] = WorkerLoop(pollCt, processingCt);
// каждый воркер await foreach (var exchange in _endpoint.Queue.Reader.ReadAllAsync(pollCt)) { await ProcessWithTracking(exchange, processingCt); Interlocked.Increment(ref _processedCount); }
Два следствия, которые стоит сказать прямо:
Вы меняете порядок на пропускную способность. При
concurrentConsumers=1канал работает в режимеSingleReader(реальная оптимизация вSystem.Threading.Channels), и сообщения выходят по порядку. При N>1 N воркеров тянут конкурентно, и строгий FIFO исчезает — сообщение 2 может завершиться раньше сообщения 1. Поднимайте, только если обработка не по порядку допустима.Это per-endpoint снятие backpressure: медленный низ перестаёт блокировать верхнего продюсера, потому что продюсер только пишет в очередь и уходит.
size — ограниченная против безграничной, и почему почти всегда нужна ограниченная
Это параметр, который пропускают и жалеют. Эндпоинт выбирает реализацию канала по size:
Queue = options.Size > 0 ? Channel.CreateBounded<IExchange>(new BoundedChannelOptions(options.Size) { FullMode = BoundedChannelFullMode.Wait, // продюсер ждёт свободный слот SingleReader = options.ConcurrentConsumers == 1, SingleWriter = false }) : Channel.CreateUnbounded<IExchange>(/* ... */); // растёт, пока не кончится память
size=0(дефолт, без границ): очередь растёт так же быстро, как продюсеры пишут. Если консьюмер не успевает — это утечка памяти с дополнительными шагами. Ок для всплесковой работы с ограниченным объёмом; опасно для firehose.size>0(ограничено):FullMode = Waitзначит, что полная очередь заставляет продюсера ждать свободный слот — backpressure, который толкает замедление вверх по течению, а не в вашу кучу. Это то, что нужно в проде.
// ограниченная SEDA: 4 воркера, максимум 1000 в очереди, продюсер ждёт при заполнении From("seda://enrich?concurrentConsumers=4&size=1000") .Process(async (ex, ct) => await Enrich(ex, ct)) .To("rabbitmq://enriched");
timeout — честная ремарка
Объект опций документирует timeout (дефолт 30000 мс) как ожидание постановки для ограниченной очереди. Скажу прямо: в текущем коде продюсер кладёт через WriteAsync(copy, ct) и не применяет этот таймаут — полная ограниченная очередь заставляет продюсера ждать на канале, пока не освободится слот или не сработает CancellationToken, а не пока не пройдёт 30 секунд. Так что сегодня рассчитывайте на size и cancellation token; считайте timeout объявленным-но-ещё-не-подключённым и не стройте на нём дедлайн-предположений. (Помечаю это здесь, потому что угадывать поведение фреймворка по doc-комментарию — это ровно так, как доезжают до бага.)
Graceful shutdown — seda дренирует, а не дропает
Когда маршрут останавливается, seda не выбрасывает то, что уже в очереди:
// SedaConsumer.OnStopAccepting _endpoint.Queue.Writer.TryComplete(); // перестаём принимать новое; даём читателям дочитать
Завершение writer'а заставляет петлю ReadAllAsync воркеров дочитать оставшиеся элементы и затем чисто выйти (SedaConsumer — это DrainableConsumer). На graceful-остановке оставшиеся в очереди exchange'и обрабатываются, а не теряются.
Оговорка про долговечность
seda — in-memory и недолговечная. Graceful-остановка дренирует; падение или жёсткий kill — нет: всё, что было в канале, пропадает. seda — это at-most-once через рестарт процесса. Когда нужно, чтобы очередь пережила рестарт, — это брокер (rabbitmq, kafka), а не seda. seda — для развязки внутри процесса, не для долговечности.
Ловушка транзакции, раз и навсегда
Теперь §5 окупается. Поскольку seda зовёт Clone() — строка 1, новый DI-scope в другом потоке — всё, что после seda://-хопа, не в транзакции вызывающего.
From("kafka://orders") .Transaction() .To(Sql.Execute("INSERT …").Transacted()) .To("seda://post-process") // выполняется в НОВОМ scope, в другом потоке, .EndTransaction(); // ВНЕ этой транзакции
Если post-process бросит, INSERT выше уже закоммичен — seda-хоп вышел из транзакции в момент клонирования. Эту ошибку каждый делает ровно один раз. Лечится таблицей: если хоп должен делить транзакцию — используйте direct (нет клона, тот же scope); если вы действительно хотите отдать работу и ехать дальше — seda правилен, и вы принимаете новую границу. DSL один и тот же; всё решает scope.
Ментальная модель: direct = вызов функции, seda = почтовый ящик. Один сохраняет ваш поток и вашу транзакцию; другой меняет оба на пропускную способность и изоляцию.
direct-vm:// и vm:// — те же два, но между модулями
В мульти-модульном хосте (так redb.Tsak гоняет несколько модулей в одном процессе) каждый модуль — свой RouteContext. Обычные direct и seda ограничены одним контекстом: продюсер в модуле A не видит direct-консьюмера в модуле B. Варианты с -vm снимают ровно эту стену, разделяя реестр процессоров — а для vm ещё и канал — через DI-синглтон SharedVmRegistry:
direct-vm://— синхронный, межконтекстный, без клона. Консьюмер в модулеbillingвыставляетdirect-vm://charge; продюсер вordersзовёт его как локальный внутритранзакционный метод.vm://— асинхронный, межконтекстный, клонирует-и-кладёт-в-очередь. Межмодульный близнецseda, с теми же параметрамиconcurrentConsumersиsize(и тем жеClone(), а значит той же границей транзакции и той же оговоркой про общий Body).
Правило переносится чисто: direct-vm — для синхронного межмодульного вызова, делящего транзакцию вызывающего; vm — для отдай-и-едь-дальше между модулями. Та же семантика, что у их внутриконтекстных близнецов, — просто радиус поражения шире.
Как выбрать канал
Всё решение в одной таблице:
Вам нужно… |
Канал |
|---|---|
Переиспользуемый суб-маршрут, тот же поток, внутри моей транзакции |
|
То же, но консьюмер живёт в другом модуле |
|
Отдать работу фоновому воркеру и не ждать |
|
То же, через границы модулей |
|
Переживание рестарта процесса |
не канал — брокер ( |
И два факта, которые двигают каждую строку: клонирует ли он (новый scope = новая транзакция) и возвращается ли до того, как работа сделана. Всё остальное — детали.
Что дальше
Это фундамент. Теперь вы знаете, что течёт по маршруту (Exchange — In/Out, Properties против Headers, in-band-исключения и четыре scope-зависимых варианта клонирования) и четыре канала, которые его несут (direct/direct-vm синхронные-и-внутритранзакционные, seda/vm асинхронные-и-изолированные), вплоть до параметра, который решает, применит ваша очередь backpressure или съест вашу кучу.
Дальше в серии — Splitter + Aggregator. Размножаем одно сообщение во много, обрабатываем с ограниченным параллелизмом и собираем обратно — и различие Clone() против CloneLinked() из §5 оказывается всей историей о том, делит ли сплит транзакцию родителя. Плюс контракт стратегии аггрегации, где первый вызов отдаёт вам null-аккумулятор. Подписывайтесь, если хотите застать выпуск.
Если что-то здесь с вами поспорило — особенно граница транзакции seda или общий-Body-клон — напишите в комментариях. Эта обратная связь — ровно то, ради чего ранний OSS-релиз и существует.
Ссылки
redb.Route |
|
redb.Tsak (рантайм / мульти-модульный хост) |
|
redb.Core (хранилище) |
|
Обсуждения |
Всё под Apache 2.0.