Привет, Хабр!

Сегодня мы рассмотрим реализацию Outbox паттерна в разных ЯП. Цель простая: одним атомарным действием обновлять бизнес-данные и фиксировать факт события, а затем надежно доставлять его в брокер. Разберем общую схему, конкуренцию воркеров, ретраи, дедупликацию, метрики и покажу, как это собрать на C#, Java, Go, Python и Node.js.

Outbox-паттерн придумали, чтобы решить простую, но очень неприятную проблему: сервис обновил данные в своей базе, а сообщение во внешний мир отправить не получилось. Например, пользователь зарегистрировался, в таблице он появился, а сообщение «UserCreated» в Kafka не улетело. И всё, в соседних сервисах этого события как будто не было. Получается рассинхрон, который потом не разгребёшь.

Смысл паттерна в том, что мы не пытаемся одним махом сохранить и в базу, и в брокер. Вместо этого всё делаем в два шага. В транзакции вместе с бизнес-данными сохраняем строку в таблицу outbox: тип события, payload, ключ. Это атомарная операция, или пишется всё, или ничего. А дальше уже отдельный воркер вычитывает эти строки и публикует их в Kafka, RabbitMQ или куда угодно. Если брокер временно недоступен, строка просто остаётся в outbox и будет доставлена при следующей попытке.

В итоге мы никогда не потеряем событие, если база его зафиксировала. Да, событие может прийти дважды, поэтому на стороне потребителей нужен идемпотентный код (чтобы повтор не сломал логику). Зато вся схема прозрачная: есть таблица-очередь, есть фоновые диспатчеры, есть метрики по зависшим событиям. Для маленьких сервисов это можно написать вручную, для крупных используют готовые решения вроде Debezium Outbox SMT или встроенный outbox в MassTransit и NServiceBus.

Общая схема данных и инварианты

Исходим из Postgres. Минимально рабочая модель:

-- outbox_events: единица отправки, одна строка = одно событие.
CREATE TABLE outbox_events (
  id            uuid PRIMARY KEY,
  aggregate_type text NOT NULL,
  aggregate_id   text NOT NULL,
  event_type     text NOT NULL,
  payload        jsonb NOT NULL,
  headers        jsonb NOT NULL DEFAULT '{}'::jsonb,
  created_at     timestamptz NOT NULL DEFAULT now(),
  available_at   timestamptz NOT NULL DEFAULT now(),
  attempts       int NOT NULL DEFAULT 0,
  status         text NOT NULL DEFAULT 'PENDING', -- PENDING | SENT | FAILED
  dedup_key      text,  -- для идемпотентности на стороне потребителя или брокера
  partition_key  text,  -- для фиксации порядка по агрегату
  seq            bigint  -- монотонный номер по aggregate_id (опционально)
);

CREATE INDEX ON outbox_events (status, available_at);
CREATE INDEX ON outbox_events (partition_key);
CREATE UNIQUE INDEX IF NOT EXISTS outbox_dedup_uniq ON outbox_events(dedup_key) WHERE dedup_key IS NOT NULL;

-- Очередь чтения конкурентными воркерами: SKIP LOCKED позволяет безопасно шардировать нагрузку.
-- Важно ограничивать пачку.

Инварианты:

  1. Пишем бизнес-строку и outbox-строку в ОДНОЙ транзакции.

  2. Доставщик забирает пачку PENDING с FOR UPDATE SKIP LOCKED, чтобы воркеры не дрались.

  3. Ретраим с backoff, считаем attempts и логируем ошибку целиком.

  4. На потребителе либо идемпотентное применение, либо дедуп по dedup_key.

  5. Порядок по агрегату обеспечиваем partition_key и seq, и партиционируем события по ключу в брокере.

Алгоритм диспатчера

Псевдокод для понимания, он одинаковый для всех реализаций ниже:

loop:
  begin;
  rows = select * from outbox_events
         where status = 'PENDING' and available_at <= now()
         order by created_at
         for update skip locked
         limit N;
  if rows empty: commit; sleep(small); continue;

  for row in rows:
    try send_to_broker(row)
       mark SENT
    catch e:
       attempts++
       if attempts < max_attempts:
         available_at = now() + backoff(attempts)
         keep PENDING
       else:
         status = 'FAILED'
  commit;

Если Kafka уже включена в контур, можно вместо poller использовать Debezium Outbox SMT на коннекторе, который вычитывает изменения из outbox таблицы и публикует в топики без вашего кода.

