Привет, Хабр! Стриминговая обработка давно уже стала стандартом – агрегируем, соединяем, считаем прямо по ходу поступления данных. Но стоит задуматься о состоянии – как мы его храним? В Spark Structured Streaming (и не только) появилась возможность хранить промежуточное состояние с помощью RocksDB – очень быстрый встроенный key-value store, оптимизированный для флеш- и RAM-носителей. Само по себе появление RocksDB в экосистеме Spark – серьёзное событие: начиная с Spark 3.2 можно указать RocksDB в качестве бэкенда для state store.

Из коробки Spark Structured Streaming обеспечивает сквозную гарантию exactly-once благодаря checkpoint-логам и write-ahead логам, но это не избавляет от дубликатов в источниках с at-least-once семантикой. Так, Kafka и другие очереди могут присылать одни и те же сообщения повторно. Более того, могут быть потерянные события – например, если источник подтверждает получение раньше, чем Spark выполнил запись в состояние/синк. Как же поймать и обработать эти неприятности при помощи состояния RocksDB?

Ниже разберём на примерах, как настраивать Spark со state-стором RocksDB, как отсекать дубликаты при помощи встроенных функций (и помощью пользовательского стейта), а также как выявлять дырявые события (например, пропуски последовательностей) через mapGroupsWithState.

Зачем RocksDB в Spark

Раньше Spark Structured Streaming хранил состояние в по умолчанию виде: память + файловая система. С появлением RocksDB мы можем делать state-стор на базе встроенной БД. Напомним, что RocksDB – это библиотека от Facebook/META, написанная на C++. Она отлично подходит, когда данных состояния очень много и их не хочется держать целиком в памяти. Кроме Spark, RocksDB используют и другие фреймворки: например, Flink по умолчанию умеет передавать state в RocksDB, а Kafka Streams использует RocksDB для store-менеджмента.

Для Spark Structured Streaming включение RocksDB требуется явно указать. В конфиге SparkSession устанавливаем провайдер стейт-стора. В Spark 3.2+ это делается так:

val spark = SparkSession.builder()
  .appName("RocksDBStateExample")
  .config("spark.sql.streaming.stateStore.providerClass", 
          "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  .getOrCreate()

Теперь все stateful-операции (группировки, mapGroupsWithState и т.п.) будут сохранять состояния в файлах RocksDB, а не только в памяти. По аналогии, в Databricks (или Lakehouse-пайплайнах) можно включить RocksDB через конфигурацию { "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider" }.

Само по себе использование RocksDB даёт прирост устойчивости: во-первых, этот стор при коммите данных гарантирует сброс на диск (в Spark написано, что WriteOptions выставляет флаг sync=true, чтобы не терять данные при сбое машины. Во-вторых, RocksDB хранит данные на диск в SST-файлах и WAL, а при рестарте восстанавливает из чекпоинта автоматически. Кстати, Spark использует для записи state commit (flush) эти опции WriteOptions.sync = true, чтобы гарантировать отсутствие потери данных.

Итак, с технической стороны: RocksDB в Spark – это обычный state store, только куда тяжелее и длиннее, чем память. Spark при mapGroupsWithState/update/append будет читать и писать в RocksDB через байтовые ключи и значения, используя собственные энкодеры. Для нас важно: с настроенным RocksDB state store можно безопасно сохранять очень большие состояния без опасений, что JVM ляжет от переполнения. Именно для таких случаев Databricks и рекомендует использовать RocksDB для stateful-стрима.

Ещё пару слов про сохранность. Spark Structured Streaming по умолчанию обещает end-to-end exactly-once (в микробатч режиме). Это значит, что если всё нормально сконфигурировано, то после сбоя пепель данных не появится и не пропадёт. Используются checkpoint на HDFS и write-ahead журналы. Мы же просто включаем RocksDB как state store, и Spark сам будет записывать состояние при каждом триггере. В случае перезапуска чекапойнты сработают, и RocksDB база восстановится в том состоянии, в котором она была при сбое.

Теперь рассмотрим, как на этом стейте решать проблемы дубликатов и потерянных событий.

Дубликаты в стриме и dropDuplicates

Первая проблема стриминга – дублированные события. Почти все брокеры дают at-least-once гарантию: сообщение может быть отправлено и получено несколько раз, если сетевая связь прервётся или при перезапуске продюсера. Даже если Spark пишет в конечную систему стабильно, источник может случайно скинуть топик назад. Поэтому стриминговый джоб надо учить не считать одну и ту же запись дважды.

Spark Structured Streaming накладывает exactly-once на свою сторону, но дубликаты на входе сам не удаляет. Чтобы убрать повторяющиеся записи, нужно самостоятельно это сделать, используя уникальный идентификатор события. Например, если у нас у каждой строки есть поле eventId (GUID, ключ, последовательность), мы можем использовать dropDuplicates(). Но для стрима есть нюанс: требуется привязать удаление дубликатов к watermark. Spark вводит понятие окна времени задержки, чтобы не хранить бесконечную историю всех виденных ID.

Простейший вариант – просто сбрасывать дубликаты в пределах некоторого окна по времени. Функция dropDuplicatesWithinWatermark(colNames: Seq[String]) делает именно это: вы ставите withWatermark("ts", "delay"), потом указываете столбцы, по которым определяете уникальность. Всё, что поступает позже указанного задержки (старше watermark) будет считаться слишком поздним, а дубликаты внутри срока будут удаляться. Мы ожидаем дубликаты, если источник – at-least-once. Можно использовать dropDuplicatesWithinWatermark() по полю(ям) с id, чтобы отбросить повторяющиеся строки, поступающие в диапазоне, заданном watermark.

Пример кода: читаем из Kafka, ставим водяную метку по времени события и убираем дубликаты по полю eventId:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Стрим из Kafka
val kafkaStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "input-topic")
  .load()

// Предположим, что из Kafka приходит JSON с полями id и timestamp
val events = kafkaStream
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", schema_of_json("""{"id":"", "value":0, "ts": ""}""")).as("data"))
  .select($"data.id".as("eventId"), $"data.value", $"data.ts".cast("timestamp"))

// Применяем водяную метку по времени события и удаляем дубликаты по eventId:
val deduped = events
  .withWatermark("ts", "5 minutes")   // допускаем задержку до 5 минут
  .dropDuplicates("eventId")

val query = deduped.writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint")  // чекпоинт тоже нужен!
  .start()

query.awaitTermination()

Все сообщения с одинаковым eventId, пришедшие в окне 5 минут, будут обрезаны. Как поясняет документация, все дубли, поступившие в пределах временного окна, будут отброшены. То есть до тех пор, пока запись с данным ID не считается устаревшей (прошло 5 минут с самого позднего обработанного времени), повторные входы просто не пройдут дальше.

Важно: мы сказали могут удалить дубликат, если важна уникальность ID. Иногда надо контролировать более сложные случаи: например, если в событии изменился какой-то дополнительный атрибут, но ID тот же – тогда можно использовать dropDuplicatesWithinWatermark(Seq("id", "ts")) или комбинацию полей. Главное правило: нужно вовремя назначить watermark, иначе состояние будет расти бесконечно.

Таким образом, встроенные средства Spark позволяют просто сказать «запоминай ID, убирай копии»: метод dropDuplicates («сбросить дубликаты»). Плюс, как пишут, Spark сам не позволяет нам жадничать: если мы просто вызовем dropDuplicates("eventId") без withWatermark, то раннер упадёт с ошибкой – нужно обязательно указать withWatermark для стриминга. На практике же dropDuplicates работает, но только на скользящем окне: только внутри задержки.

Иной способ – самим отслеживать уже виденные ID в состоянии. Например, можно сделать mapGroupsWithState, где ключ – это eventId, а состояние – флаг того, что этот ID уже встречался. Если событие зашло второй раз, мы его просто не вывели. Можно вернуть что-то особое в случае повторного появления (логировать предупреждение, например). Но это немного сложнее по коду. Пример отсева дубликатов:

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Разбираем поток как Dataset[Event], где Event(id: String, ts: Timestamp)
case class Event(eventId: String, ts: java.sql.Timestamp, value: Long)

// Считаем ключом eventId
val eventsDS = events.as[Event]

val dedupWithState = eventsDS
  .groupByKey(_.eventId)
  .mapGroupsWithState[Boolean, Event](GroupStateTimeout.NoTimeout()) {
    case (id, iter, state: GroupState[Boolean]) =>
      // state.getOption - содержит true, если этот ID уже был
      if (state.exists) {
        // Если уже виделись, ничего не возвращаем (или возвращаем маркер)
        Iterator.empty
      } else {
        // Первый раз видим этот ID – "пропускаем" событие и помечаем
        state.update(true)
        iter
      }
  }

dedupWithState.writeStream
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint2")
  .start()

Храним в state булев флажок – увидели ли мы этот eventId ранее. На первое появление state пустой, мы возвращаем событие и выставляем флаг. На повторные – возвращаем пустой Iterator, тем самым не пропуская запись дальше.

Итого, дубликаты в Spark можно ловить двумя способами: встроенным dropDuplicates с водяной меткой или через mapGroupsWithState/flatMapGroupsWithState, где вы сами решаете логику удаления повторов. Оба способа надёжно работают в exactly-once режиме (вместе с чекпоинтингом) и записывают состояние в RocksDB, когда оно достигает дискового размера.

Потерянные события: детектируем пропуски

Если дубликаты – это повторка, то потерянные события – это дырки в последовательности данных. Например, представьте поток метрик от сенсора, где каждое событие имеет возрастание sequence number. Если Spark по каким-то причинам пропустил событие, мы этого не увидим глазами по итоговым данным. Однако состояние RocksDB можно использовать и для мониторинга таких ситуаций.

Сценарий: у нас есть ключ (device ID) и по нему приходит поток событий с полем seqNum – сквозная нумерация. Мы хотим, чтобы если текущий seqNum > последнего + 1, то где-то должен залогироваться «пропущено n событий».

Сделаем это через mapGroupsWithState. Для каждого ключа будем хранить последнее обработанное seqNum. Когда получим новые события, пройдём по ним в порядке прибытия и сравним. Если обнаружим, что текущий нумер выше ожидаемого, пометим пропуски.

import org.apache.spark.sql.{Dataset, Encoders}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.GroupStateTimeout

// Допустим, schema_of_json уже описан (с полями deviceId, seqNum, eventTime)
case class SensorEvent(deviceId: String, seqNum: Long, eventTime: java.sql.Timestamp)

val sensorStream = spark.readStream
  .format("kafka") // читаем из Kafka
  .option("subscribe", "sensor-topic")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", Encoders.product[SensorEvent].schema).as("data"))
  .selectExpr("data.deviceId", "data.seqNum", "data.eventTime")
  .as[SensorEvent]

// Группируем по устройству и сохраняем состояние с последним seqNum
val detected = sensorStream
  .groupByKey(_.deviceId)
  .flatMapGroupsWithState[Long, String](   // выход строки-уведомления
    outputMode = OutputMode.Append(),
    timeoutConf = GroupStateTimeout.NoTimeout()
  ) { case (deviceId, eventsIter, state: GroupState[Long]) =>
    var lastSeq = state.getOption.getOrElse(0L)
    val alerts = scala.collection.mutable.ListBuffer.empty[String]
    for (ev <- eventsIter) {
      if (lastSeq != 0L && ev.seqNum > lastSeq + 1) {
        // Пропуски найдены
        val missed = ev.seqNum - lastSeq - 1
        alerts += s"Device $deviceId lost $missed events between $lastSeq and ${ev.seqNum}"
      }
      if (ev.seqNum <= lastSeq) {
        // Появился "старый" или повторный номер
        alerts += s"Device $deviceId out-of-order or duplicate seq ${ev.seqNum}"
      }
      // Обновляем последний seq
      if (ev.seqNum > lastSeq) lastSeq = ev.seqNum
    }
    state.update(lastSeq)
    alerts.toIterator
  }

detected.writeStream
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint3")
  .start()

Делаем flatMapGroupsWithState, потому что на один ключ может прийти сразу несколько событий (поэтому flatMap). Состояние – простое число Long, последний seqNum по этому ключу. Когда новые события приходят (они идут по событийному времени или по приходу, как попадёт), мы для каждого смотрим: если ev.seqNum > lastSeq + 1 – то послали прошлые события (пропуски), формируем предупреждение. Если ev.seqNum <= lastSeq, значит либо дубликат, либо пришло событие слишком старое (out-of-order), и тоже сигналим. После прохождения всех событий состояния групп, обновляем state на максимальный lastSeq.

Таким образом, RocksDB будет хранить для каждой «deviceId» последнее seq, а код будет генерировать алерты для пропущенных и задвоенных. Процесс устойчив к сбоям (Spark выкатит state из чекпоинта) и продолжится.

Разумеется, можно усложнить: проверять таймауты (если события долго не приходят), или обновлять состояние при таймауте групп (используя GroupStateTimeout), чтобы удалять старые ключи. Но основная идея в том, что любой кастомный сценарий проверки можно вписать в mapGroupsWithState или flatMapGroupsWithState.

Итак, мы научились ловить lost-события, имея у себя на руках состояния по ключам. Если бы мы не хранили seq, а просто группировали и выводили count или max, то о пропуске можно было и не узнать. State-стор здесь выступает как регистр last-known, и при его помощи мы диагностируем аномалии в потоке.

Стоит упомянуть ещё пару моментов:

  • Watermarks и удаление состояния. Мы не задали водяную метку во втором примере, значит состояние для каждого устройства будет жить в RocksDB бесконечно (пока работает джоб). Если устройств миллионы и неактивные тоже остаются, нужно ставить timeout или водяные метки, чтобы очищать спящие состояния. Например, withWatermark("eventTime", "1 hour") вместе с GroupStateTimeout.ProcessingTimeTimeout позволит автоудалять state, если за час ничего не пришло.

  • Производительность. MapGroupsWithState с RocksDB достаточно надёжна, но может быть чуть медленнее, чем встроенные агрегаты. Однако стоит оно того, когда надо именно логировать пропуски! Всегда можно настроить spark.sql.shuffle.partitions и другие параметры, чтобы оптимизировать процесс, но это тема для отдельной статьи.


Итоги

Итак, мы рассмотрели, как использовать RocksDB-стейт в Spark Structured Streaming для борьбы с потерянными и дублированными событиями. Подытожим моменты:

  • Конфигурация: не забудьте включить RocksDB state store, иначе Spark по умолчанию будет юзать Memory/HDFS provider. Например, через spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"). Databricks-подобные среды могут иметь свои классы, но принцип тот же.

  • Exactly-Once: Spark Structured Streaming уже по умолчанию обеспечивает end-to-end exactly-once Это значит, можно рассчитывать, что после выключения и повторного запуска источники и слики восстановят согласованный статус. По сути, система сама запомнит, какие оффсеты были обработаны и какое состояние в RocksDB – никаких привычек оставлять дубликаты при восстановлении.

  • Дубликаты: для удаления повторов используйте dropDuplicates или dropDuplicatesWithinWatermark, как рекомендует документация. Это очень простой способ вырезать повторы по ключу, не придумывая собственного стейта. Но если нужна более сложная логика (или например ID не приходит в записи, но можно вычислить), применяем stateful-группировки.

  • Пропущенные события: их не уберёшь, но их можно обнаружить. Если вы присылаете пронумерованные события, вполне реальный сценарий – сравнивать номера и фиксировать нестыковки.

  • RocksDB в продакшене: не забывайте, что RocksDB – это полноценная БД, и ей можно задавать размеры кеша, уровни, включать мониторинг. Spark собирает метрики RocksDB (коммиты, put/get latency и т.п.), так что следите за ними, чтобы понимать, не переполняется ли диск. Если процессов state очень много, можно рассмотреть включение Changelog Checkpointing (в Databricks 13.3+), чтобы ускорить чекпоинты, сохранять только дельту изменений.

  • Стилевые советы: обрабатывайте исключения, вызывайте state.remove() для завершённых сессий, очищайте старый state (например, через GroupState.remove() при форсированном таймауте). В наших примерах мы упрощали реализацию, но более сложные задачи требуют учёта всех краёв: выключений, рестартов, изменений схемы (скажем, добавление поля в Event). Изменение схемы state в mapGroupsWithState требует осторожности: изменение типа стейта или timeout запрещено Spark.

Стриминговая разработка – это всегда немного игра в зеркала: код кажется простым, но нужно помнить о безопасности данных. Тут на помощь и приходит RocksDB: он заботится о сохранности ваших состояний, а вы уже думаете про то, какие события считать дубликатами и как реагировать на неполные последовательности.

В сухом остатке: используйте state-чекапинг, эмбеддинговые хранилища и встроенные дедупликаторы – и ни одно событие вас не удивит.


Работа со стримингом и состоянием в Spark редко даётся «из коробки» — всегда появляются дубликаты, пропуски или узкие места в производительности. Чтобы держать руку на пульсе, важно знать не только приёмы борьбы с проблемами, но и понимать, куда движется сам Spark и как выжимать максимум из его экосистемы. Приходите на бесплатные уроки:

Кроме того, пройдите вступительный тест, чтобы оценить свой уровень и узнать, подойдет ли вам программа углубленного курса по самым мощным инструментам обработки больших данных.

Комментарии (1)


  1. Ninil
    04.09.2025 16:33

    А причем тут RocksDB? Разве все пункты кроме "Конфигурация" не реализуемы без RocksDB. В общем тема RocksDB не раскрыта от слова "совсем".