В микросервисной архитектуре мы постоянно сталкиваемся с задачей: сохранить изменения в базе и гарантированно отправить событие в Kafka. На первый взгляд звучит просто — сделал транзакцию, отправил сообщение, закоммитил. Но в реальности между базой данных и брокером сообщений никакой общей транзакции нет.
Именно здесь мы сталкиваемся с самыми частыми ошибками:
В БД данные сохранились, а сообщение в Kafka не улетело
или наоборот — Сообщение ушло, а транзакция в БД откатилась.
Такие баги сложно воспроизвести, сложно отлаживать и ещё сложнее объяснять, почему данные между сервисами разъезжаются.
Паттерн Transaction Outbox решает эту проблему уже много лет. Он изолирует бизнес-логику от технической — мы сохраняем событие в таблицу внутри транзакции, а отправкой сообщений занимается отдельный компонент. Но каждая команда и каждый сервис реализуют Outbox по-своему — где-то шедулер, где-то cron, где-то retry-таблица, где-то кастомный SELECT FOR UPDATE. В итоге код размазывается, дублируется и становится неподдерживаемым.
В этой статье я покажу, как я вынес всю логику Transaction Outbox в отдельный Spring Boot Starter, который можно подключить одной зависимостью. Он создаёт таблицу Outbox, конфигурирует шедулер, отвечает за отправку в Kafka и очистку, позволяя микросервисам сосредоточиться только на бизнес-логике.
Что такое transaction outbox паттерн
Transaction Outbox — это архитектурный паттерн, который гарантирует надёжную доставку событий во внешние системы (Kafka, RabbitMQ, Webhooks) за счёт сохранения сообщений в отдельную outbox-таблицу внутри той же транзакции, что и бизнес-изменения в БД.
Отправка сообщений выполняется асинхронно — отдельным процессом или шедулером, который читает таблицу outbox и публикует события.
Паттерн решает проблему несогласованности данных между БД и брокером сообщений, когда:
транзакция в БД может закоммититься, а отправка в Kafka — упасть
сообщение может улететь в Kafka, но транзакция БД — откатится
Напишем spring-boot-starter для реализации паттерна
Я буду использовать стек Kotlin + Spring + JOOQ. Этот стартер также подойдет под Java + Spring + JPA. Мой выбор основан на стеке технологий на проекте)
Мы будем сохранять данные о сообщении в бд, отправлять их шедулером в кафку в нужные топики.
Начнем с создания файла resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
куда мы пропишем путь до класса конфигурации:
com.example.outbox.configuration.OutBoxAutoConfiguration
Так как нам нужно сохранять данные в таблицу, которую нужно создать специально для стартера, есть несколько вариантов:
-
Миграция находится в стартере
Плюсы:Единая точка нахождения кода для структуры outbox-таблицы.
Не нужно копировать миграции в каждый сервис — подключил стартер, и всё само прилетело.
Гарантия совместимости — версия стартера = версия схемы.
Минимум человеческого фактора
Минусы:
Нужно аккуратно работать с flyway-путями, чтобы не пересекаться с миграциями приложения.(Мы используем R миграцию)
-
Миграция находится на стороне сервисов
Плюсы:
Полный контроль у сервиса — может расширять схему, накатывать кастомные поля, индексы.
Можно тонко настраивать миграции под свои юзкейсы.
Минусы:
Легко получить рассинхрон стартер ↔ схема БД.
Каждый сервис обязан вручную добавлять миграцию — копипаста, ошибки.
Сложнее обновлять стартер (надо помнить о версиях миграций).
Мы же будем выбирать первый вариант, будем использовать flyway, но миграцию сделаем с префиксом R, которая не будет мешать другим миграциям с префиксом V. Так как checksum миграции изменяться не будет, то она будет наката 1 раз, также будем использовать CREATE TABLE IF NOT EXISTS.
Напишем скрипт миграции resources/db/migration/R__create-outbox-table.sql:
CREATE TABLE IF NOT EXISTS outbox_messages
(
id UUID NOT NULL PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'WAITING',
payload JSONB NOT NULL,
last_error TEXT,
created_at TIMESTAMP NOT NULL DEFAULT LOCALTIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT LOCALTIMESTAMP
);
Таблица очень простая, она может изменяться в зависимости от ваших бизнес требований. Например добавить поля object_id или object_type для идентификации объекта, если их может быть несколько и тд. Также тут стоит по дефолту status = 'WAITITNG', тут тоже все зависит от вашей статусной модели.
Затем нам нужно сохранить объект в бд, тут опять же все зависит от вашего стека, либо это обычный JpaRepository и метод save, либо кастомное сохранение объекта в зависимости от вашей логики.
В моем случае метод выглядит так:
fun <T> createMessage(messageObject: T) {
val payload = objectMapper.writeValueAsString(messageObject)
newRecord().apply {
this.id = UUID.randomUUID()
this.payload = JSONB.valueOf(payload)
this.insert()
}
}
Также нам нужен метод для поиска объектов и отправки, так как мы получаем объекты, чтобы их изменить (после отправки меняется статус), я достаю их через select for update
fun findWaiting(limit: Int = 500) = dslContext.selectFrom(OUTBOX_MESSAGES)
.where(OUTBOX_MESSAGES.STATUS.eq(OutboxMessageStatus.WAITING))
.orderBy(OUTBOX_MESSAGES.CREATED_AT.asc())
.limit(limit)
.forUpdate()
.noWait()
.fetch()
Также я добавил бы метод очистки таблицы, тут все зависит от того, сколько вы хотите хранить сообщения и собираетесь ли чистить таблицу:
fun deleteOldMessages(deleteDays: Int = 30) {
val currentDate = LocalDateTime.now()
val thirtyDaysAgo = currentDate.minus(deleteDays, ChronoUnit.DAYS)
dslContext.deleteFrom(OUTBOX_MESSAGES)
.where(OUTBOX_MESSAGES.UPDATED_AT.lessOrEqual(thirtyDaysAgo))
.execute()
}
Давайте теперь напишем OutboxProperties для того, чтобы задать параметры конфигурации:
data class OutboxProperties(
var sendTopic: String = "", // Здесь может быть массив, если вы собираетесь отправлять в несколько топиков
var limitMessage: Int = 500,
var deleteOldMessageCron: String = "0 0 12 * * *",
var deleteMessageDays: Int = 30,
var sendMessageDelay: String = "60000"
)
Теперь давайте напишем сервис, который будет отправлять сообщения в kafka:
open class OutboxMessageService(
private val outboxProperties: OutboxProperties,
private val outboxMessageRepository: OutboxMessageRepository,
private val rtmKafkaTemplate: KafkaTemplate<String, JsonNode>,
private val objectMapper: ObjectMapper
) {
private val logger = LoggerFactory.getLogger(OutboxMessageService::class.java)
@Transactional
open fun deleteOldMessages() {
outboxMessageRepository.deleteOldMessages(outboxProperties.deleteMessageDays)
}
@Transactional
open fun findAndSendMessages(): List<OutboxMessagesRecord> {
val waitingMessages = outboxMessageRepository.findWaiting(outboxProperties.limitMessage)
if (waitingMessages.isEmpty()) {
logger.info("Сообщений для отправки в топик ${outboxProperties.sendTopic} не найдено")
return emptyList()
}
waitingMessages.forEach {
logger.info("Отправка сообщения с id ${it.id} в топик $outboxProperties")
sendMessage(objectMapper.readTree(it.payload?.data()?.trim()))
}
val updateIds = waitingMessages.mapNotNull { it.id }
outboxMessageRepository.updateSentStatusByIds(updateIds)
return waitingMessages
}
private fun sendMessage(message: JsonNode) {
rtmKafkaTemplate.send(
outboxProperties.sendTopic,
message
)
}
}
Следующим шагом будет написание OutboxMessagesScheduler, который будет раз в N время работать с сообщениями:
class OutboxMessagesScheduler(
private val outboxMessageService: OutboxMessageService,
private val properties: OutboxProperties
) {
private val logger = LoggerFactory.getLogger(OutboxMessagesScheduler::class.java)
@Scheduled(fixedDelayString = "\${outbox.send-message-delay:60000}")
fun sendMessage() {
logger.info("Начало отправки сообщений в топик ${properties.sendTopic}")
var waitingMessages = outboxMessageService.findAndSendMessages()
while (waitingMessages.isNotEmpty()) {
waitingMessages = outboxMessageService.findAndSendMessages()
}
logger.info("Отправка сообщений в топик ${properties.sendTopic} завершена")
}
@Scheduled(cron = "\${outbox.delete-old-message-cron:0 0 12 * * *}")
fun deleteOldMessages() {
logger.info("Начало очистки таблицы сообщений")
outboxMessageService.deleteOldMessages()
logger.info("Очистка сообщений завершена")
}
}
И наконец напишем OutBoxAutoConfiguration для конфигурации наших бинов:
@Configuration
@EnableScheduling
@EnableConfigurationProperties(OutBoxAutoConfiguration::class)
@ConfigurationPropertiesScan("com.example.outbox.configuration")
class OutBoxAutoConfiguration {
@Bean
fun outBoxRepository(
@Value("\${spring.flyway.default-schema}") schema: String,
dslContext: DSLContext,
objectMapper: ObjectMapper
): OutboxMessageRepository {
dslContext.settings().withRenderMapping(
RenderMapping()
.withSchemata(
MappedSchema().withInput("public").withOutput(schema)
)
)
return OutboxMessageRepository(dslContext, objectMapper)
}
@Bean
@ConfigurationProperties(prefix = "outbox")
fun customOutBoxProperties() = OutboxProperties()
@Bean
fun outboxService(
outboxProperties: OutboxProperties,
outboxRepository: OutboxMessageRepository,
rtmKafkaTemplate: KafkaTemplate<String, JsonNode>,
objectMapper: ObjectMapper,
) = OutboxMessageService(outboxProperties, outboxRepository, rtmKafkaTemplate, objectMapper)
@Bean
fun scheduler(
outboxMessageService: OutboxMessageService,
properties: OutboxProperties
) = OutboxMessagesScheduler(outboxMessageService, properties)
}
Значения конфигов в application.yml файле:
outbox:
send-topic: тут топик
limit-message: 120
delete-old-message-cron: 1 * * * * *
delete-message-days: 30
outbox.send-message-delay: 1000
Теперь остается только подключить стартер в ваш проект, сохранить сообщение через repository и все, готово)
Расширения, которые стоит добавить в реальном проекте
В этой статье мы сосредоточились на минимально жизнеспособной реализации Transaction Outbox, чтобы показать саму идею и архитектурный подход.
Однако в реальных системах почти всегда добавляют:
Retry-механику с backoff и лимитом попыток
Идемпотентность на стороне consumer’ов через таблицу processed_messages (сохранять идентификаторы в сервисе чтения сообщения и не читать повторно сообщения с тем же id)
Метрики (успешно/ошибка/время обработки)
Dead Letter Queue при превышении лимита retry
Эти элементы легко надстраиваются поверх базового стартера и не усложняют его архитектуру.
Итог
Паттерн Transaction Outbox — один из самых надежных способов обеспечить согласованность данных между вашей БД и брокером сообщений. Он устраняет проблему двойной записи, разрывов транзакций и потерянных событий. В этой статье мы разобрали, как реализовать этот паттерн в виде удобного Spring Boot Starter’а, который инкапсулирует логику и позволяет сервисам сосредоточиться на бизнес - функциональности.
Всем спасибо за внимание и хорошего дня!
Комментарии (3)

Gabenskiy
15.11.2025 18:08В этой архитектуре есть один существенный изъян: при большой нагрузке авто вакуум будет сильно ресурсов кушать. Лучше модернизировать архитектуру так: добавляем интерфейс, в дефолтной реализации которого удаляем по одному, а при необходимости пользователь библиотеке переопределит этот интерфейс с использованием партиционирования

remindscope
15.11.2025 18:08Удаление старых записей стоит еще и по статусу проверять, а то мало ли что.
Elinkis
Благодарю! Возьму в свой проект