Привет, Хабр! Стриминговая обработка давно уже стала стандартом – агрегируем, соединяем, считаем прямо по ходу поступления данных. Но стоит задуматься о состоянии – как мы его храним? В Spark Structured Streaming (и не только) появилась возможность хранить промежуточное состояние с помощью RocksDB – очень быстрый встроенный key-value store, оптимизированный для флеш- и RAM-носителей. Само по себе появление RocksDB в экосистеме Spark – серьёзное событие: начиная с Spark 3.2 можно указать RocksDB в качестве бэкенда для state store.
Из коробки Spark Structured Streaming обеспечивает сквозную гарантию exactly-once благодаря checkpoint-логам и write-ahead логам, но это не избавляет от дубликатов в источниках с at-least-once семантикой. Так, Kafka и другие очереди могут присылать одни и те же сообщения повторно. Более того, могут быть потерянные события – например, если источник подтверждает получение раньше, чем Spark выполнил запись в состояние/синк. Как же поймать и обработать эти неприятности при помощи состояния RocksDB?
Ниже разберём на примерах, как настраивать Spark со state-стором RocksDB, как отсекать дубликаты при помощи встроенных функций (и помощью пользовательского стейта), а также как выявлять дырявые события (например, пропуски последовательностей) через mapGroupsWithState.
Зачем RocksDB в Spark
Раньше Spark Structured Streaming хранил состояние в по умолчанию виде: память + файловая система. С появлением RocksDB мы можем делать state-стор на базе встроенной БД. Напомним, что RocksDB – это библиотека от Facebook/META, написанная на C++. Она отлично подходит, когда данных состояния очень много и их не хочется держать целиком в памяти. Кроме Spark, RocksDB используют и другие фреймворки: например, Flink по умолчанию умеет передавать state в RocksDB, а Kafka Streams использует RocksDB для store-менеджмента.
Для Spark Structured Streaming включение RocksDB требуется явно указать. В конфиге SparkSession устанавливаем провайдер стейт-стора. В Spark 3.2+ это делается так:
val spark = SparkSession.builder()
.appName("RocksDBStateExample")
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
.getOrCreate()
Теперь все stateful-операции (группировки, mapGroupsWithState и т.п.) будут сохранять состояния в файлах RocksDB, а не только в памяти. По аналогии, в Databricks (или Lakehouse-пайплайнах) можно включить RocksDB через конфигурацию { "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider" }
.
Само по себе использование RocksDB даёт прирост устойчивости: во-первых, этот стор при коммите данных гарантирует сброс на диск (в Spark написано, что WriteOptions
выставляет флаг sync=true, чтобы не терять данные при сбое машины. Во-вторых, RocksDB хранит данные на диск в SST-файлах и WAL, а при рестарте восстанавливает из чекпоинта автоматически. Кстати, Spark использует для записи state commit (flush) эти опции WriteOptions.sync = true
, чтобы гарантировать отсутствие потери данных.
Итак, с технической стороны: RocksDB в Spark – это обычный state store, только куда тяжелее и длиннее, чем память. Spark при mapGroupsWithState
/update
/append
будет читать и писать в RocksDB через байтовые ключи и значения, используя собственные энкодеры. Для нас важно: с настроенным RocksDB state store можно безопасно сохранять очень большие состояния без опасений, что JVM ляжет от переполнения. Именно для таких случаев Databricks и рекомендует использовать RocksDB для stateful-стрима.
Ещё пару слов про сохранность. Spark Structured Streaming по умолчанию обещает end-to-end exactly-once (в микробатч режиме). Это значит, что если всё нормально сконфигурировано, то после сбоя пепель данных не появится и не пропадёт. Используются checkpoint на HDFS и write-ahead журналы. Мы же просто включаем RocksDB как state store, и Spark сам будет записывать состояние при каждом триггере. В случае перезапуска чекапойнты сработают, и RocksDB база восстановится в том состоянии, в котором она была при сбое.
Теперь рассмотрим, как на этом стейте решать проблемы дубликатов и потерянных событий.
Дубликаты в стриме и dropDuplicates
Первая проблема стриминга – дублированные события. Почти все брокеры дают at-least-once гарантию: сообщение может быть отправлено и получено несколько раз, если сетевая связь прервётся или при перезапуске продюсера. Даже если Spark пишет в конечную систему стабильно, источник может случайно скинуть топик назад. Поэтому стриминговый джоб надо учить не считать одну и ту же запись дважды.
Spark Structured Streaming накладывает exactly-once на свою сторону, но дубликаты на входе сам не удаляет. Чтобы убрать повторяющиеся записи, нужно самостоятельно это сделать, используя уникальный идентификатор события. Например, если у нас у каждой строки есть поле eventId
(GUID, ключ, последовательность), мы можем использовать dropDuplicates()
. Но для стрима есть нюанс: требуется привязать удаление дубликатов к watermark. Spark вводит понятие окна времени задержки, чтобы не хранить бесконечную историю всех виденных ID.
Простейший вариант – просто сбрасывать дубликаты в пределах некоторого окна по времени. Функция dropDuplicatesWithinWatermark(colNames: Seq[String])
делает именно это: вы ставите withWatermark("ts", "delay")
, потом указываете столбцы, по которым определяете уникальность. Всё, что поступает позже указанного задержки (старше watermark) будет считаться слишком поздним, а дубликаты внутри срока будут удаляться. Мы ожидаем дубликаты, если источник – at-least-once. Можно использовать dropDuplicatesWithinWatermark() по полю(ям) с id, чтобы отбросить повторяющиеся строки, поступающие в диапазоне, заданном watermark.
Пример кода: читаем из Kafka, ставим водяную метку по времени события и убираем дубликаты по полю eventId
:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
// Стрим из Kafka
val kafkaStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "input-topic")
.load()
// Предположим, что из Kafka приходит JSON с полями id и timestamp
val events = kafkaStream
.selectExpr("CAST(value AS STRING) AS json")
.select(from_json($"json", schema_of_json("""{"id":"", "value":0, "ts": ""}""")).as("data"))
.select($"data.id".as("eventId"), $"data.value", $"data.ts".cast("timestamp"))
// Применяем водяную метку по времени события и удаляем дубликаты по eventId:
val deduped = events
.withWatermark("ts", "5 minutes") // допускаем задержку до 5 минут
.dropDuplicates("eventId")
val query = deduped.writeStream
.outputMode("append")
.format("console")
.option("checkpointLocation", "/path/to/checkpoint") // чекпоинт тоже нужен!
.start()
query.awaitTermination()
Все сообщения с одинаковым eventId
, пришедшие в окне 5 минут, будут обрезаны. Как поясняет документация, все дубли, поступившие в пределах временного окна, будут отброшены. То есть до тех пор, пока запись с данным ID не считается устаревшей (прошло 5 минут с самого позднего обработанного времени), повторные входы просто не пройдут дальше.
Важно: мы сказали могут удалить дубликат, если важна уникальность ID. Иногда надо контролировать более сложные случаи: например, если в событии изменился какой-то дополнительный атрибут, но ID тот же – тогда можно использовать dropDuplicatesWithinWatermark(Seq("id", "ts"))
или комбинацию полей. Главное правило: нужно вовремя назначить watermark, иначе состояние будет расти бесконечно.
Таким образом, встроенные средства Spark позволяют просто сказать «запоминай ID, убирай копии»: метод dropDuplicates
(«сбросить дубликаты»). Плюс, как пишут, Spark сам не позволяет нам жадничать: если мы просто вызовем dropDuplicates("eventId")
без withWatermark
, то раннер упадёт с ошибкой – нужно обязательно указать withWatermark
для стриминга. На практике же dropDuplicates
работает, но только на скользящем окне: только внутри задержки.
Иной способ – самим отслеживать уже виденные ID в состоянии. Например, можно сделать mapGroupsWithState
, где ключ – это eventId, а состояние – флаг того, что этот ID уже встречался. Если событие зашло второй раз, мы его просто не вывели. Можно вернуть что-то особое в случае повторного появления (логировать предупреждение, например). Но это немного сложнее по коду. Пример отсева дубликатов:
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
// Разбираем поток как Dataset[Event], где Event(id: String, ts: Timestamp)
case class Event(eventId: String, ts: java.sql.Timestamp, value: Long)
// Считаем ключом eventId
val eventsDS = events.as[Event]
val dedupWithState = eventsDS
.groupByKey(_.eventId)
.mapGroupsWithState[Boolean, Event](GroupStateTimeout.NoTimeout()) {
case (id, iter, state: GroupState[Boolean]) =>
// state.getOption - содержит true, если этот ID уже был
if (state.exists) {
// Если уже виделись, ничего не возвращаем (или возвращаем маркер)
Iterator.empty
} else {
// Первый раз видим этот ID – "пропускаем" событие и помечаем
state.update(true)
iter
}
}
dedupWithState.writeStream
.format("console")
.option("checkpointLocation", "/path/to/checkpoint2")
.start()
Храним в state
булев флажок – увидели ли мы этот eventId
ранее. На первое появление state
пустой, мы возвращаем событие и выставляем флаг. На повторные – возвращаем пустой Iterator
, тем самым не пропуская запись дальше.
Итого, дубликаты в Spark можно ловить двумя способами: встроенным dropDuplicates
с водяной меткой или через mapGroupsWithState/flatMapGroupsWithState, где вы сами решаете логику удаления повторов. Оба способа надёжно работают в exactly-once режиме (вместе с чекпоинтингом) и записывают состояние в RocksDB, когда оно достигает дискового размера.
Потерянные события: детектируем пропуски
Если дубликаты – это повторка, то потерянные события – это дырки в последовательности данных. Например, представьте поток метрик от сенсора, где каждое событие имеет возрастание sequence number. Если Spark по каким-то причинам пропустил событие, мы этого не увидим глазами по итоговым данным. Однако состояние RocksDB можно использовать и для мониторинга таких ситуаций.
Сценарий: у нас есть ключ (device ID) и по нему приходит поток событий с полем seqNum
– сквозная нумерация. Мы хотим, чтобы если текущий seqNum
> последнего + 1, то где-то должен залогироваться «пропущено n событий».
Сделаем это через mapGroupsWithState
. Для каждого ключа будем хранить последнее обработанное seqNum
. Когда получим новые события, пройдём по ним в порядке прибытия и сравним. Если обнаружим, что текущий нумер выше ожидаемого, пометим пропуски.
import org.apache.spark.sql.{Dataset, Encoders}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.GroupStateTimeout
// Допустим, schema_of_json уже описан (с полями deviceId, seqNum, eventTime)
case class SensorEvent(deviceId: String, seqNum: Long, eventTime: java.sql.Timestamp)
val sensorStream = spark.readStream
.format("kafka") // читаем из Kafka
.option("subscribe", "sensor-topic")
.load()
.selectExpr("CAST(value AS STRING)")
.select(from_json($"value", Encoders.product[SensorEvent].schema).as("data"))
.selectExpr("data.deviceId", "data.seqNum", "data.eventTime")
.as[SensorEvent]
// Группируем по устройству и сохраняем состояние с последним seqNum
val detected = sensorStream
.groupByKey(_.deviceId)
.flatMapGroupsWithState[Long, String]( // выход строки-уведомления
outputMode = OutputMode.Append(),
timeoutConf = GroupStateTimeout.NoTimeout()
) { case (deviceId, eventsIter, state: GroupState[Long]) =>
var lastSeq = state.getOption.getOrElse(0L)
val alerts = scala.collection.mutable.ListBuffer.empty[String]
for (ev <- eventsIter) {
if (lastSeq != 0L && ev.seqNum > lastSeq + 1) {
// Пропуски найдены
val missed = ev.seqNum - lastSeq - 1
alerts += s"Device $deviceId lost $missed events between $lastSeq and ${ev.seqNum}"
}
if (ev.seqNum <= lastSeq) {
// Появился "старый" или повторный номер
alerts += s"Device $deviceId out-of-order or duplicate seq ${ev.seqNum}"
}
// Обновляем последний seq
if (ev.seqNum > lastSeq) lastSeq = ev.seqNum
}
state.update(lastSeq)
alerts.toIterator
}
detected.writeStream
.format("console")
.option("checkpointLocation", "/path/to/checkpoint3")
.start()
Делаем flatMapGroupsWithState
, потому что на один ключ может прийти сразу несколько событий (поэтому flatMap
). Состояние – простое число Long
, последний seqNum
по этому ключу. Когда новые события приходят (они идут по событийному времени или по приходу, как попадёт), мы для каждого смотрим: если ev.seqNum > lastSeq + 1
– то послали прошлые события (пропуски), формируем предупреждение. Если ev.seqNum <= lastSeq
, значит либо дубликат, либо пришло событие слишком старое (out-of-order), и тоже сигналим. После прохождения всех событий состояния групп, обновляем state
на максимальный lastSeq
.
Таким образом, RocksDB будет хранить для каждой «deviceId» последнее seq, а код будет генерировать алерты для пропущенных и задвоенных. Процесс устойчив к сбоям (Spark выкатит state из чекпоинта) и продолжится.
Разумеется, можно усложнить: проверять таймауты (если события долго не приходят), или обновлять состояние при таймауте групп (используя GroupStateTimeout
), чтобы удалять старые ключи. Но основная идея в том, что любой кастомный сценарий проверки можно вписать в mapGroupsWithState
или flatMapGroupsWithState
.
Итак, мы научились ловить lost-события, имея у себя на руках состояния по ключам. Если бы мы не хранили seq, а просто группировали и выводили count или max, то о пропуске можно было и не узнать. State-стор здесь выступает как регистр last-known, и при его помощи мы диагностируем аномалии в потоке.
Стоит упомянуть ещё пару моментов:
Watermarks и удаление состояния. Мы не задали водяную метку во втором примере, значит состояние для каждого устройства будет жить в RocksDB бесконечно (пока работает джоб). Если устройств миллионы и неактивные тоже остаются, нужно ставить timeout или водяные метки, чтобы очищать спящие состояния. Например,
withWatermark("eventTime", "1 hour")
вместе сGroupStateTimeout.ProcessingTimeTimeout
позволит автоудалять state, если за час ничего не пришло.Производительность. MapGroupsWithState с RocksDB достаточно надёжна, но может быть чуть медленнее, чем встроенные агрегаты. Однако стоит оно того, когда надо именно логировать пропуски! Всегда можно настроить
spark.sql.shuffle.partitions
и другие параметры, чтобы оптимизировать процесс, но это тема для отдельной статьи.
Итоги
Итак, мы рассмотрели, как использовать RocksDB-стейт в Spark Structured Streaming для борьбы с потерянными и дублированными событиями. Подытожим моменты:
Конфигурация: не забудьте включить RocksDB state store, иначе Spark по умолчанию будет юзать Memory/HDFS provider. Например, через
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
. Databricks-подобные среды могут иметь свои классы, но принцип тот же.Exactly-Once: Spark Structured Streaming уже по умолчанию обеспечивает end-to-end exactly-once Это значит, можно рассчитывать, что после выключения и повторного запуска источники и слики восстановят согласованный статус. По сути, система сама запомнит, какие оффсеты были обработаны и какое состояние в RocksDB – никаких привычек оставлять дубликаты при восстановлении.
Дубликаты: для удаления повторов используйте
dropDuplicates
илиdropDuplicatesWithinWatermark
, как рекомендует документация. Это очень простой способ вырезать повторы по ключу, не придумывая собственного стейта. Но если нужна более сложная логика (или например ID не приходит в записи, но можно вычислить), применяем stateful-группировки.Пропущенные события: их не уберёшь, но их можно обнаружить. Если вы присылаете пронумерованные события, вполне реальный сценарий – сравнивать номера и фиксировать нестыковки.
RocksDB в продакшене: не забывайте, что RocksDB – это полноценная БД, и ей можно задавать размеры кеша, уровни, включать мониторинг. Spark собирает метрики RocksDB (коммиты, put/get latency и т.п.), так что следите за ними, чтобы понимать, не переполняется ли диск. Если процессов state очень много, можно рассмотреть включение Changelog Checkpointing (в Databricks 13.3+), чтобы ускорить чекпоинты, сохранять только дельту изменений.
Стилевые советы: обрабатывайте исключения, вызывайте
state.remove()
для завершённых сессий, очищайте старый state (например, черезGroupState.remove()
при форсированном таймауте). В наших примерах мы упрощали реализацию, но более сложные задачи требуют учёта всех краёв: выключений, рестартов, изменений схемы (скажем, добавление поля вEvent
). Изменение схемы state вmapGroupsWithState
требует осторожности: изменение типа стейта или timeout запрещено Spark.
Стриминговая разработка – это всегда немного игра в зеркала: код кажется простым, но нужно помнить о безопасности данных. Тут на помощь и приходит RocksDB: он заботится о сохранности ваших состояний, а вы уже думаете про то, какие события считать дубликатами и как реагировать на неполные последовательности.
В сухом остатке: используйте state-чекапинг, эмбеддинговые хранилища и встроенные дедупликаторы – и ни одно событие вас не удивит.
Работа со стримингом и состоянием в Spark редко даётся «из коробки» — всегда появляются дубликаты, пропуски или узкие места в производительности. Чтобы держать руку на пульсе, важно знать не только приёмы борьбы с проблемами, но и понимать, куда движется сам Spark и как выжимать максимум из его экосистемы. Приходите на бесплатные уроки:
9 сентября в 20:00 — Что нового в Spark 4.0
17 сентября в 20:00 — Как ускорить обработку данных в Apache Spark: проверенные техники и лайфхаки
Кроме того, пройдите вступительный тест, чтобы оценить свой уровень и узнать, подойдет ли вам программа углубленного курса по самым мощным инструментам обработки больших данных.
Ninil
А причем тут RocksDB? Разве все пункты кроме "Конфигурация" не реализуемы без RocksDB. В общем тема RocksDB не раскрыта от слова "совсем".