Давайте представим ситуацию. Есть много сервисов (чаще всего это паттерн pub/sub), и обращаться к каждому приходится по порту. Возьмём для примера один сервис: порт приёма сообщений у него один, порт отдачи — другой. Умножаем два порта на количество сервисов — и получаем головную боль. Именно она и стала отправной точкой. Mail Pigeon (почтовый голубь) - это еще один воодушевляющий вызов для меня, для моего роста. Зачем концентрироваться на угрюмых облаках, если можно улыбнуться восходящему солнцу. К тому же ZeroMQ мне понравился с первого взгляда. Желание сделать что-то в этом направлении вызвано ещё и тем, что в своей практике я постоянно видел противоположный подход. Так что, это просто желание выговориться, что можно сделать по-другому. Желание сбросить мысленные оковы идеи, которая может днями сверлить мозг. Поэтому, когда я увидел проблему, я в тот же момент увидел решение.

Так появился Mail Pigeon — асинхронная клиент-серверная библиотека на ZeroMQ, которая позволяет сервисам адресовать сообщения друг другу по имени, а не по порту. Почтовый голубь, который доставит письмо даже в бурю. С гарантией доставки, автоматическим восстановлением после сбоев и end-to-end шифрованием.

Проблема: почему REST не всегда подходит для микросервисов

Когда несколько сервисов общаются друг с другом, чаще всего используют REST API. Это просто, понятно, а главное — привычно. Но у REST есть врождённые ограничения, которые становятся проблемой при росте системы:

  • Синхронность. Сервис А отправляет запрос сервису Б и ждёт ответа. Если сервис Б упал или тормозит, сервис А тоже встаёт. Каскадные отказы.

  • Нет гарантии доставки. Если между сервисами пропала сеть, HTTP-запрос просто упадёт с ошибкой. Данные потеряны. Повторять запрос нужно вручную.

  • Протокольные накладные расходы. HTTP хорош для браузера, но для машинного взаимодействия он избыточен: заголовки, статусы, keep-alive.

Мне хотелось получить нечто иное: лёгкий протокол, асинхронность, гарантию, что сообщение дойдёт, даже если получатель временно недоступен. И чтобы это работало как библиотека, а не как отдельный брокер вроде RabbitMQ, который нужно разворачивать и администрировать.

Почему ZeroMQ, а не RabbitMQ или Kafka

ZeroMQ — это не брокер сообщений в классическом смысле. Это библиотека, которая даёт вам низкоуровневые сокеты с высокоуровневыми паттернами: REQ/REP, PUB/SUB, ROUTER/DEALER и другие. Вы не поднимаете отдельный сервер — вы просто используете библиотеку в своём коде.

Для моей задачи я выбрал паттерн ROUTER/DEALER:

  • ROUTER — это серверная сторона. Он принимает сообщения от клиентов, знает их идентификаторы и может маршрутизировать сообщения между ними.

  • DEALER — это клиентская сторона. Он подключается к ROUTER и может отправлять и принимать сообщения асинхронно.

Эта связка позволяет построить брокер переадресации (резервный сервер в клиенте) прямо внутри вашего приложения. Один из клиентов становится сервером-маршрутизатором, остальные подключаются к нему. Никакого внешнего брокера не нужно.

Архитектурное решение №1: Гарантированная доставка

В ZeroMQ есть одна особенность: если клиент отправил сообщение, а получатель в этот момент отключился, сообщение просто теряется. Мне это не подходило.

Я реализовал At-Least-Once семантику — сообщение должно быть доставлено как минимум один раз. Дубли лучше, чем потеря. Как это работает:

  1. Клиент app1 отправляет сообщение для app2. Сообщение уходит серверу, но параллельно сохраняется в исходящую очередь клиента app1.

  2. Клиент app2 получает сообщение и автоматически отправляет подтверждение (ACK) обратно.

  3. Клиент app1 получает подтверждение и только тогда удаляет сообщение из своей очереди.

Если на любом этапе что-то пошло не так (пропала сеть, упал сервер или получатель), сообщение остаётся в очереди отправителя. Когда связь восстанавливается, отправитель заново пытается его доставить.

Для тестирования такого функционала мне пришлось продумать три типа теста.

  • Первый тип — обычный unit тест, ничего особенного, просто тест класса. Но даже в них пришлось использовать таймоут, чтобы добиться нужного эффекта.

  • Второй тип — это тестирования отправки сообщений между сервисами внутри отдельных подпроцессов. Здесь я хотел увидеть, как быстро могут данные отправляться между разными сервисами — как в зашифрованном виде, так и в расшифрованном. Также в этом тесте я смотрел как могут обогащаться данные, проходя по разным сервисам, как это бывает в реальной обыденности. Причём тот подпроцесс, с которого началась отправка сообщений, должен в конце получить полную цепочку обогащённых сообщений.

  • Третий тип — ручное интерактивное тестирование в консольных приложениях. Здесь я проверял, что будет, если одно из консольных приложений упадёт при отправке сообщений. Также в этом тесте я смотрел, как перемещается брокер переадресации между консольными приложениями. Если в одном консольном приложении брокер переадресации падал (я просто закрывал консольное приложение), то в другом консольном приложении он должен был запускаться и продолжать переадресацию сообщений. Так доставка была гарантированной между разными консольными приложениями.

