На конференции Web 2.0 в 2006 году Marissa Mayer из Google указала на проблему, что дополнительные полсекунды задержки приводили к снижению поискового трафика примерно на 20%. Amazon сообщал о похожем эффекте: каждые дополнительные 100 мс уменьшали продажи примерно на 1%. 

Большие задержки времени отклика чаще можно встретить в аналитических SQL-запросах, так как запрос требует обработки больших блоков данных. Особенно сильно задержки влияют на клиентов с длительной историей покупок. Именно они чаще всего оказываются в верхних перцентилях времени отклика — а это те самые пользователи, которых компании меньше всего хотят терять. 

Конференция была проведена почти 20 лет назад, компьютерные технологии за это время стали демократичнее, что привело к увеличению количества пользователей и продуктов. Проблема задержек не исчезла — наоборот, она стала острее: чем больше информации накапливают сервисы, тем тяжелее становится её обработка. Чтобы справиться с нагрузкой, приходилось менять архитектурные подходы к хранению и обработке данных. В статье мы разберем один из них – event-driven design. 

Что это за подход и зачем он нужен?

По мере роста бизнеса нагрузка на данные становится критичной. Для онлайн-маркетплейса это могут быть персонализированные рекомендации, для финтех-проекта — антифрод, отслеживающий подозрительные транзакции. В обоих случаях данные нужно обрабатывать постоянно, приближаясь к обработке в реальном времени.

Но частый запуск аналитических SQL-запросов быстро перегружает базу: приходится фильтровать, выбирать и распределять строки из огромных блоков информации. Попытка «сохранить статистику заранее» тоже не решает задачу, потому что аналитика динамична — малейшее изменение условий способно полностью поменять стратегию сервиса. Поэтому долгое время стандартной практикой было выполнение запроса прямо при входе пользователя, с частичным кешированием результатов. Такая «ленивая» модель пересчёта (lazy calculation) плохо масштабируется.

Альтернативой стал event-driven design. Вместо того чтобы ждать запрос пользователя и пересчитывать данные с нуля, система реагирует на сами события — покупку, транзакцию, клик. Информация обновляется в момент её появления, и сервис всегда готов выдать свежие результаты без задержки.

Сначала разберем, как эволюционировали подходы к взаимодействию с базой данных. Разберем недостатки существующих средств по улучшению производительности.

Часть I. Оптимизация в OLTP базе данных

Можно ли как-то оптимизировать только с помощью базы данных? 

В случае самой простой обработки статистики, цепочка сервисов сохраняет каждое участвующее в анализе событие, совершенное пользователем или фоновым процессом, в базу данных. При необходимости получить результат, выполняется SQL запрос с группировкой и агрегированием значений. База данных выполняет поиск в снимке данных для нивелирования влияния параллельно выполняющихся транзакции на целостность выборки

С конкурентностью также связана существенная часть задержек из-за необходимости к синхронизации. При массовых аналитических запросах ситуация усугубляется: нагрузка на механизмы блокировок и контроль изоляции транзакций возрастает. В результате база данных тратит больше времени на координацию параллельных операций, выполняет повторные сканирования строк и дополнительную фильтрацию. 

Первым шагом к улучшению скорости запроса является планирование архитектуры самих таблиц. Данные таблицы должны быть нормализованы и спроектированы таким образом, чтобы минимизировать дублирование данных, упростить обновления, процедуру объединения и обеспечить логическую целостность, сохраняя баланс между степенью нормализации и производительностью запроса

Следующий пункт - индексы, представляющие собой вспомогательные структуры данных, созданные для ускорения поиска. Индексы являются вторичными отсортированными таблицами, содержащими ссылки на соответствующие строки основной таблицы. Благодаря структуре индекса, СУБД может выполнять фильтрацию и выборку данных быстрее, не сканируя всю таблицу полностью