C# + EF Core

Берем EF Core с перехватчиком SaveChanges для автодобавления outbox-события и BackgroundService для диспатча.

// Domain event
public sealed record UserCreated(Guid UserId, string Email);

// EF entity
public class User {
    public Guid Id { get; set; }
    public string Email { get; set; } = null!;
    public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
}

// Outbox EF entity
public class OutboxEvent {
    public Guid Id { get; set; }
    public string AggregateType { get; set; } = null!;
    public string AggregateId { get; set; } = null!;
    public string EventType { get; set; } = null!;
    public string Payload { get; set; } = null!;
    public string Headers { get; set; } = "{}";
    public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
    public DateTimeOffset AvailableAt { get; set; } = DateTimeOffset.UtcNow;
    public int Attempts { get; set; }
    public string Status { get; set; } = "PENDING";
    public string? DedupKey { get; set; }
    public string? PartitionKey { get; set; }
    public long? Seq { get; set; }
}

public sealed class AppDbContext(DbContextOptions<AppDbContext> options) : DbContext(options) {
    public DbSet<User> Users => Set<User>();
    public DbSet<OutboxEvent> Outbox => Set<OutboxEvent>();
}

Перехватчик: складываем outbox-строку в текущую транзакцию.

public sealed class OutboxSaveChangesInterceptor : SaveChangesInterceptor {
    public override ValueTask<InterceptionResult<int>> SavingChangesAsync(
        DbContextEventData eventData,
        InterceptionResult<int> result,
        CancellationToken cancellationToken = default)
    {
        var ctx = (AppDbContext)eventData.Context!;
        var entries = ctx.ChangeTracker.Entries<User>()
                         .Where(e => e.State == EntityState.Added)
                         .ToList();

        foreach (var e in entries) {
            var ev = new UserCreated(e.Entity.Id, e.Entity.Email);
            var outbox = new OutboxEvent {
                Id = Guid.NewGuid(),
                AggregateType = "User",
                AggregateId = e.Entity.Id.ToString(),
                EventType = nameof(UserCreated),
                Payload = JsonSerializer.Serialize(ev),
                DedupKey = $"user-created:{e.Entity.Id}",
                PartitionKey = e.Entity.Id.ToString()
            };
            ctx.Outbox.Add(outbox);
        }
        return base.SavingChangesAsync(eventData, result, cancellationToken);
    }
}

Диспатчер: конкурентная выборка пачки через сырой SQL с FOR UPDATE SKIP LOCKED. EF поддерживает ExecuteSql и маппинг в entity, но для адекватного контроля берем Npgsql напрямую.

public sealed class OutboxDispatcher : BackgroundService {
    private readonly IDbContextFactory<AppDbContext> _factory;
    private readonly IKafkaProducer _producer; // ваш интерфейс
    private readonly ILogger<OutboxDispatcher> _log;

    public OutboxDispatcher(IDbContextFactory<AppDbContext> factory, IKafkaProducer producer, ILogger<OutboxDispatcher> log) {
        _factory = factory; _producer = producer; _log = log;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
        while (!stoppingToken.IsCancellationRequested) {
            try {
                using var ctx = await _factory.CreateDbContextAsync(stoppingToken);
                await using var tx = await ctx.Database.BeginTransactionAsync(stoppingToken);

                // Берем пачку
                var rows = await ctx.Outbox
                    .FromSqlRaw("""
                        select * from outbox_events
                        where status = 'PENDING' and available_at <= now()
                        order by created_at
                        for update skip locked
                        limit 100
                    """).ToListAsync(stoppingToken);

                if (rows.Count == 0) {
                    await tx.CommitAsync(stoppingToken);
                    await Task.Delay(TimeSpan.FromMilliseconds(200), stoppingToken);
                    continue;
                }

                foreach (var row in rows) {
                    try {
                        await _producer.SendAsync(topic: "user-events",
                                                  key: row.PartitionKey ?? row.AggregateId,
                                                  value: row.Payload,
                                                  headers: row.Headers);
                        row.Status = "SENT";
                    }
                    catch (Exception ex) {
                        _log.LogError(ex, "outbox send failed {Id}", row.Id);
                        row.Attempts += 1;
                        if (row.Attempts < 10) {
                            row.AvailableAt = DateTimeOffset.UtcNow + Backoff(row.Attempts);
                        } else {
                            row.Status = "FAILED";
                        }
                    }
                }
                await ctx.SaveChangesAsync(stoppingToken);
                await tx.CommitAsync(stoppingToken);
            }
            catch (Exception e) {
                _log.LogError(e, "dispatcher loop error");
                await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
            }
        }
    }