Очереди сообщений — это сложнее, чем кажется

Но гарантированная доставка — это только полдела. Вторая половина — правильно организовать хранение сообщений, пока они ждут отправки или подтверждения. И вот тут начинается самое интересное. Каждое сообщение в библиотеке проходит через четыре состояния:

  • queue — сообщение ждёт отправки.

  • send_queue — сообщение отправлено, ждёт подтверждения.

  • wait_queue — входящее сообщение, адресованное конкретному ключу. Это нужно, чтобы получать нужные ответы из разных потоков тому, кто отправил сообщение.

  • send_waiting_queue — сообщение, которое не удалось отправить (получатель не в сети).

Управление этими состояниями вынесено в абстрактные классы BaseQueue и BaseAsyncQueue. А конкретные реализации — SimpleBox (in-memory очередь) и FilesBox (файловая очередь на диске) — лишь определяют, где физически хранятся данные.

Такой подход позволяет легко добавить очередь на Redis. Достаточно унаследоваться от BaseQueue и реализовать три метода. Вот как выглядит in-memory реализация:

from mail_pigeon.queue import BaseQueue

class SimpleBox(BaseQueue):
    def __init__(self):
        super().__init__()
        self._simple_box = {}

    def _save_data(self, key: str, value: str):
        self._simple_box[key] = value

    def _read_data(self, key: str) -> str:
        return self._simple_box[key]

    def _remove_data(self, key: str):
        if key in self._simple_box:
            del self._simple_box[key]

Вся логика состояний уже есть в базовом классе.

Архитектурное решение №2: Автоматический failover

Вторая проблема, которую я хотел решить, — отказоустойчивость. Что будет, если клиент-сервер (тот самый, который ROUTER) упадёт? Вся система встанет.

Я сделал так, чтобы клиенты могли автоматически поднимать резервный сервер у себя. За это отвечает параметр is_master:

  • is_master=True — клиент при запуске пытается стать сервером переадресации.

  • is_master=False — клиент никогда не становится сервером, только подключается к существующему.

  • is_master=None — клиент проверяет, есть ли сервер. Если да — подключается. Если нет — сам становится сервером.

В коде это выглядит так:

# app1 — запускается первым, становится сервером автоматически
client1 = MailClient('app1', is_master=None)
client1.wait_server()

# app2 — запускается вторым, подключается к серверу в app1
client2 = MailClient('app2', is_master=None)
client2.wait_server()

# Если app2 упадёт, app1 не заметит разрыва. Но при этом client1 заметит отключение client2.
# Если упадёт app1, app2 автоматически поднимет сервер у себя.

Каждый клиент мониторит состояние сервера. Если сервер перестаёт отвечать на heartbeat, клиент переходит в режим переподключения и при необходимости запускает свой сервер. Это позволяет системе самоисцеляться без внешнего оркестратора.

Сердцебиение системы: как не потерять друг друга

Отказоустойчивость требует быстрого обнаружения проблем. Если сервер упал, клиенты должны узнать об этом как можно скорее, чтобы запустить резервный сервер и переподключиться. Если клиент отключился, сервер должен уведомить остальных о его уходе.

Первый вариант, который я реализовал, был симметричным: и клиент, и сервер отправляли друг другу ping, и каждый ждал pong от другого. Логично, правда? Двухсторонняя проверка — надёжнее.

Но на практике этот подход дал сбой. В момент переподключения или при кратковременных задержках сети возникала гонка: клиент отправлял ping серверу, сервер не успевал ответить, клиент считал сервер мёртвым и начинал переподключение. Одновременно сервер слал свой ping клиенту, не получал ответа и удалял его из списка активных. В итоге сообщения, которые ещё можно было бы доставить, терялись. Система была слишком нервной.

Пришлось переосмыслить подход. Я сделал асимметричную модель:

  • Сервер регулярно отправляет echo-запросы всем клиентам. Это его единственная обязанность — проверять, живы ли они. Если клиент не отвечает в течение заданного интервала, сервер считает его отключившимся и уведомляет остальных.

  • Клиент ничего не отправляет серверу. Он только слушает. Если заданное время от сервера ничего не приходит — значит, сервер упал или канал оборвался. Клиент переходит в режим переподключения и, если нужно, запускает резервный сервер у себя.

Это решило проблему гонки. Теперь только одна сторона инициирует проверку, а вторая только реагирует на её отсутствие. Никаких лишних переподключений, никаких потерянных сообщений из-за ложного срабатывания.

Дополнительно сервер может отправить явный сигнал NOTIFY_STOP_SERVER, когда он завершает работу штатно (а не падает). Это позволяет клиентам мгновенно понять, что это не обрыв связи, и сразу запустить резервный сервер, не дожидаясь таймаута heartbeat.

