Привет! Меня зовут Дима Кривопальцев, я тимлид бэкенд‑команды Яндекс Диска (Яндекс 360). Уже больше семи лет я занимаюсь разработкой высоконагруженных распределённых систем — и в статье расскажу об одной из них.

В Яндекс 360 есть сервисы с очень большими нагрузками — и по RPS, и по объёму хранимых данных, и по числу обрабатываемых асинхронных задач. Именно последняя часть — асинхронная обработка — будет в центре этого рассказа.

Тема может показаться немного провокационной: речь пойдёт об очередях поверх SQL‑баз, а в сообществе такое решение принято считать антипаттерном — и на это есть основания. На конференциях и в статьях обычно можно услышать скепсис: «Очередь на PostgreSQL? Не стоит даже пытаться». Действительно, подобных попыток было много, и почти все сталкивались с типовыми проблемами — от блокировок до деградации производительности.

Тем не менее, в реальности у многих крупных компаний всё равно есть свои очереди, построенные поверх SQL‑баз — как PostgreSQL, так и MySQL. Это решение встречается и в российских, и в зарубежных командах. Яндекс Диск здесь не исключение — у нас тоже есть своя реализация, о которой сегодня и пойдёт речь.


Зачем строить очередь на PostgreSQL

Чтобы понять, почему мы вообще выбрали этот путь, разберём задачу, которую нужно было решать. Внутри Яндекс Диска много процессов, которые должны выполняться асинхронно. Например:

  • уведомить пользователя, если у него загрузился файл, чтобы десктопное приложение начало синхронизацию;

  • выполнить длительные операции, вроде копирования больших папок с тысячами файлов.

Для всех таких сценариев нужна единая инфраструктура — система, которая умеет ставить и выполнять асинхронные задачи.

По сути, нам нужно проделать всего две операции:

  1. Положить задачу в очередь, указав приоритет.

  2. Взять задачу с наивысшим приоритетом для выполнения.

Такую систему можно назвать и очередью, и шедулером задач — названия разные, суть одна. Мы называем её очередью, имея в виду именно приоритетную очередь задач. 

Для понимания масштаба: сегодня в Яндекс Диске выполняются десятки тысяч задач в секунду, причём задач сотен разных типов: от массовых уведомлений до редких фоновых операций. Хотелось, чтобы всё это работало на одном фреймворке, чтобы разработчикам не приходилось думать о реализации, — просто ставь задачу и забирай результат.

Почему нам не подошли классические очереди

Прежде чем говорить о своей реализации, стоит вспомнить классические очереди.
Kafka, RabbitMQ, SQS и другие — отличные решения, которые мы тоже активно используем там, где они уместны. Например, для добавления событий в раздел «История действий» в интерфейсе Диска: кто загрузил файл, кто поделился папкой и так далее Здесь важен строгий порядок событий, и классическая очередь с приоритетом по времени добавления идеально подходит. Но в нашем случае таких сценариев оказалось недостаточно.

Самая очевидная причина использовать очередь поверх PostgreSQL — это отложенные задачи, которые должны выполняться через заданное время.

Типичные примеры:

  • Показать пользователю «Воспоминания» через три дня — уже сгенерированный контент нужно просто опубликовать в нужный момент.

  • Через минуту подтянуть данные из смежной системы — особенно полезно для сервисов вроде Диска, где данные часто приходят батчами: пользователь загружает не одну фотографию, а целую серию, и их удобно синхронизировать разом.

  • Повторить задачу через 10 минут, если предыдущая попытка завершилась ошибкой.

Такие сценарии, конечно, можно реализовать и в классических очередях — например, добавив отдельные топики в Kafka. Но когда сотни разных задач и у каждой своё время отложенного выполнения — от секунд до десятков минут, — количество топиков начинает расти лавинообразно. Управлять этим зоопарком становится неэффективно, и хочется чего‑то гибкого, где время исполнения можно задавать прямо на уровне одной задачи.