    private static TimeSpan Backoff(int attempts) {
        var delay = Math.Min(TimeSpan.FromMinutes(5).TotalMilliseconds, Math.Pow(2, attempts) * 100);
        return TimeSpan.FromMilliseconds(delay);
    }
}

Если вы уже используете NServiceBus или MassTransit, у них есть встроенный Outbox. Это экономит сотни строк кода и прошито в пайплайн.

Java 21 + Spring Boot + JPA: poller и Debezium как альтернатива

С JPA картина аналогичная: пишем сущность outbox и сохраняем в той же транзакции, где меняем доменную модель. Для диспатча используем JdbcTemplate с явным SQL.

@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
  @Id
  private UUID id;
  private String aggregateType;
  private String aggregateId;
  private String eventType;
  @Column(columnDefinition = "jsonb")
  private String payload;
  @Column(columnDefinition = "jsonb")
  private String headers;
  private OffsetDateTime createdAt;
  private OffsetDateTime availableAt;
  private Integer attempts;
  private String status;
  private String dedupKey;
  private String partitionKey;
  private Long seq;
}

Сохранение в одной транзакции:

@Service
public class UserService {
  private final UserRepository users;
  private final OutboxRepository outbox;

  @Transactional
  public UUID createUser(String email) {
    var user = users.save(new User(email));
    var ev = new OutboxEvent(/* set fields, including dedupKey and partitionKey=user.getId().toString() */);
    outbox.save(ev);
    return user.getId();
  }
}

Диспатч пачками:

@Component
public class OutboxDispatcher {
  private final JdbcTemplate jdbc;
  private final KafkaTemplate<String, String> kafka;

  @Scheduled(fixedDelay = 200)
  public void tick() {
    TransactionTemplate tt = new TransactionTemplate(new DataSourceTransactionManager(jdbc.getDataSource()));
    tt.executeWithoutResult(tx -> {
      List<Map<String,Object>> rows = jdbc.queryForList("""
        select * from outbox_events
        where status = 'PENDING' and available_at <= now()
        order by created_at
        for update skip locked
        limit 100
      """);
      for (var row : rows) {
        try {
          kafka.send("user-events",
            (String)row.get("partition_key"),
            (String)row.get("payload")).get();
          jdbc.update("update outbox_events set status='SENT' where id = ?",
            row.get("id"));
        } catch (Exception ex) {
          jdbc.update("""
            update outbox_events
            set attempts = attempts + 1,
                available_at = CASE WHEN attempts+1 < 10
                                    THEN now() + make_interval(secs => round(power(2, attempts+1) * 0.1))
                                    ELSE available_at END,
                status = CASE WHEN attempts+1 >= 10 THEN 'FAILED' ELSE status END
            where id = ?
          """, row.get("id"));
        }
      }
    });
  }
}

CDC-вариант: Debezium Outbox SMT на коннекторе Postgres. В приложении только пишемстроки в outbox, коннектор сам сформирует события в Kafka, включая ключ по aggregate_id для порядка и маршрутизацию по event_type. Фрагмент конфигурации SMT:

{
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.table.fields.additional.placement": "type:header:eventType,aggregate_type:header:aggregateType",
  "transforms.outbox.route.by.field": "event_type",
  "transforms.outbox.table.field.event.key": "aggregate_id",
  "transforms.outbox.table.field.event.id": "id",
  "transforms.outbox.table.field.event.type": "event_type",
  "transforms.outbox.table.field.event.payload": "payload",
  "transforms.outbox.table.field.event.timestamp": "created_at"
}

Go + pgx: контроль транзакций и конкурентный poller

Go хорош тем, что легко держать в голове контроль над транзакциями и пулы коннектов.

type OutboxEvent struct {
    ID           uuid.UUID
    AggregateType string
    AggregateID   string
    EventType     string
    Payload       []byte
    Headers       []byte
    CreatedAt     time.Time
    AvailableAt   time.Time
    Attempts      int
    Status        string
    DedupKey      *string
    PartitionKey  *string
    Seq           *int64
}

