Привет! С вами Кабанов Олег — ведущий ML-инженер Flocktory.

В этой статье расскажу об опыте внедрения YDB в качестве хранилища для ML Online Feature Store. А также о том, как нам удалось ускорить загрузку данных в 40 раз и убрать влияние на скорость чтения данных при обновлении.

Предисловие

Наша первоначальная задача была такой: реализовать сервис, возвращающий фичи текущего пользователя по id, latency < 80ms, объём данных ~1TB, возможность гибко масштабировать на чтение и объём хранилища.

На конференции HighLoad++ мы подробно рассказали, почему решили выбрать в качестве хранилища YDB, а в этой статье хочется поделиться опытом оптимизации загрузки, так как это стало проблемой при внедрении.

Структура данных выглядела следующим образом:

CREATE TABLE features
(
    `customer_id` Int32 NOT NULL,
    `tag` Utf8 NOT NULL, -- название фичи: feature_a, feature_b...
    `valid_to` Datetime NOT NULL, -- данные должны пропадать при достижении valid_to
    INDEX idx_customer_id GLOBAL ON (customer_id), -- в запросе получаем все строки по customer_id
    PRIMARY KEY (`customer_id`, `tag`) -- дедупликация данных по двум полям
)
WITH (
    AUTO_PARTITIONING_BY_SIZE = ENABLED,
    AUTO_PARTITIONING_BY_LOAD = ENABLED,
    AUTO_PARTITIONING_PARTITION_SIZE_MB = 512, -- объём партиции при котором она будет разбита на несколько
    TTL = Interval("PT0S") ON `valid_to`
);

Запрос на получение данных упрощенно выглядел так:

DECLARE $customer_id AS int64;
SELECT AGGREGATE_LIST(tc.tag) AS tags 
    FROM features VIEW idx_customer_id as tc -- явно указываем индекс, иначе не будет использоваться
    WHERE tc.customer_id = $customer_id

С появлением все новых источников данных для ML Feature Store объем обновлений рос, время ответа сервиса перестало укладываться в 80ms, а загрузка длилась часами, что было неприемлемо.

Оптимизация 1

Количество партиций достигло 500, при загрузке данных требовались синхронизации между значительным количеством из них. Это вызывало рост издержек на CPU, latency росло, горизонтальное масштабирование не помогало. Мы попробовали снизить количество партиций сначала в два, потом в четыре раза. Вместо типичной загрузки в 1000 записей в секунду получили 4000! Нагрузка на CPU снизилась, но latency продолжал быть нестабильным.

Оптимизация 2

Оптимизация записи в четыре раза вдохновила нас на анализ query explain, и мы заметили, что скорость чтения с индексом idx_customer_id и без него практически не отличается, а данных на тот момент уже было около ~ 300GB. Оказалось, что SSTable хорошо ищет по первой части первичного ключа. Индекс занимал место, и мы решили удалить его. Также отказ от индекса должен был ускорить вставку. Запрос стал выглядеть так:

DECLARE $customer_id AS int64;
SELECT AGGREGATE_LIST(tc.tag) AS tags 
    FROM features as tc -- убрали индекс
    WHERE tc.customer_id = $customer_id

Удаление индекса:

ALTER TABLE features DROP INDEX idx_customer_id;

Каково было наше удивление, когда скорость загрузки выросла до 12 000 записей в секунду! Но скорость чтения во время заливки продолжала показывать спайки на графиках, нарушая SLA.

Оптимизация 3

Возникает вопрос: почему при загрузке дампов скорость записи достигает 50 000 строк в секунду, а в нашей базе данных при заметно меньшей скорости все шарды уже нагружены на 100% по CPU? В чем разница? Почему сокращение шардов так помогло? Мы пишем с помощью YDB Connector for Apache Spark, он пишет батчами по 500 строк, при этом в каждом batch запросе по ключу customer_id мы задеваем все шарды. Структура данных SSTable и шардирование в YDB навело на мысль, что лучше бы писать не во все шарды по 1-2 записи, а в одну все 500, а это проще всего сделать сортировкой данных перед вставкой.

Придумали, реализовали и через час получили скорость записи 40 000 строк в секунду, при этом никакого влияния на скорость чтения, latency стал стабильно < 80ms на 99.9 перцентиле.

Почему так? Дело в том, как данные уложены в партиции SSTable:

image-20250909-075509.png
image-20250909-075509.png

Данные упорядочены по первичному ключу, и если вставляемые строки уже отсортированы, то при большом объёме вставки один батч будет попадать в одну или две партиции. В таком случае транзакция потребует синхронизацию только нескольких партиций, а не всех в таблице. А одной партиции принять 500 строк, да ещё и отсортированных, не составляет труда. Вот почему дампы, снятые с БД, так хорошо загружаются — они уже отсортированы (брались из сортированной таблицы).

Выводы

С тех пор, как YDB была внедрена в Flocktory и мы провели эти оптимизации, прошло уже больше года, скорости загрузки и чтения нам достаточно по сей день, и к проблеме производительности мы более не возвращались. Шаги, которые мы прошли, могут показаться очевидными, но удовольствия от этого мы получили не меньше! Знание внутренней работы БД очень помогает, даже при типичном паттерне применения.