Рассмотрим более сложный сценарий: кластер, который обрабатывает асинхронные задачи, перегружен. Что от него требуется:

  • Очередь должна удерживать десятки и сотни миллионов задач, исходя из нашей нагрузки — десятки тысяч задач в секунду.

  • Важно, чтобы всё это время система продолжала работать без деградации производительности.

  • При этом самые важные задачи должны выполняться в первую очередь, даже когда кластер почти перегружен.

Можно попробовать классический подход: каждой задаче присвоить статический приоритет — например, «важные», «обычные» и «неважные». У такого подхода есть недостатки:

  • Если приоритетов мало, при перегрузке страдает самая большая группа задач — «обычные».

  • Если приоритетов много и мы пытаемся строго выстроить 200+ задач по важности, каждое добавление новой задачи усложняет процесс, и приходится пересматривать систему приоритетов.

В итоге стандартная схема приоритизации часто приводит к тому, что задачи «обычной» группы выполняются с задержками, что нежелательно.

Delay tolerance — гибкий подход к приоритетам

Решение, которое мы использовали, основано на концепции Delay tolerance. Delay tolerance — это время, которое задача может подождать до начала исполнения без негативных последствий для системы или пользователя.

  • Задачи с низкой задержкой (малой Delay tolerance) обрабатываются в первую очередь.

  • Задачи с большой задержкой могут подождать, позволяя системе работать стабильно даже при перегрузке.

Такой подход позволяет одновременно учитывать важность задачи и текущую нагрузку, обеспечивая гибкую приоритизацию и избегая узких мест при высокой частоте задач.

Если подойти к приоритетам с этой точки зрения, легко понять, что задачи из одной категории могут сильно различаться по важности. Например, в Яндекс Диске есть два типа пуш‑уведомлений:

  • Срочный пуш, влияющий на синхронизацию десктопного приложения, — его нужно отправить как можно быстрее.

  • Несрочный пуш, например уведомление о новой подборке, — если он задержится на 30 минут, пользователь всё равно будет доволен.

Используя Delay tolerance, можно корректно расставлять приоритеты.
Пример:

  • Оранжевая задача создана в 10:05, Delay tolerance — 1 секунда.

  • Чёрная задача создана в 10:00, Delay tolerance — 10 секунд.

По классической схеме, казалось бы, чёрная задача важнее, так как создана раньше. Но с учётом Delay tolerance оранжевая задача становится приоритетнее — её нужно выполнить раньше. Такой кастомный приоритет помогает справляться с перегрузками кластера и обеспечивает гибкую обработку задач.

Если требуется кастомизация, которой нет в классических решениях, — нужны очереди поверх SQL‑баз. Примеры таких возможностей:

  • Отложенные задачи с гибкими настройками.

  • Гибкие приоритеты для больших нагрузок и большого числа задач.

  • Любые уникальные фичи, критически важные для конкретного сервиса.

Если эти возможности вам действительно необходимы — стоит задуматься о собственном решении поверх SQL‑базы.

Важные нюансы SQL-очередей

Когда мотивация ясна, важно помнить и о рисках:

  1. Производительность. SQL‑база даёт блокировки, конкуренция между воркерами снижает скорость обработки.

  2. Bloat. Строки, удалённые из таблицы, но ещё не собранные autovacuum, создают мёртвый груз и мешают работе.

  3. Дублирование задач на воркерах. Если запросы, уровни изоляции или транзакции настроены неправильно, одна задача может попасть на несколько воркеров и выполняться параллельно. А в очереди важно, чтобы каждая задача выполнялась только один раз.

Если строить очередь поверх PostgreSQL (или другой SQL‑базы), все запросы будут пишущими, то есть попадать на мастер‑ноду. Реплики нужны лишь для отказоустойчивости при падении мастера, чтобы было куда переключиться. Масштабироваться через репликацию нельзя: единственный способ — шардирование базы, что непросто и требует аккуратного проектирования.