func CreateUser(ctx context.Context, db *pgxpool.Pool, email string) (uuid.UUID, error) {
    tx, err := db.BeginTx(ctx, pgx.TxOptions{})
    if err != nil { return uuid.Nil, err }
    defer tx.Rollback(ctx)

    var userID uuid.UUID
    if err := tx.QueryRow(ctx, `
        insert into users(email) values ($1) returning id
    `, email).Scan(&userID); err != nil {
        return uuid.Nil, err
    }

    ev := OutboxEvent{
        ID: uuid.New(), AggregateType: "User",
        AggregateID: userID.String(), EventType: "UserCreated",
        Payload: mustJSON(struct{UserID, Email string}{userID.String(), email}),
        PartitionKey: ptr(userID.String()), DedupKey: ptr("user-created:" + userID.String()),
        Status: "PENDING",
    }
    _, err = tx.Exec(ctx, `
        insert into outbox_events(id, aggregate_type, aggregate_id, event_type, payload, headers, status, dedup_key, partition_key)
        values ($1,$2,$3,$4,$5,'{}','PENDING',$6,$7)
    `, ev.ID, ev.AggregateType, ev.AggregateID, ev.EventType, ev.Payload, ev.DedupKey, ev.PartitionKey)
    if err != nil { return uuid.Nil, err }

    if err := tx.Commit(ctx); err != nil { return uuid.Nil, err }
    return userID, nil
}

Диспатчер с конкурентной выборкой:

func DispatchLoop(ctx context.Context, db *pgxpool.Pool, prod KafkaProducer) {
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            func() {
                tx, err := db.BeginTx(ctx, pgx.TxOptions{})
                if err != nil { log.Printf("tx error: %v", err); return }
                defer tx.Rollback(ctx)

                rows, err := tx.Query(ctx, `
                    select id, aggregate_id, payload, partition_key
                    from outbox_events
                    where status = 'PENDING' and available_at <= now()
                    order by created_at
                    for update skip locked
                    limit 100
                `)
                if err != nil { log.Printf("query error: %v", err); return }

                idsSent := make([]uuid.UUID, 0, 100)
                for rows.Next() {
                    var id uuid.UUID; var aggID, payload, pkey string
                    if err := rows.Scan(&id, &aggID, &payload, &pkey); err != nil { log.Printf("scan: %v", err); return }
                    if err := prod.Send("user-events", pkey, payload); err != nil {
                        if _, e := tx.Exec(ctx, `
                            update outbox_events
                            set attempts = attempts + 1,
                                available_at = now() + (power(2, attempts+1) * interval '100 milliseconds'),
                                status = case when attempts+1 >= 10 then 'FAILED' else status end
                            where id = $1
                        `, id); e != nil { log.Printf("retry update: %v", e) }
                    } else {
                        idsSent = append(idsSent, id)
                    }
                }
                if len(idsSent) > 0 {
                    _, err = tx.Exec(ctx, `
                        update outbox_events set status='SENT' where id = any($1)
                    `, idsSent)
                    if err != nil { log.Printf("mark sent error: %v", err); return }
                }
                if err := tx.Commit(ctx); err != nil { log.Printf("commit: %v", err) }
            }()
        }
    }
}

Python 3.12 + SQLAlchemy 2.0: sync или async, разницы мало

На Python делаем то же самое. Покажу синхронный вариант для простоты.

from sqlalchemy import text
from sqlalchemy.orm import Session
import json, uuid, time
from datetime import datetime, timedelta

def create_user(session: Session, email: str):
    user_id = session.execute(text("insert into users(email) values (:e) returning id"), {"e": email}).scalar_one()
    payload = json.dumps({"user_id": str(user_id), "email": email})
    session.execute(text("""
        insert into outbox_events(id, aggregate_type, aggregate_id, event_type, payload, headers, status, dedup_key, partition_key)
        values (:id, 'User', :aid, 'UserCreated', :payload, '{}'::jsonb, 'PENDING', :dk, :pk)
    """), {
        "id": str(uuid.uuid4()),
        "aid": str(user_id),
        "payload": payload,
        "dk": f"user-created:{user_id}",
        "pk": str(user_id)
    })

