В микросервисной архитектуре мы постоянно сталкиваемся с задачей: сохранить изменения в базе и гарантированно отправить событие в Kafka. На первый взгляд звучит просто — сделал транзакцию, отправил сообщение, закоммитил. Но в реальности между базой данных и брокером сообщений никакой общей транзакции нет.

Именно здесь мы сталкиваемся с самыми частыми ошибками:
В БД данные сохранились, а сообщение в Kafka не улетело
или наоборот — Сообщение ушло, а транзакция в БД откатилась.

Такие баги сложно воспроизвести, сложно отлаживать и ещё сложнее объяснять, почему данные между сервисами разъезжаются.

Паттерн Transaction Outbox решает эту проблему уже много лет. Он изолирует бизнес-логику от технической — мы сохраняем событие в таблицу внутри транзакции, а отправкой сообщений занимается отдельный компонент. Но каждая команда и каждый сервис реализуют Outbox по-своему — где-то шедулер, где-то cron, где-то retry-таблица, где-то кастомный SELECT FOR UPDATE. В итоге код размазывается, дублируется и становится неподдерживаемым.

В этой статье я покажу, как я вынес всю логику Transaction Outbox в отдельный Spring Boot Starter, который можно подключить одной зависимостью. Он создаёт таблицу Outbox, конфигурирует шедулер, отвечает за отправку в Kafka и очистку, позволяя микросервисам сосредоточиться только на бизнес-логике.

Что такое transaction outbox паттерн

Transaction Outbox — это архитектурный паттерн, который гарантирует надёжную доставку событий во внешние системы (Kafka, RabbitMQ, Webhooks) за счёт сохранения сообщений в отдельную outbox-таблицу внутри той же транзакции, что и бизнес-изменения в БД.

Отправка сообщений выполняется асинхронно — отдельным процессом или шедулером, который читает таблицу outbox и публикует события.

Паттерн решает проблему несогласованности данных между БД и брокером сообщений, когда:

  • транзакция в БД может закоммититься, а отправка в Kafka — упасть

  • сообщение может улететь в Kafka, но транзакция БД — откатится

Напишем spring-boot-starter для реализации паттерна

Я буду использовать стек Kotlin + Spring + JOOQ. Этот стартер также подойдет под Java + Spring + JPA. Мой выбор основан на стеке технологий на проекте)

Мы будем сохранять данные о сообщении в бд, отправлять их шедулером в кафку в нужные топики.

Начнем с создания файла resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
куда мы пропишем путь до класса конфигурации:

com.example.outbox.configuration.OutBoxAutoConfiguration

Так как нам нужно сохранять данные в таблицу, которую нужно создать специально для стартера, есть несколько вариантов:

  1. Миграция находится в стартере


    Плюсы:

    • Единая точка нахождения кода для структуры outbox-таблицы.

    • Не нужно копировать миграции в каждый сервис — подключил стартер, и всё само прилетело.

    • Гарантия совместимости — версия стартера = версия схемы.

    • Минимум человеческого фактора

    Минусы:

    • Нужно аккуратно работать с flyway-путями, чтобы не пересекаться с миграциями приложения.(Мы используем R миграцию)

  2. Миграция находится на стороне сервисов

    Плюсы:

    • Полный контроль у сервиса — может расширять схему, накатывать кастомные поля, индексы.

    • Можно тонко настраивать миграции под свои юзкейсы.

    Минусы:

    • Легко получить рассинхрон стартер ↔ схема БД.

    • Каждый сервис обязан вручную добавлять миграцию — копипаста, ошибки.

    • Сложнее обновлять стартер (надо помнить о версиях миграций).

Мы же будем выбирать первый вариант, будем использовать flyway, но миграцию сделаем с префиксом R, которая не будет мешать другим миграциям с префиксом V. Так как checksum миграции изменяться не будет, то она будет наката 1 раз, также будем использовать CREATE TABLE IF NOT EXISTS.

Напишем скрипт миграции resources/db/migration/R__create-outbox-table.sql:

CREATE TABLE IF NOT EXISTS outbox_messages
(
    id          UUID                  NOT NULL PRIMARY KEY,
    status      TEXT                  NOT NULL DEFAULT 'WAITING',
    payload     JSONB                 NOT NULL,
    last_error  TEXT,
    created_at  TIMESTAMP             NOT NULL DEFAULT LOCALTIMESTAMP,
    updated_at  TIMESTAMP             NOT NULL DEFAULT LOCALTIMESTAMP
);