Комментарии (0)


  1. Vadik_prog
    13.09.2025 08:14

    Ускорение в 40 раз - достаточно большая цифра, это сравнение с начальным состоянием?то есть взяли базу и загрузку с дефолтными настройками и запустили процесс. Подход имеет право на жизнь - эволюционная оптимизация только по мере необходимости против достаточно затратного подхода предусмотреть все заранее для всех сценариев, которые могут и не реализоваться.


    1. kolegich Автор
      13.09.2025 08:14

      Добрый день! Да, вы верно уловили подход.

      Изначальный дизайн и тестирование дали нам хороший и ожидаемый на старте результат. Но когда данные существенно выросли (в 4 раза), мы столкнулись с повышенным потреблением CPU, которого не заметили при нагрузочном тестировании. Цифра 40x — это именно сравнение производительности до и после оптимизации под возросшие объемы, а не с самой первой реализацией.


  1. lazy_val
    13.09.2025 08:14

    Удалили индексы, сократили число партиций - скорость записи выросла. Результат не сказать что неожиданный ))

    На скорость чтения это как повлияло?


    1. kolegich Автор
      13.09.2025 08:14

      Вы правы, сам факт того, что после удаления лишнего всё ускорилось, действительно ожидаем. Но ключевая неожиданность была не в «что делать», а в «почему именно это было проблемой».

      Мы видели высокую утилизацию CPU и спайки латенси при записи, проблема оказалась архитектурная: когда в одном батче данные пишутся сразу во все партиции, это создает большую нагрузку на координатор (он должен атомарно управлять записью в каждую из них), это съедало все ресурсы CPU и не позволяло масштабироваться.

      Пока координатор был загружен на 80-90% этими мелочами, он точно так же не мог быстро отвечать и на запросы на чтение — отсюда и спайки латенси на всех операциях. Убрав такой паттерн нагрузки (сортировкой данных перед вставкой), мы радикально разгрузили CPU.

      Как помогла сортировка?
      Если взять несколько десятков тысяч строк и сделать сквозную сортировку, то в каждом батче по 500 строк окажутся самые близкие значения PK. Поэтому они попадут в одну или две партиции (SSTable формат).


      1. lazy_val
        13.09.2025 08:14

        Сокращаем количество партиций - уменьшаем потребление ресурсов координатором - избавляемся от latency. Тут понятно, спору нет.

        Удаляем индекс - экономим время на перестройке индекса при каждой операции записи - повышаем скорость записи. Тоже все понятно.

        Меня другое интересует. Вот загрузку данных (то есть запись) вы оптимизировали. Скорость чтения этих самых данных не пострадала у вас из-за этого? Потому что индексы и партицирование не просто так придумали, а для оптимизации операций чтения.


        1. kolegich Автор
          13.09.2025 08:14

          Тоже сначала так думали, поэтому и добавили вторичный индекс по customer_id при проектировании.
          В структуре данных SSTable все партиции отсортированы по первичному ключу, в нашем случае это customer_id+tag, значит данные упорядочены сначала по customer_id, а если customer_id одинаковый, то по tag.
          По части первичного ключа - по customer_id работает та же оптимизация как и по индексу первичного ключа (по полю tag уже так работать не будет). Поэтому вторичный индекс был излишним!

          Кстати, в YDB вторичный индекс может быть async, это хорошо поднимает производительность, когда можно пожертвовать консистентностью. Но координация вставки всё-равно съедает CPU.


  1. EmiAsk
    13.09.2025 08:14

    Привет! Спасибо за статью.

    что лучше бы писать не во все шарды по 1-2 записи, а в одну все 500, а это проще всего сделать сортировкой данных перед вставкой.

    можете раскрыть детали, каким образом сортировка перед вставкой 500 записей помогает писать все эти 500 записей в один шард, а не распылять по разным шардам по ключу шардирования?

    Окей, сортировкой мы сгруппируем записи так, что записи с одинаковым ключом шардирования будут идти подряд, но неясно каким образом записи попадут в один шард все 500


    1. kolegich Автор
      13.09.2025 08:14

      Спасибо за вопрос! Вы правы, они могут попасть не в одну партицию, а в несколько - 2 или 3.

      Мы пишем раз в N часов и данных накапливается за это время несколько десятков тысяч строк. Сортировка по первичному ключу (включающему ключ шардирования) группирует записи так, что в один батч попадают строки с близкими диапазонами ID.

      YDB использует диапазонное шардирование. Это значит, что непрерывный диапазон ключей часто обслуживается одной таблеткой (процессом на узле). Поэтому отсортированный батч из 500 записей попадает не в 500 разных партиций, а всего в несколько (1-3).

      Это снижает:

      • Сетевые издержки (батч летит на 1-2 узла, а не на 500)

      • Нагрузку на координатор (ему нужно управлять не сотней мелких транзакций, а одной-двумя)

      • Нагрузку на CPU