Стоит учитывать и задержки. В PostgreSQL можно собрать схему по пуш‑модели, когда воркеры сразу получают задачи. Но при росте нагрузки такой подход плохо масштабируется. В реальности приходится использовать полинг с интервалом, что неизбежно создаёт задержки между выборками задач.

Создавая собственное решение, необходимо учитывать ряд повседневных задач, которые в готовых очередях уже есть:

  • ретраи;

  • управление приоритетами;

  • мониторинг;

  • администрирование.

То есть даже если ваша цель — кастомные функции вроде отложенных задач или гибких приоритетов, инфраструктуру и сервисную часть всё равно нужно разрабатывать.

Если кастомизация действительно необходима, можно попробовать собрать свою очередь и посмотреть, как это работает на практике. 

Построение очереди на PostgreSQL: опыт Яндекс Диска

Для кастомных очередей используется отдельная база данных, потому что паттерн нагрузки специфичен — большинство запросов изменяющие, в отличие от основной системы, где преобладают чтения. Разделение облегчает масштабирование и администрирование, снижает риск проблем с основной системой.

Мы создали библиотеку, которая обеспечивает все гарантии работы очереди:

  • каждая задача выполняется только одним воркером в один момент времени;

  • разработчику прикладных задач не нужно заботиться о механике очереди — он пишет только бизнес‑логику;

  • рядом с кодом задачи хранятся настройки: тайм‑ауты, ретраи, политики дедупликации и другие параметры.

Основной флоу работы выглядит так:

  1. Приложение создаёт задачу и вставляет её в базу (PostgreSQL).

  2. Воркер забирает задачу, выполняет её, обновляет статус и сохраняет результат обратно в базу.

  3. Админка обращается к базе, показывает текущее состояние задач и их статусы (об админке здесь подробно не говорим, но она необходима для полноценного продакшн‑решения).

Для начала будем использовать простую модель:

  • Ready — задача готова к исполнению, воркер может её забрать.

  • Running — задача выполняется.

  • Completed — задача успешно выполнена.

  • Failed — задача не выполнена по какой‑либо причине.

На первом этапе создадим базовую таблицу с минимальным набором колонок. По мере добавления новых фич структура может расширяться, но базовая логика уже закладывает фундамент для всей очереди.

Каждая задача в нашей очереди имеет уникальный идентификатор и человекочитаемое имя — например, «отправить пуш» или «скопировать файлы». Ей необходимы параметры для успешного выполнения: кому отправить уведомление, что и куда копировать и так далее. Ещё есть статус, который мы обсуждали ранее, и поле Schedule time — время, когда задачу можно начать исполнять. В простейшей реализации это время создания задачи. Также важно знать, какой воркер сейчас выполняет задачу, поэтому у нас есть поле Worker ID.

Выборка задач воркерами

С минимальным набором полей определились, теперь переходим к самой сложной части: выборке задач воркерами. Здесь кроются две критические проблемы — производительность и атомарность. Нужно минимизировать время блокировок и гарантировать, чтобы одна задача не попала на несколько воркеров одновременно.

Для этого мы используем конструкцию SELECT ... FOR UPDATE SKIP LOCKED. Она блокирует выбранные строки и пропускает те, что уже заблокированы, позволяя нескольким воркерам работать параллельно. Если заблокированных строк много, их всё равно нужно обрабатывать, тратя ресурсы. Поэтому мы решили обновлять статус задачи прямо в запросе выборки, минимизируя время блокировки и обеспечивая атомарность.

SELECT
     id
 FROM job
 WHERE
      task = ? AND status = ? AND schedule_time <= now()
 LIMIT ?
 FOR UPDATE SKIP LOCKED

Как только воркер выбирает задачу, её статус меняется на Running, фиксируются время начала, воркер и другие параметры, и запрос возвращает все поля, чтобы воркер знал, что именно он заасайнил.