def dispatch_loop(engine, producer):
    while True:
        with engine.begin() as conn:
            rows = conn.execute(text("""
                select id, payload, coalesce(partition_key, aggregate_id) as k
                from outbox_events
                where status='PENDING' and available_at <= now()
                order by created_at
                for update skip locked
                limit 100
            """)).mappings().all()
            sent_ids = []
            for r in rows:
                try:
                    producer.send("user-events", key=r["k"], value=r["payload"])
                    sent_ids.append(r["id"])
                except Exception:
                    conn.execute(text("""
                        update outbox_events
                        set attempts = attempts + 1,
                            available_at = now() + (power(2, attempts+1) * interval '100 milliseconds'),
                            status = case when attempts+1 >= 10 then 'FAILED' else status end
                        where id = :id
                    """), {"id": r["id"]})
            if sent_ids:
                conn.execute(text("update outbox_events set status='SENT' where id = any(:ids)"),
                             {"ids": sent_ids})
        time.sleep(0.2)

Node.js 22 + Prisma: транзакция через prisma.$transaction, выборка через сырой SQL

Prisma не дает удобный API на row-level locking, поэтому используем prisma.$queryRaw c FOR UPDATE SKIP LOCKED.

// сохранение
await prisma.$transaction(async (tx) => {
  const user = await tx.user.create({ data: { email } });
  await tx.$executeRawUnsafe(`
    insert into outbox_events(id, aggregate_type, aggregate_id, event_type, payload, headers, status, dedup_key, partition_key)
    values (gen_random_uuid(), 'User', $1, 'UserCreated', $2, '{}'::jsonb, 'PENDING', $3, $1)
  `, user.id, JSON.stringify({ userId: user.id, email }), `user-created:${user.id}`);
});

// диспатчер
async function dispatchOnce() {
  return prisma.$transaction(async (tx) => {
    const rows = await tx.$queryRawUnsafe(`
      select id, payload, coalesce(partition_key, aggregate_id) as k
      from outbox_events
      where status='PENDING' and available_at <= now()
      order by created_at
      for update skip locked
      limit 100
    `);
    const sent: string[] = [];
    for (const r of rows as any[]) {
      try {
        await kafka.send({ topic: "user-events", messages: [{ key: r.k, value: r.payload }] });
        sent.push(r.id);
      } catch {
        await tx.$executeRawUnsafe(`
          update outbox_events
          set attempts = attempts + 1,
              available_at = now() + (power(2, attempts+1) * interval '100 milliseconds'),
              status = case when attempts+1 >= 10 then 'FAILED' else status end
          where id = $1
        `, r.id);
      }
    }
    if (sent.length) {
      await tx.$executeRawUnsafe(`update outbox_events set status='SENT' where id = any($1)`, sent);
    }
  });
}
setInterval(() => dispatchOnce().catch(console.error), 200);

CDC-вариант: Debezium Outbox SMT

Если Kafka уже в контур включена, можно не писать диспатчер. Debezium будет слушать change-log БД и транслировать строки из outbox в топики.

Фрагмент конфигурации SMT:

{
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.route.by.field": "event_type",
  "transforms.outbox.table.field.event.id": "id",
  "transforms.outbox.table.field.event.key": "aggregate_id",
  "transforms.outbox.table.field.event.type": "event_type",
  "transforms.outbox.table.field.event.payload": "payload",
  "transforms.outbox.table.field.event.timestamp": "created_at",
  "transforms.outbox.table.fields.additional.placement":
    "event_version:header:eventVersion,aggregate_type:header:aggregateType"
}

Конечно, тут из минусов то, что получаем доп инфраструктуру, задержки CDC и внимательное отношение к правам и нагрузке на WAL.

Финальная проверка цели

Мы собрали полный путь: модель outbox, атомарная запись, конкурентный диспатч через SKIP LOCKED, ретраи, дедуп, порядок и метрики. Показал рабочие куски кода под пять стеков и альтернативу на CDC. Этого достаточно, чтобы внедрить паттерн без сюрпризов и заняться важным — доменной логикой. Если нужно, могу дописать секцию под ваш брокер и формат событий, или развернуть примеры под конкретную СУБД и фреймворк.


Когда вы строите систему, приходится балансировать: скорость против памяти, гибкость против поддержки, простота против расширяемости. Эти компромиссы неизбежны, и именно их чаще всего недооценивают — до тех пор, пока не начинают болеть реальные проекты. Если вы хотите глубже разобраться, как архитектурные решения отражаются на производительности и качестве кода, записывайтесь на бесплатные уроки:

Немного практики в тему — попробуйте пройти вступительный тест по Архитектуре и шаблонам проектирования и узнаете, есть ли пробелы в знаниях.

Комментарии (0)