Индексы не используются автоматически. Например их применимость зависит от селективности — отношения количества уникальных значений в столбце к общему числу записей. Чем ближе селективность к единице, тем выше вероятность, что индекс будет использован. Например, индекс по полю "ID" с уникальными значениями почти всегда полезен, тогда как по булевому полю — нет.

Кроме того, каждый оператор модификации в таблице с индексами требует их обновления, что ведёт к дополнительной нагрузке. Поэтому, несмотря на очевидную пользу в ряде случаев, индексы не являются универсальным решением и должны использоваться только при явной экономии ресурсов в большинстве запросов

Можно ли улучшить производительность переписав запрос используя альтернативные конструкции?

Для этого метода нет каких-то универсальных советов, каждый запрос должен рассматриваться индивидуально, исходя из особенностей вашей СУБД. Существуют общие методы оптимизации, как вспомогательная фильтрация, денормализация данных или создание временной таблицы для кеширования результатов. Написание эффективных запросов - навык,  требующий высокого опыта разработчика;  Переход на другую СУБД может свести на нет преимущества уже оптимизированных запросов, а сложность написанной реализации может превратить высоконагруженный участок кода в ящик пандоры любого релиза, поэтому, если это возможно, хорошей практикой является смещение ответственности по улучшению перформанса с разработчика на другое программное решение, заточенное под решение данной задачи

Что насчет внутреннего кеширования?

Современные реляционные СУБД используют встроенные механизмы ускорения — например, буферный кэш страниц и план-кэш для часто выполняемых запросов. Это действительно может улучшить время отклика, но полагаться только на внутренние алгоритмы базы не стоит: при росте нагрузки RPS рано или поздно просядет

Часть II. Аналитические базы данных

Аналитические запросы можно выполнять и в отдельной базе данных. Обновление данных и аналитические запросы — нагрузки разного характера, которые приводят к взаимным блокировками или просто тратят огромное количество ресурсов. Решений может быть несколько: переход на Read/Write реплики или использование OLAP хранилищ. Остановимся на последнем подробнее

Что такое OLAP хранилище и когда оно лучше?

OLTP база данных оптимизирована, как для записи, так и для чтения, поэтому чаще всего используется в production среде как основной инструмент управления данными. OLAP хранилище выигрывает при частом чтении, фильтрации, агрегации, но имеет существенный недостаток в скорости обновлений и вставки данных. Характер хранения данных может быть различен: несложные Data Lake хранилища, где данные хранятся в неструктурированом виде, а обработка данных происходит во время чтения данных, так и различные системы хранения данных в виде многомерных кубов, где в зависимости от запроса будут использованы конкретные плоскости, количество которых в сегодняшних реалиях технически может переходить за сотню.

Современные базы данных реального времени, такие как  ClickHouse, Apache Druid, StarRocks предлагают функционал и инструменты предобработки записей с помощью триггеров. Данные, поступая в одно из выше перечисленных хранилищ, могут обрабатываться материализованными представлениями и применять уже закешированным результатам. В этом случае при запросе пользователя, БД выдаст подготовленные данные, приводя время задержки к минимуму

Но как упоминалось выше, любые операции модификации довольно сложны в OLAP хранилищах, большинство аналитических движков изначально проектировались под модель, где данные проще добавлять, чем модифицировать. Это сильно влияет на характер использования подобных систем и накладывает архитектурные ограничения

Классический способ управления данными для архитектуры REST - CRUD предполагает частые добавления, обновления, удаления. В случае репликации данных из OLTP базы в OLAP без изменения структуры хранения записей, мы получим медленные операции модификации и, как следствие, увеличение интервала конечной согласованности двух баз данных

Часть III. Журнал событий

Статистические запросы часто не имеют явных временных границ, что приводит к полному сканированию записей в таблице. А в случае с плановыми, например квартальными, запросами модель представления данных не гарантирует, что данные не могли устареть из-за нового обновления уже существующих записей. 

Как следствие при поступлении новых данных в долгосрочной перспективе: 

  1. Время выполнения этих запросов будет статично возрастать 

  2. Большая часть данных будет обработана более одного раза