UPDATE job SET
 	status = ?, start_time = ?, finish_time = ?, worker_id = ?
 FROM (
     	SELECT id
     	FROM
         	UNNEST(ARRAY[?, ?], ARRAY[?, ?]) t (task, lim),
         	LATERAL (
             	SELECT
                 	…
             	FROM job
             	WHERE
                 	task = t.task AND status = ? AND schedule_time <= ?
             	LIMIT t.lim::bigint
             	FOR UPDATE SKIP LOCKED
             	) j
        	LIMIT ?
 	) found
 WHERE job.id = found.id RETURNING *

Крутая особенность нашей реализации — воркер может одновременно выбирать задачи разных типов одним запросом. Достаточно передать массив имён задач, например send push, copy files и другие, указать лимит для каждого типа, и система вернёт готовый набор задач. Так мы эффективно решили проблему блокировок и параллельного выполнения, сделав выборку максимально быстрой и безопасной.

Приоритизация

Приоритизация у нас была ключевой темой с самого начала. В очереди мы используем числовой priority, который играет роль аналогов «важной», «обычной» и «неважной» задач. Ещё есть Delay tolerance — время, в течение которого задача может ждать выполнения, и Schedule time. Если Delay toleranceдля задачи не задан, используем Schedule time как индикатор срочности — такие задачи считаем критичными и выполняем сразу.

Реализовать сортировку задач оказалось проще, чем можно было ожидать. Нужно было лишь построить индексы и в запросе добавить сортировку: сначала по приоритету, а затем по Delay tolerance (или Schedule time, если Delay tolerance нет). Таким образом мы обеспечиваем корректное выполнение самых важных задач, и приоритеты удаётся побороть буквально парой строк кода.

UPDATE job SET
 	status = ?, start_time = ?, finish_time = ?, worker_id = ?
 FROM (
     	SELECT id
     	FROM
         	UNNEST(ARRAY[?, ?], ARRAY[?, ?]) t (task, lim),
         	LATERAL (
             	SELECT
                 	id, priority, delay_tolerance_time, schedule_time
             	FROM job
             	WHERE
                 	task = t.task AND status = ? AND schedule_time <= ?
             	ORDER BY priority DESC, COALESCE(delay_tolerance_time, schedule_time)
             	LIMIT t.lim::bigint
             	FOR UPDATE SKIP LOCKED
             	) j
         	ORDER BY priority DESC, COALESCE(delay_tolerance_time, schedule_time)
        	LIMIT ?
 	) found
 WHERE job.id = found.id RETURNING *

Но приоритеты — это только часть задачи. Второй важный аспект — задержки во время полинга. Простая схема, когда воркер забирает несколько задач, выполняет их и только потом делает новый запрос, ведёт к простаиванию ресурсов. В промежутках между полингами воркеры простаивают, хотя могли бы работать. Нам нужна равномерная загрузка, чтобы использовать ресурсы максимально эффективно, без дополнительных серверов.

Для этого библиотека, которая управляет очередью, умеет предсказывать, сколько задач стоит вытащить за раз. Если вытащено больше задач, чем можно сразу выполнить, лишние хранятся в локальной очереди воркера. Это особенно важно для быстрых задач, например пуш‑уведомлений, которые выполняются за десятки или сотни миллисекунд. Небольшой переизбыток задач можно кешировать локально и начать их выполнение сразу, не делая новый запрос к базе.

Долгие задачи, такие как копирование больших папок, кешировать не имеет смысла. 

Логику обработки разных типов задач нужно было реализовать в библиотеке, чтобы эффективно использовать ресурсы для быстрых задач и корректно обрабатывать долгие.

Для этого нам понадобился дополнительный статус Starting. Он обозначает задачи, которые воркер собирается выполнить в ближайшее время, но пока ещё не начал. Задачи с этим статусом лежат в локальной очереди, ожидая свободного ресурса для перехода в Running. Этот статус помогает отслеживать задачи и быстро перераспределять их на другой воркер, если с текущим что‑то случилось, минимизируя простой и задержки.

Дедупликация и ретраи