Таблица очень простая, она может изменяться в зависимости от ваших бизнес требований. Например добавить поля object_id или object_type для идентификации объекта, если их может быть несколько и тд. Также тут стоит по дефолту status = 'WAITITNG', тут тоже все зависит от вашей статусной модели.

Затем нам нужно сохранить объект в бд, тут опять же все зависит от вашего стека, либо это обычный JpaRepository и метод save, либо кастомное сохранение объекта в зависимости от вашей логики.

В моем случае метод выглядит так:

fun <T> createMessage(messageObject: T) {
        val payload = objectMapper.writeValueAsString(messageObject)
        newRecord().apply {
            this.id = UUID.randomUUID()
            this.payload = JSONB.valueOf(payload)
            this.insert()
        }
    }

Также нам нужен метод для поиска объектов и отправки, так как мы получаем объекты, чтобы их изменить (после отправки меняется статус), я достаю их через select for update

    fun findWaiting(limit: Int = 500) = dslContext.selectFrom(OUTBOX_MESSAGES)
        .where(OUTBOX_MESSAGES.STATUS.eq(OutboxMessageStatus.WAITING))
        .orderBy(OUTBOX_MESSAGES.CREATED_AT.asc())
        .limit(limit)
        .forUpdate()
        .noWait()
        .fetch()

Также я добавил бы метод очистки таблицы, тут все зависит от того, сколько вы хотите хранить сообщения и собираетесь ли чистить таблицу:

    fun deleteOldMessages(deleteDays: Int = 30) {
        val currentDate = LocalDateTime.now()
        val thirtyDaysAgo = currentDate.minus(deleteDays, ChronoUnit.DAYS)
        dslContext.deleteFrom(OUTBOX_MESSAGES)
            .where(OUTBOX_MESSAGES.UPDATED_AT.lessOrEqual(thirtyDaysAgo))
            .execute()
    }

Давайте теперь напишем OutboxProperties для того, чтобы задать параметры конфигурации:

data class OutboxProperties(
    var sendTopic: String = "", // Здесь может быть массив, если вы собираетесь отправлять в несколько топиков
    var limitMessage: Int = 500,
    var deleteOldMessageCron: String = "0 0 12 * * *",
    var deleteMessageDays: Int = 30,
    var sendMessageDelay: String = "60000"
)

Теперь давайте напишем сервис, который будет отправлять сообщения в kafka:

open class OutboxMessageService(
    private val outboxProperties: OutboxProperties,
    private val outboxMessageRepository: OutboxMessageRepository,
    private val rtmKafkaTemplate: KafkaTemplate<String, JsonNode>,
    private val objectMapper: ObjectMapper
) {

    private val logger = LoggerFactory.getLogger(OutboxMessageService::class.java)

    @Transactional
    open fun deleteOldMessages() {
        outboxMessageRepository.deleteOldMessages(outboxProperties.deleteMessageDays)
    }

    @Transactional
    open fun findAndSendMessages(): List<OutboxMessagesRecord> {
        val waitingMessages = outboxMessageRepository.findWaiting(outboxProperties.limitMessage)
        if (waitingMessages.isEmpty()) {
            logger.info("Сообщений для отправки в топик ${outboxProperties.sendTopic} не найдено")
            return emptyList()
        }
        waitingMessages.forEach {
            logger.info("Отправка сообщения с id ${it.id} в топик $outboxProperties")
            sendMessage(objectMapper.readTree(it.payload?.data()?.trim()))
        }
        val updateIds = waitingMessages.mapNotNull { it.id }
        outboxMessageRepository.updateSentStatusByIds(updateIds)
        return waitingMessages
    }

    private fun sendMessage(message: JsonNode) {
        rtmKafkaTemplate.send(
            outboxProperties.sendTopic,
            message
        )
    }
}

Следующим шагом будет написание OutboxMessagesScheduler, который будет раз в N время работать с сообщениями:

class OutboxMessagesScheduler(
    private val outboxMessageService: OutboxMessageService,
    private val properties: OutboxProperties
) {

    private val logger = LoggerFactory.getLogger(OutboxMessagesScheduler::class.java)

    @Scheduled(fixedDelayString = "\${outbox.send-message-delay:60000}")
    fun sendMessage() {
        logger.info("Начало отправки сообщений в топик ${properties.sendTopic}")
        var waitingMessages = outboxMessageService.findAndSendMessages()
        while (waitingMessages.isNotEmpty()) {
            waitingMessages = outboxMessageService.findAndSendMessages()
        }
        logger.info("Отправка сообщений в топик ${properties.sendTopic} завершена")
    }

    @Scheduled(cron = "\${outbox.delete-old-message-cron:0 0 12 * * *}")
    fun deleteOldMessages() {
        logger.info("Начало очистки таблицы сообщений")
        outboxMessageService.deleteOldMessages()
        logger.info("Очистка сообщений завершена")
    }
}

И наконец напишем OutBoxAutoConfiguration для конфигурации наших бинов:

@Configuration
@EnableScheduling
@EnableConfigurationProperties(OutBoxAutoConfiguration::class)
@ConfigurationPropertiesScan("com.example.outbox.configuration")
class OutBoxAutoConfiguration {

    @Bean
    fun outBoxRepository(
        @Value("\${spring.flyway.default-schema}") schema: String,
        dslContext: DSLContext,
        objectMapper: ObjectMapper
    ): OutboxMessageRepository {
        dslContext.settings().withRenderMapping(
            RenderMapping()
                .withSchemata(
                    MappedSchema().withInput("public").withOutput(schema)
                )
        )
        return OutboxMessageRepository(dslContext, objectMapper)
    }

    @Bean
    @ConfigurationProperties(prefix = "outbox")
    fun customOutBoxProperties() = OutboxProperties()

    @Bean
    fun outboxService(
        outboxProperties: OutboxProperties,
        outboxRepository: OutboxMessageRepository,
        rtmKafkaTemplate: KafkaTemplate<String, JsonNode>,
        objectMapper: ObjectMapper,
    ) = OutboxMessageService(outboxProperties, outboxRepository, rtmKafkaTemplate, objectMapper)

    @Bean
    fun scheduler(
        outboxMessageService: OutboxMessageService,
        properties: OutboxProperties
    ) = OutboxMessagesScheduler(outboxMessageService, properties)
}

Значения конфигов в application.yml файле:

outbox:
  send-topic: тут топик
  limit-message: 120
  delete-old-message-cron: 1 * * * * *
  delete-message-days: 30
  outbox.send-message-delay: 1000

Теперь остается только подключить стартер в ваш проект, сохранить сообщение через repository и все, готово)

Расширения, которые стоит добавить в реальном проекте

В этой статье мы сосредоточились на минимально жизнеспособной реализации Transaction Outbox, чтобы показать саму идею и архитектурный подход.


Однако в реальных системах почти всегда добавляют:

  • Retry-механику с backoff и лимитом попыток

  • Идемпотентность на стороне consumer’ов через таблицу processed_messages (сохранять идентификаторы в сервисе чтения сообщения и не читать повторно сообщения с тем же id)

  • Метрики (успешно/ошибка/время обработки)

  • Dead Letter Queue при превышении лимита retry

Эти элементы легко надстраиваются поверх базового стартера и не усложняют его архитектуру.

Итог

Паттерн Transaction Outbox — один из самых надежных способов обеспечить согласованность данных между вашей БД и брокером сообщений. Он устраняет проблему двойной записи, разрывов транзакций и потерянных событий. В этой статье мы разобрали, как реализовать этот паттерн в виде удобного Spring Boot Starter’а, который инкапсулирует логику и позволяет сервисам сосредоточиться на бизнес - функциональности.

Всем спасибо за внимание и хорошего дня!

Комментарии (3)


  1. Elinkis
    15.11.2025 18:08

    Благодарю! Возьму в свой проект


  1. Gabenskiy
    15.11.2025 18:08

    В этой архитектуре есть один существенный изъян: при большой нагрузке авто вакуум будет сильно ресурсов кушать. Лучше модернизировать архитектуру так: добавляем интерфейс, в дефолтной реализации которого удаляем по одному, а при необходимости пользователь библиотеке переопределит этот интерфейс с использованием партиционирования


  1. remindscope
    15.11.2025 18:08

    Удаление старых записей стоит еще и по статусу проверять, а то мало ли что.