Ситуацию меняет переход на другую стратегию хранения данных – журнал событий. Структура гарантирует, что записи  не будут обновлены или удалены, а любое изменение порождает новую запись в базу данных, компенсирующее какое-либо действие 

Разберемся на практическом примере с балансами пользователей, проектируя представления с помощью принципов append-only журнала:

Вместо того, чтобы хранить в таблице пользователей значение баланса, мы будем определять его на основании операций с этим балансом. А данные будут представлять собой только изменение данных, а не их конечное состояние. 

На левой части рисунка, в таблице users содержатся уникальные по пользователю значения балансов. Данные в этой таблице обновляются командой SQL при любом изменении

На правой части, таблица transactions содержит только ивенты изменения:  пополнения и снятия средств. Сумма этих событий является актуальным балансом.

Таблицы для расчета баланса
Таблицы для расчета баланса

Преимущество данной структуры хранения – историчность данных. Записи всегда расположены в хронологическом порядке. Из этого следует, что взяв любую точку отсчета времени, меньшую чем текущая, мы сможем получить актуальное на тот момент значение поля. Каждое новое значение следует из неизменного предыдущего 

Фиксация изменений с ме��ками времени подсчета, позволяет сохранять результаты агрегации данных и переиспользовать их, вместо многочисленных пересчетов значений по всем доступным строкам из БД. Такой метод проектирования называется системой снимков данных.

Временной ряд ивентов
Временной ряд ивентов

Разберём на примере. Представим линию времени, на которой расположены события (events) в хронологическом порядке. Отметим на ней две контрольные точки — t₁ и t₂.

С начала линии до момента t₁ мы обработали события 1–6 и сохранили результат агрегации во вспомогательную таблицу. Данная запись является снимком состояния.

Когда наступает момент t₂, мы обрабатываем только новые события 7–11. Чтобы получить актуальное состояние, нам не нужно пересчитывать все события с 1 по 11. Достаточно взять сохранённый результат в точке t₁ и прибавить к нему агрегацию новых данных:

final_state = previous_agg + agg(events WHERE created_at > previous_agg.created_at)

Таким образом, мы двигаем временное окно и обрабатываем только «незафиксированные» изменения. Это снижает нагрузку: вместо постоянного пересчета всей истории система работает с небольшими частями данных. Актуальность при этом гарантируется тем, что уже зафиксированные события остаются неизменными.

Совмещая структуру append-only журнала с системой снимков данных, мы решаем проблему постоянного пересчета одних и тех же данных, а также статичного увеличения времени выполнения аналитических запросов: 

  1. Подсчитанные агрегации данных будут хранится в качестве снимков данных, вместо пересчета старых данных мы будем использовать ранее сохраненные результаты 

  2. Размер рассчитываемых данных остается неизменным или почти неизменным (в зависимости от политики их создания: по временным интервалам или количеству ивентов)

Также такой подход упрощает механизм репликации: изменения будут являться новыми строками в таблице, поэтому нет необходимости сканировать таблицу при каждом такте репликации. 

Подобная организация хранения данных лежит в основе современных стриминговых платформ. По нашему мнению, наиболее полно ее реализует Apache Kafka, которая выступает распределенным журналом событий с возможностью репликации и параллельной обработки группой потребителей с гарантией доставки ивентов

Часть IV. Kafka Stream и ksqlDB. Потоковая обработка событий

Второй продукт от Confluent - Kafka Streams был создан для обработки сообщений из топик Kafka, используя операции map, filter, aggregate, join, etc… Результаты этих агрегаций попадают в другую топику в виде сообщения, которое также можно использовать как источник для нового оператора. Такая модель позволяет организовывать пайплайны ивентов, ввиду чего KStreams активно используется, как дистрибьютер агрегированых сообщений между микросервисами