Безопасность: два уровня защиты

В распределённой системе сообщения проходят через сервер-посредник. Если не подумать о безопасности, сервер может прочитать всё, что передают клиенты. Мне это не нравилось, поэтому я реализовал два уровня защиты:

Уровень 1: CurveZMQ (шифрование канала)

Это аналог TLS, встроенный в ZeroMQ. Каждый клиент имеет пару ключей: публичный и приватный. Сервер тоже имеет свою пару. При подключении происходит аутентификация и все данные шифруются на транспортном уровне. Это защищает от прослушивания трафика между клиентом и сервером, но не защищает от чтения сообщений на самом сервере.

# app1 — становится сервером, генерирует ключи
client1 = MailClient('app1', is_master=True, cert_dir='/certificate')
client1.wait_server()

# app2 — подключается с публичным ключом сервера
client2 = MailClient('app2', is_master=False, cert_dir='/certificate2')
client2.wait_server()

Уровень 2: End-to-End шифрование (HMAC)

Чтобы даже сервер не мог прочитать сообщения, предназначенные другому клиенту, я добавил сквозное шифрование на уровне клиента. Сообщение шифруется до отправки на сервер и расшифровывается после получения. Сервер видит только зашифрованный набор байтов.

from mail_pigeon.security import TypesEncryptors

encript = TypesEncryptors.HMAC('общий_пароль_группы')

# app1
client1 = MailClient('app1', is_master=True, encryptor=encript)

# app2
client2 = MailClient('app2', is_master=False, encryptor=encript)

# app1 и app2 могут общаться, их сообщения недоступны серверу.

Само шифрование реализовано на основе PBKDF2HMAC и Fernet из библиотеки cryptography. Пароль группы преобразуется в ключ, которым шифруется каждое сообщение.

class HMACEncryptor(IEncryptor):
    def __init__(self, secret_word: str):
        self.salt = b'pigeon'
        self.kdf = PBKDF2HMAC(
            algorithm=SHA256(), length=32, salt=self.salt, iterations=1000
        )
        self.key = base64.urlsafe_b64encode(self.kdf.derive(secret_word.encode()))
        self.cipher = Fernet(self.key)
    
    def encrypt(self, message: bytes) -> bytes:
        return self.cipher.encrypt(message)
    
    def decrypt(self, encrypted: bytes) -> bytes:
        return self.cipher.decrypt(encrypted)

Эти два уровня можно комбинировать: CurveZMQ защищает канал, а HMAC — содержимое. Получается двойная защита.

Я сторонник того, чтобы никто не смог прочитать личные сообщения сервиса. Я представлял ситуацию, в которой у Сервиса А потребуют данные других сервисов, а они окажутся настолько неразборчивой кашей, что станут бесполезны для дальнейшего анализа, у кого нет общего ключа группы. Сервис может нести ценную информацию, и её нужно защищать даже от самого сервера.

Отправить и забыть… или дождаться ответа?

Библиотека поддерживает два режима отправки сообщений:

Режим 1: «Отправить и забыть»

client1.send(recipient='app2', content='данные')
# Всё, пошли дальше. Сообщение ушло в очередь и будет доставлено, когда получится.

Это асинхронный режим. Вы не ждёте ответа, не блокируете поток. Сообщение гарантированно дойдёт (благодаря файловой очереди и ACK), но вы об этом не узнаете.

Режим 2: «Отправить и ждать ответа»

msg = client1.send(recipient='app2', content='запрос', wait=True)
# Блокируется до получения ответа от app2
print(msg.content)  # ответ

Здесь клиент ждёт, пока получатель не пришлёт ответное сообщение с тем же ключом. Если получатель отключился или сервер упал, ожидание прервётся и метод вернёт None. Можно задать таймаут.

Эта гибкость позволяет использовать библиотеку и для фоновых задач (уведомления, логирование), и для request-response сценариев (запрос данных от другого сервиса).

Заключение

Mail Pigeon — это не замена RabbitMQ или Kafka. Это нишевый инструмент для случаев, когда:

  • Вам нужна лёгкая шина данных без разворачивания отдельного брокера.

  • Важна гарантированная доставка даже при обрывах связи.

  • Нужна автоматическая отказоустойчивость без оркестратора.

  • Нужно сквозное шифрование сообщений.

Этот проект дал мне:

  • Понимание ZeroMQ и его паттернов.

  • Опыт реализации семантики доставки сообщений (At-Least-Once).

  • Навыки проектирования отказоустойчивых систем с автоматическим восстановлением.

  • Практику применения криптографии в реальном проекте.

У меня немало идей по развитию этой библиотеки, и я, возможно, вернусь к ним позже. Но пока проект выполнил свою главную задачу: показал, что распределённая шина данных не обязана быть тяжёлой, сложной в развёртывании и дырявой в плане безопасности. Лёгкий почтовый голубь долетел.

А ещё — после реализации этой идеи мне действительно полегчало. Надеюсь, и вам, прочитавшим эту статью, стало немного яснее.


Ссылки

Комментарии (0)