Автор статьи: Сергей Прощаев @sproshchaev
Руководитель направления Java‑разработки в FinTech
Сообщения в RabbitMQ — это основные единицы данных, которые передаются между продюсерами и потребителями. Понимание их структуры и возможностей позволяет эффективно управлять потоком данных в распределенных системах. В этой статье мы разберем анатомию сообщений, обязательные и опциональные компоненты, а также реализуем пример отправки объекта с настройкой свойств
1. Структура сообщения в RabbitMQ
Каждое сообщение состоит из трех частей: Тело сообщения (Body), Свойства (Properties) и Routing Key.
Тело сообщения (Body)
Тело сообщения — это обязательный элемент, который содержит основные данные, передаваемые через RabbitMQ. Оно представляет собой массив байт (binary data), что делает его универсальным контейнером для любой информации.
В нем можно передавать текст, JSON, XML, изображения и даже зашифрованные данные. При этом сам RabbitMQ не анализирует содержимое тела — это задача вашего приложения. Сервер лишь доставляет байты получателю.
Почему Тело Должно Быть Массивом Байт?
RabbitMQ работает на низком уровне, поэтому требует данных в формате, который можно передать по сети без потерь. Преобразование в байты гарантирует кросс‑платформенность, так как любой популярный язык программирования Java, Python или.NET умеет работать с байтами. Через массив байт можно передавать бинарные файлы (например, изображения), а так же это ускоряет обработку данных, так как байты всегда обрабатываются быстрее, чем строки.
Что можно рекомендовать?
обязательно согласовывайте форматы данных и используйте всегда один формат (например JSON/XML) во всей системе, чтобы избежать конфликтов;
контролируйте размер сообщений, ведь большие сообщения (свыше 1 МБ) замедляют работу RabbitMQ;
если необходимо передавать файлы, то лучше использовать ссылки.
добавляйте заголовки (об этом чуть позже) и указывайте, например: timestamp, user_id для удобства дальнейшей отладки;
всегда обрабатывайте ошибки и проверяйте, что тело сообщения не пустое и имеет ожидаемый формат.
Свойства (Properties)
Метаданные в RabbitMQ — это скрытая мощь, которая превращает простые очереди в гибкие системы. Они управляют всем: от маршрутизации до выживания сообщений при сбоях.
Метаданные — это служебная информация, которая сопровождает тело сообщения. Сами метаданные не являются обязательными, но с их помощью можно указать системе такие факторы как:
Маршрутизация — куда и как доставить сообщение,
Обработка — как именно интерпретировать данные, передаваемые в сообщении,
Надежность — какие гарантии доставки должны быть использованы для сообщения.
Давайте разберем основные метаданные, которые часто используются:
1. Параметр content_type
— это тип данных, который указывает формат тела сообщения. Это помогает потребителю правильно десериализовать данные.
Например в content_type можно указать один из типов: application/json, application/xml или text/plain.
2. Параметр content_encoding
— отвечает за кодировку и указывает, как закодированы данные. Зачем: Экономит трафик и ускоряет передачу.
Примером значения, указанного в content_encoding могут быть: utf-8 (текст) или gzip и deflate (если используется сжатие).
3. Параметр headers
— является незаменимым, если мы хотим указать значение переменных, которые планируем поместить в пользовательские заголовки
В качестве headers мы можем указывать произвольные пары ключ‑значение для кастомной логики. Это может быть как маршрутизация через обменник типа headers exchange, так и хранение любых меток, которые будут необходимы для работы с сообщением на стороне получателя. Например мы можем определить две переменные: source: 'web', env: 'prod'.
4. Параметр delivery_mode
— содержит настройку режима доставки и может принимать значения: 1 — не сохранять на диск (быстро, но ненадежно). 2 — сохранять на диск (гарантия доставки даже при падении сервера).
Параметр delivery_mode позволяет управлять надежностью доставки сообщения.
5. Параметр priority
— определяет приоритет сообщения. Его значение это целое число в диапазоне 0–255. Сообщения с высшим приоритетом обрабатываются первыми. Но это работает только с очередями, созданными с поддержкой приоритетов
6. И наконец параметр expiration
— задает время жизни (TTL). Если он задан, то сообщение удаляется через N миллисекунд, если не будет обработано. Это очень удобно для обработки временных данных, которыми могут являться одноразовые ссылки и тп.
Что можно рекомендовать при использовании перечисленных метаданных?
указывайте всегда content_type — иначе потребитель может неправильно интерпретировать данные;
используйте delivery_mode=2 только для критически важных данных, не злоупотребляйте этим параметром, так как запись на диск замедляет работу;
если вы передаете больше данные в формате JSON/XML, то сжимайте эти данные через content_encoding — это сократит размер сообщения в 2–3 раза;
будьте осторожны с приоритетами — высокий приоритет может «забить» очередь, если низкоприоритетные сообщения не обрабатываются.
Routing Key
И третья часть сообщения это ключ, который используется для маршрутизации сообщения через exchange. Эта часть сообщения обязательна, если обменник (exchange) его использует в своей работе.
Например, обменники типа direct или topic используют Routing Key, а для fanout и headers ключ маршрутизации игнорируется.
2. Пример отправки объекта Person в сообщении RabbitMQ
И традиционно после небольшой теории у нас всегда идет практический пример. Давайте, в лучших традициях курсов, создадим класс Person и на примере покажем что и как настраивается в сообщении.
Для начала нам нужно будет подключить зависимости. Мы будем использовать стартер Spring framework:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Теперь определим сам класс, экземпляр которого у нас будет являться полезной нагрузкой в сообщении RabbitMQ. Для этого будем использовать проект Lombok, который позволяет сократить часть шаблонного кода в Java:
import lombok.*;
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public class Person {
private String name;
private int age;
}
И давайте приведем пример конфигурационного класса, в котором определим основные настройки RabbitMQ:
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange exchange() {
return new DirectExchange("person_exchange");
}
@Bean
public Queue queue() {
return QueueBuilder.durable("person_queue")
.withArgument("x-dead-letter-exchange", "dlq_exchange")
.withArgument("x-max-priority", 10)
.withArgument("x-message-ttl", 10000)
.build();
}
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("person.key");
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
В этом классе мы создаем обменник типа Direct с именем person_exchange, который маршрутизирует сообщения в очередь по точному совпадению routing key.
Затем мы создаем «устойчивую очередь» person_queue в которой устойчивость заключается в том, что все сообщения будут сохраняться в случае перезагрузки RabbitMQ.
Далее добавляем аргумент x‑dead‑letter‑exchange, который определяет то, что если есть сообщения, которые не удалось обработать (например, после N попыток), будут отправлены в очередь Dead Letter через обменник dlq_exchange.
Также в настройки очереди присутствует аргумент x‑max‑priority со значением 10 — его назначение мы разберем чуть позже. И указываем параметр x‑message‑ttl, который задаёт максимальное время жизни сообщений в очереди (в миллисекундах). Если сообщение не будет обработано за это время, оно будет автоматически удалено.
После этого мы связываем очередь person_queue с обменником person_exchange, используя ключ маршрутизации person.key. И после этого сообщения, отправленные в person_exchange с ключом person.key, будут попадать в person_queue.
И наконец мы регистрируем конвертер, который автоматически сериализует объекты класса Person в JSON при отправке сообщений и десериализует JSON обратно в объекты при получении.
На третьем шаге мы создадим сервис, который будет отправлять экземпляры класса Person в сконфигурированную очередь RabbitMQ:
@Service
public class PersonProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendPerson(Person person) {
MessageProperties props = new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
props.setPriority(5);
props.setExpiration("10000");
props.setHeader("user_id", "admin");
Message message = rabbitTemplate.getMessageConverter()
.toMessage(person, props);
rabbitTemplate.send("person_exchange", "person.key", message);
}
}
Класс PersonProducer
отправляет сообщения в RabbitMQ с тонкой настройкой метаданных. В нем используется RabbitTemplate который является ключевым классом в Spring AMQP для отправки и получения сообщений в RabbitMQ. Чтобы автоматически сериализовать объекты в JSON мы используем Jackson2JsonMessageConverter
, который ранее был настроен в RabbitConfig.
И давайте подробнее остановимся на настройках:
1) setDeliveryMode(MessageDeliveryMode.PERSISTENT)
— задает параметры режима доставки. В данном случае при MessageDeliveryMode.PERSISTENT
сообщение будет сохранено на диске RabbitMQ. Это гарантирует, что данные не потеряются при перезагрузке сервера. Еще можно выбрать значение MessageDeliveryMode.NON_PERSISTENT
— в этом случае сообщение хранится только в памяти брокера и если сервер RabbitMQ перезагрузится, сообщение будет потеряно. В случае MessageDeliveryMode.NON_PERSISTENT
отправка сообщения будет быстрее, так как не используется сохранение на диск, поэтому этот режим можно использовать для сценариев, где скорость важнее надежности.
2) priority
: 5 — сообщение получит приоритет 5 из диапазона значении от 0 до 255. И для того, чтобы это работало очередь должна поддерживать приоритеты и для этого мы использовали аргумент x‑max‑priority
в классе конфигурации.
Сам приоритет определяет, в каком порядке сообщения будут доставлены потребителям. Чем выше priority, тем раньше сообщение будет обработано. Например: сообщение с приоритетом 5 будет обработано раньше сообщения с приоритетом 3. Если приоритет не указан, используется значение по умолчанию (0).
3) expiration
: 10 000 — TTL отвечает за параметр Time‑To‑Live — это задает время жизни сообщения и в нашем случае сообщение удалится через 10 секунд, если не будет обработано. Если мы задаем TTL, то в настройках очередь мы должны указать то, что очередь поддерживает это (аргумент x‑message‑ttl
).
4) user_id
: admin — пример пользовательского заголовка, который можно использовать для обработки сообщения. В данном случае из этого заголовка мы поймем, что экземпляр класса Person, передаваемый в этом сообщении очевидно является администратором.
Если не требуется полный контроль над созданием Message — можно использовать метод convertAndSend
для сокращения кода:
@Service
public class PersonProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendPerson(Person person) {
rabbitTemplate.convertAndSend(
"person_exchange",
"person.key",
person,
message -> {
message.getMessageProperties()
.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setPriority(5);
message.getMessageProperties().setExpiration("10000");
message.getMessageProperties()
.setHeader("user_id", "admin");
return message;
}
);
}
}
В этом примере мы используем метод convertAndSend
вместо send и этот метод автоматически конвертирует объект Person в сообщение через Jackson2JsonMessageConverter
и при этом используем те же настройки для отправки нашего сообщения.
Заключение
Умение тонко настраивать отправку сообщений в RabbitMQ — это не просто технический нюанс, а основа надежной и эффективной распределенной системы. Каждая опция — будь то установка delivery_mode
для сохранения сообщений на диске, задание TTL для автоматического удаления устаревших данных или использование приоритетов для критичных задач — превращает хаотичный поток информации в предсказуемый и управляемый процесс.
Если вы работаете с распределёнными системами и хотите разобраться в тонкостях настройки RabbitMQ, приглашаем вас пройти небольшое тестирование. Оно поможет оценить ваше понимание ключевых механизмов работы брокера сообщений.
А также приглашаем на открытый урок «Оптимальные решения на RabbitMQ, или как Кролик превосходит Kafka», который пройдет 23 июля в 20:00. Обсудим архитектурные подходы, сравним популярные решения и разберём практические кейсы.
Чтобы оставаться в курсе самых актуальных технологий и трендов, подписывайтесь на Telegram-канал OTUS.