public class SimpleStreamApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> input = builder.stream("input-topic");
        KStream<String, String> transformed = input.mapValues(value -> value.toUpperCase());
        transformed.to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Обработка данных ведется с помощью потребления сообщений и группировки с помощью кода библиотеки. Официальная версия продукта доступна только на Java, что существенно усложняет использование пакета в проектах, использующих иной основной язык. Технология имеет высокий порог входа, изучение внутренних методов замедляет разработку, а сложные операции организации данных реализуются бок о бок с бизнес логикой сервиса. Kafka Streams была первой версией нового подхода к обработке ивентов, получившей признание и интерес со стороны сообщества разработчиков.

В 2017 году Confluent представила новое решение — Kafka SQL (ksqlDB), которое устранило ряд недостатков предыдущих подходов. Теперь запросы к данным выполняются не в пользовательском коде, а через развернутый сервис ksql-server (например, в Docker), используя HTTP-запросы. Для описания логики применяется SQL, лишь слегка отличающийся по семантике от классического. Это снижает порог вхождения: достаточно базовых знаний SQL, чтобы быстро начать работать с потоками данных.

При этом сохранились знакомые из Kafka Streams структуры данных:

Stream (поток) — неизменяемый конвейер событий. Пользователь задаёт схему с типизацией, и каждое сообщение, попавшее в Kafka-топик, записывается в соответствующие поля стрима. Если поле отсутствует в событии, Kafka трактует его как zero-value. В Kafka SQL нет обновления или удаления событий: каждое новое сообщение — это отдельный event, а модификация или удаление выражаются через компенсирующие события.

Table (таблица) — близка по сути к стриму, но хранит не всю последовательность событий, а только актуальное значение по ключу. Группировка задаётся пользователем при создании таблицы, с помощью конструкции PRIMARY KEY.

За счет типизированной структуры, каждое поле события сохраняется в свою колонку. С этого момента мы можем использовать широкий инструментарий запросов ksql: 

Выборка, Фильтрация,  объединения, агрегация, создания окон и арифметику над числовыми значениями

Более подробное описание можно прочитать в официальной документации

Часть V. KSQL на практике

Четыре месяца назад мы впервые попробовали запросить статистику с помощью KSQL и были удивлены удобством технологии. На тот момент не существовало ни одной библиотеки на нашем основном языке Golang, поэтому сначала мы использовали CURL запросы, а позже и вовсе решили написать свой пакет взаимодействия с сервисом. По мере разработки мы добавили привычные нам фичи по которым скучали перейдя на новую технологию: query builder, query reflection, CLI migrations. В дальнейших примерах мы будем использовать, как стандартные запросы, так и код нашей библиотеки, для реализации сложных примеров взаимодействия с KSQL

Разберем на примере

Представим, что мы занимаемся сервисом начислений бонусной программы пользователей. В качестве потока ивентов будет топика Kafka с названием purchases. Опишем структуру ивентов: 

type Purchase struct {
	ID           string  `ksql:"id"`
	CustomerID   string  `ksql:"customer_id"`
	SellerID     string  `ksql:"seller_id"`
	ProductID    string  `ksql:"product_id"`
	ShopID       string  `ksql:"shop_id"`
	Quantity     int     `ksql:"quantity"`
	Price        float64 `ksql:"price"`
	PurchaseDate string  `ksql:"purchase_date"`
}

Идентификаторы, такие как CustomerID, ProductID и т.д., ссылаются на записи в таблицах ksqlDB. Таблицы лучше всего подходят в этой ситуации, т.к. они будут хранить лишь последнее актуальное состояние и отправлять ивент только при изменении. 

Поток событий с покупками будет началом конвейера, 

Для дальнейшей работы, заведем поток bonus_invoices, который будет регистрировать ивенты изменения бонусного баланса пользователя. Для каждой покупки создадим «начисление бонусов» как 10% от суммы (quantity × price): 