После того как мы научились управлять задержками и определять, сколько задач воркер должен выбрать, нам понадобилась дедупликация. Задача должна существовать в очереди только в одном экземпляре, чтобы не возникало параллельного выполнения одной и той же работы. Для этого мы используем поле active_uid — уникальный ключ дедупликации, иногда его называют ключом идемпотентности.

В коде мы определяем, из каких полей формируется этот ключ. При создании новой задачи проверяем её уникальность: если задача с таким ключом уже существует, действуем в соответствии с выбранной политикой. Например, при миграции пользователя с одного шарда на другой уникальной сущностью является ID пользователя. Независимо от других параметров, по одному пользователю может быть только одна задача. Аналогично для копирования: уникальными считаются пользователь, исходная и целевая папки. Так мы можем разрешать параллельное выполнение разных операций для одного пользователя, но не дублировать идентичные задачи.

Политики работы с дубликатами разные. Чаще всего оставляют всё как есть и не создают повторную задачу. Есть более продвинутые сценарии: можно смёржить параметры двух задач или объединять их, если задача ещё в статусе Ready и никто её не взял. Есть и перспективная политика, которую мы пока не реализовали: запускать задачу повторно после того, как она уже завершила исполнение. Это удобно для периодических синхронизаций между системами, когда важно, чтобы задача выполнилась ещё раз после поступления уведомления о необходимости синхронизации, чтобы никакие изменения не потерялись.

Следующий ключевой механизм — ретраи. Без него невозможно корректно обрабатывать ошибки. Каждая задача хранит счётчик попыток в колонке Attempt. Если задача не выполнилась, но её фейл не считается терминальным, мы рассчитываем новый Schedule time и пробуем снова. Так как наш запрос к базе уже учитывает Schedule time, задачи с будущей датой корректно остаются в очереди.

Политика ретраев хранится в коде, что позволяет гибко настраивать поведение. Можно использовать постоянную задержку, экспоненциальный или линейный рост времени между попытками, а также любые комбинации этих вариантов или любую другую политику, которую можно написать кодом на основе счётчика attempt. Такой подход позволяет создавать практически любую стратегию ретраев, подходящую именно для ваших задач.

Отказоустойчивость и масштабирование

После того как мы разобрались с ретраями, встаёт важный вопрос отказоустойчивости. Представим ситуацию: задачи находятся в статусах Starting или Running, и воркер, который их исполняет, внезапно падает. Без дополнительных механизмов эти задачи просто зависнут — их никто не подхватит, и они будут потеряны. Это, конечно, неприемлемо, поэтому нужно продумать, как с этим работать.

Существует несколько подходов. Самый простой и долгое время рабочий — запускать крон‑задачу, которая будет периодически сканировать очередь, находить «потерянные» задачи и сбрасывать их статус в Ready. Это легко реализовать, но для очень быстрых задач, например пушей, такая периодичность может оказаться слишком медленной.

Более гибкий подход заключается во введении поля expected_status_change_time. В него записывается момент, до которого статус задачи должен измениться. Если воркер не успел выполнить задачу до этого времени — значит, что‑то пошло не так, и другой воркер может её безопасно подхватить. Для коротких задач это работает прекрасно: тайм‑аут небольшой, и система быстро реагирует на падение воркера. Главное здесь — иметь надёжные тайм‑ауты и аккуратно настроенные запросы, чтобы не возникла ситуация, когда две копии одной задачи выполняются одновременно.

После настройки отказоустойчивости наступает этап масштабирования. Чтобы система могла обрабатывать десятки тысяч задач в секунду, требуется шардирование. В нашей реализации база шардирована с помощью consistent hashing: все данные разбиваются на большое количество партиций, и каждой партиции присваивается конкретный шард. Такая схема упрощает балансировку нагрузки, но накладывает свои требования: при вставке нужно проверять, не находится ли партиция в процессе миграции.

