Привет, Хабр. Я Витя Михайлов, Backend Lead в Garage Eight, и больше 6 лет у меня RabbitMQ в продакшене. За это время мы с командой узнали много интересных вещей. О них я рассказывал этим летом, выступая на Saint Highload++ с докладом про рецепт правильного приготовления RabbitMQ. Там же я поделился 8 практическими кейсами. Сегодня в статье разберу один из них — Publish without Confirmation. Расскажу, почему из коробки Publish не имеет гарантий доставки в RabbitMQ, и что с этим делать.
Сразу прыгаем в карьер с разбега. Метод отправки сообщения basic.publish не имеет возвратной части по протоколу AMQP 0-9-1, а на нем стоит весь RabbitMQ. C версии 4.0 вшита поддержка AMQP 1.0, но там фактически просто декораторы заехали под капот кролика, да и basic.publish не изменился по спеке 1.0.

У каждого такого метода есть возвратная часть, которая говорит some-method-ok. Однако вот так выглядит вариант работы basic.publish в deafult-режиме, когда мы ни в какой дополнительный формат не переводим наш канал:

То есть наш basic.publish работает как fire and forget в семантике at most once. Ну и давайте надеяться на лучшие времена, и что мы ничего не потеряем, ведь мы же в своем маленьком закрытом сетевом контуре, все будет ок. Справедливости ради — все ок в 99.999% случаев. Впервые я об этом узнал вот в этом разделе доки.
Почему так?
Моя основная идея о причинах такого дизайна протокола AMQP в том, что этот протокол создали ребята из JPMorgan, а это прожженный финансовый сектор со своими специфическими потребностями. Смело можно предположить, что publish без подтверждения нужен для каких-нибудь реактивных финансовых данных, типа потока котировок финансовых инструментов, которые меняются по 10к раз в секунду, и мы можем отбросить часть такого потока. Допустим.
Тогда в протоколе должна быть другая часть, посвященная гарантированной доставке с подтверждениями. И да, она есть – транзакции.

Однако транзакции замедляют отправку в 250 раз (как говорит дока RMQ) и в целом заводить транзакцию под каждый publish сомнительное решение. К тому же в RMQ транзакции не поддерживают ACID и вообще пришиты сбоку как «простая форма батчинга» (дока). Именно в RMQ, не в AMQP. Но в спеке AMQP очень мало любви уделено описанию того, как должны работать транзакции.
Так что, предположим, так и задумано. По сути, транзакции — это не спасение, а рудимент, пришитый, чтобы удовлетворить спеке и всем говорить: «мы работаем по протоколу AMQP, да! мы полностью совместимы!». И я не советую использовать транзакции в RMQ, но если очень хочется, то можно изучить всю доку по транзакциям, и попробовать применить их в своём проекте.
Как это решили ребята из RabbitMQ
Решением послужил дополнительный режим работы канала, которого нет в спеке AMQP, но ребята дополнительно расширили протокол, добавив туда Confirmation семантику на basic.publish. Почитать опять же можно в доке.

По итогу получился Publisher Confirms — в целом неплохой рабочий вариант.
Пример: включаем подтверждения
Давайте посмотрим, как это выглядит на практике. Приведу для примера код на PHP (вот ссылка на именно этот экзампл), вот еще пример на Go.
Цель кода — показать, как включить режим Publisher Confirms и гарантировать доставку сообщений до RMQ.
Я тут приведу весь пример, но с вырезанными комментариями, для простоты.
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
$channel->set_ack_handler(
function (AMQPMessage $message) {
echo 'Message acked with content ' . $message->body . PHP_EOL;
}
);
$channel->set_nack_handler(
function (AMQPMessage $message) {
echo 'Message nacked with content ' . $message->body . PHP_EOL;
}
);
$channel->confirm_select();
$exchange = 'someExchange';
$channel->exchange_declare($exchange, AMQPExchangeType::FANOUT, false, false, true);
$i = 1;
$msg = new AMQPMessage($i, array('content_type' => 'text/plain'));
$channel->basic_publish($msg, $exchange);
$channel->wait_for_pending_acks();
while ($i <= 11) {
$msg = new AMQPMessage($i++, array('content_type' => 'text/plain'));
$channel->basic_publish($msg, $exchange);
}
$channel->wait_for_pending_acks();
$channel->close();
$connection->close();
Что здесь происходит
Коротко разберём, что делает код. Мы открываем соединение, включаем режим подтверждений (confirm_select) и публикуем сообщения.
Дальше вызываем wait_for_pending_acks() — он ждёт, пока брокер не пришлёт ответы (ack/nack) за все опубликованные сообщения.
На что стоит обратить внимание особо пристально:
confirm_select()— переводит наш канал в режим работы Confirmation. Теперь он начинает считать все отправленные сообщения, а канал на стороне RMQ считать все полученные и обработанные. Дальше на основе этого счетчика вы будете работать с подтверждениями.После пачки отправленных сообщений мы ждем ack’ов от сервера, ну а они собственно приходят в хэндлеры. Если не сделать хендлеров, то они все равно буду приходить и создавать их в целом не обязательно. Важно именно вызывать
wait_for_pending_acks(), он за нас подождет всего, ведь в нем живет счетчик отправленных сообщений.
На этом всё!
Теперь вы знаете все необходимое о режиме отправки сообщений с подтверждениями, вы великолепны :-)
Забавно, что ребятам потребовалось расширить протокол AMQP, чтобы добавить самый частый способ отправки сообщений в RMQ, чтобы оставаться в семантике at least once. Но с такими ситуациями мы привыкли сталкиваться повсюду, особенно в контексте Big design up front (BDUF) сервисов.
Спасибо за внимание. Кстати, у меня есть свой ламповый канал в телеграм, где я разгоняю про бэкенд, менеджмент, процессы разработки и не только.