bonusInvoices, err := streams.CreateStreamAsSelect[dtypes.BonusInvoice](
		ctx,
		BonusInvoicesStreamName,
		shared.StreamSettings{
			SourceTopic: PurchasesStreamName,
			Partitions:  1,
		},
		ksql.Select(
			ksql.F("id").As("payment_id"),
			ksql.F("customer_id").As("customer_id"),
			ksql.Mul(ksql.Mul(ksql.F("quantity"), ksql.F("price")), 0.1).As("amount"),
		).From(ksql.Schema(PurchasesStreamName, ksql.STREAM)),
	)

Ивенты об изменении баланса пользователей нужно куда-то записывать заведем таблицу bonus_balances, которая будет агрегировать поток изменений: 

bonusBalances, err := tables.CreateTableAsSelect[dtypes.BonusBalance](
		ctx,
		BonusBalancesTableName,
		shared.TableSettings{
			SourceTopic: BonusInvoicesStreamName,
			Partitions:  1,
		},
		ksql.
			Select(
				ksql.F("customer_id").As("customer_id"),
				ksql.Sum(ksql.F("amount")).As("balance"),
			).
			From(ksql.Schema(BonusInvoicesStreamName, ksql.STREAM)).
			GroupBy(ksql.F("customer_id")).
			EmitChanges(),
	)

Стоит упомянуть про push/pull queries. ksqlDB предоставляет возможность выполнения одноразового запроса (pull queries), который вернет значение один раз и закончит свое выполнение. Второй вариант – push query будет являться повисшим TCP-соединением, а результаты обработки будут возвращаться при каждом изменении.  Здесь мы это указали, использовав специальное слово EMIT CHANGES. Нам нужно считать баланс по всем изменениям баланса, и будущим тоже, а не высчитывать его один раз. 

Таблица бонусных балансов у нас реализована. Теперь реализуем подсчет уровней лояльности пользователей. Пусть будет 3 уровня:

  1. бронзовый  (бонусный баланс меньше 10_000)

  2. серебряный (бонусный баланс больше 10_000 и меньше 50_000)

  3. золотой  (бонусный баланс больше 100_000)

Эти три уровня рассчитываются в течение года, поэтому нам нужно реализовать механизм оконного подсчета балансов.  Это можно реализовать с помощью условных операторов и специального метода WINDOW: 

	bonusLevels, err := tables.CreateTableAsSelect[dtypes.BonusLevel](
		ctx,
		BonusLevelsTableName,
		shared.TableSettings{
			SourceTopic: BonusBalancesTableName,
			Partitions:  1,
		},
		ksql.Select(
			ksql.F("customer_id"),
			ksql.Case(
				"level",
				ksql.CaseWhen(ksql.F("balance").Less(10_000), "bronze"),
				ksql.CaseWhen(ksql.And(ksql.F("balance").Greater(10_000), ksql.F("balance").Less(100_000)), "silver"),
				ksql.CaseWhen(ksql.F("balance").Greater(100_000), "gold"),
			),
		).
			From(ksql.Schema(BonusBalancesTableName, ksql.TABLE)).
			Windowed(ksql.NewHoppingWindow(
				ksql.TimeUnit{Val: 365, Unit: ksql.Days},
				ksql.TimeUnit{Val: 1, Unit: ksql.Days},
			)),
	)

Таким образом мы построили конвейер работы с бонусной программой. Важно, что при дальнейшей разработке этот граф вычислений может довольно просто расширяться. У нас есть полноценный пакет ksql-examples, где вы можете посмотреть на более развернутые графы вычислений. 

Разработка библиотеки ведется и сейчас, на текущий момент она находится в ранней бета-версии. Доска с issues заполнена задачами по улучшению проекта и собрана по планам на дальнейшие релизы. Пакет открыт для всех желающих разрабатывать open-source продукт или же к объективному взгляду со стороны. Мы были бы рады любому обсуждению в комментариях.

Ссылка на проект

Источники: 

- https://glinden.blogspot.com/2006/11/marissa-mayer-at-web-20.html

- https://clickhouse.com/docs/best-practices/use-materialized-views

- https://www.confluent.io/product/ksqldb/

Комментарии (0)