Миграция данных выглядит так: сначала на старом шарде помечается, что партиция перевозится, затем данные быстро копируются на новый шард, и только после этого запись удаляется со старого шарда. Так мы поддерживаем консистентность и одновременно минимизируем влияние миграций на текущие операции.

Работа с Bloat и долгими транзакциями

Следующий важный момент — Bloat в PostgreSQL. К сожалению, здесь нет волшебного решения: помогает только агрессивный VACUUM и ANALYZE. В нашей базе они запускаются примерно раз в две‑три минуты, и при таком ритме база чувствует себя прекрасно. PostgreSQL позволяет настроить приоритет этих процессов, и для нашей нагрузки мы выставили его примерно вдвое выше рекомендаций — это помогает базе держать нужное качество работы.

Если взглянуть на конфигурацию серверов, ключевыми для нас оказываются не столько CPU и RAM, сколько сеть и производительность дисков. Наши базы Master Replicaset обрабатывают десятки тысяч транзакций в секунду, и все изменения идут на мастер, реплики служат только для отказоустойчивости. Один шард весит примерно 70 ГБ, и за час в нём происходит больше изменений, чем занимает вся база. Конечно, при такой нагрузке DBA не обрадуются, но при правильно настроенном вакууме и анализе всё работает стабильно.

Теперь про возможные проблемы. Первая, классическая — это долгие транзакции. Если задача содержит очень большой объём данных, миллионы символов, то время транзакции растёт, а с ним растёт и Bloat. PostgreSQL не может своевременно удалять старые версии строк, и база начинает «раздуваться». Растёт время всех запросов, увеличивается использование диска — в итоге система может просто остановиться. Борьба с этим простая по концепции: не писать долгих запросов и настроить тайм‑ауты для транзакций и отдельных стейтментов.

Второй интересный случай — неожиданная деградация после миграций данных. Допустим, мы быстро переносим данные с одного шарда на другой, таблица на исходном шарде пуста, но физически старые строки ещё лежат на диске. В этот момент вакуум и анализ не успевают отработать, и база оказывается полностью забита Bloat. 

Любой запрос, даже простой SELECT, вынужден сканировать десятки гигабайт удалённых, неактуальных строк, что приводит к огромной нагрузке на диск и CPU. Ситуация выглядит невинно, но фактически это серьёзная проблема, с которой можно столкнуться, если не учитывать особенности работы PostgreSQL на больших объёмах данных.

Итоги и выводы

Главное, что я хочу донести: если для вашей задачи хватает классических очередей — используйте их. Иногда достаточно немного подкостылить, добавить пару топиков, и всё заработает. В этом случае вы избежите всех сложностей, о которых мы сегодня говорили.

Если же есть сценарий, который с помощью стандартной очереди не решить, тогда можно задуматься о кастомном решении на PostgreSQL. Сегодня я рассказал о ключевых моментах, на которые стоит обращать внимание: как строить очередь, расставлять приоритеты, управлять дедупликацией и ретраями, контролировать отказоустойчивость и масштабируемость, бороться с Bloat и долгими транзакциями.

Важно понимать, что есть масса нюансов, о которых вы узнаете только на практике. Всё это требует времени, разработки и тщательного тестирования, иначе система работать не будет. Поэтому перед тем как браться за такое решение, трезво оцените, стоит ли игра свеч и приложенных усилий.

Комментарии (1)


  1. antonb73
    09.12.2025 07:45

    Интересная статья, рно у меня есть несколько примечаний.

    С терминами путаница.
    - Очередь это буфер с политикой обработки FIFO
    - Kafka, Rabbit MQ - это реализация архитектурного паттерна Message Broker, в котором FIFO это скорее второстепенная задача, частный случай, который может соблюдатся если есть только один потребитель на очередь.

    Не существует никаких классических, стандартных очередей. Лучше не надо употреблять эти термины у них есть строго определённое значение. например, когда мне говорят стандартный что то там, я прошу ссылку на стандарт - люди недоумевают :)

    Потом допишу остальное, сейчас надо уйти.