Привет, на связи Василий Самарин, ведущий инженер данных в Х5 Tech. Это моя вторая статья по теме построения SCD-2-таблиц. Если вы еще не знакомы с SCD-2-таблицами, то рекомендую заглянуть в мою первую статью, где рассказывается про то, когда и зачем можно и нужно их использовать, и подробно разбирается пример для PySpark и Hive.
Сегодня SCD-2-таблицы не только остаются актуальными для медленно меняющихся данных, но и, на мой взгляд, становятся гораздо проще в реализации благодаря новым технологиям и инструментам.
С того момента, когда я писал свою предыдущую статью про построение SCD-2-таблиц на PySpark 2.4.4 и Hive, прошло около года, и мне снова поручили пересобрать эту же витрину, но уже в ходе миграции в наше новое хранилище данных. Да, в X5 постоянно что-то меняется, внедряются новые технологии и инструменты. Это одна из причин, почему мне нравится здесь работать.
Итак, в этой статье мы будем:
строить Iceberg-таблицы с типом SCD-2 с помощью Trino с использованием SQL и Python;
попутно освоим прекрасные функции merge, MD5 и другие полезные инструменты;
напишем свой собственный оператор для Airflow для автоматизации ETL-процесса.
Полный код SQL-запросов оператора для Airflow
Полный код SQL-запросов для создания и наполнения всех необходимых исходных таблиц, итеративного наполнения SCD-2-таблицы данными, а также код оператора для Airflow вы найдёте в моём репозитории.
Для понимания общего контекста сначала немного расскажу про Lakehouse DMP 2.0.
Наш новый Lakehouse
Текущее хранилище данных на базе Greenplum исчерпало возможности масштабирования. У централизованной модели управления были узкие места — команды не справлялись с бэклогом, а сложный процесс внесения изменений замедлял поставку дата-продуктов. Кроме того, существующая архитектура не позволяла обеспечивать необходимую оперативность данных (T-1) из-за многоэтапной «перекладки» информации между системами хранения.
DMP 2.0 решает эти проблемы переходом на архитектуру Lakehouse с декомпозицией на независимые дата-хабы (Data Mesh). Технологически разделяется слой хранения данных (object storage на базе открытых стандартов) и вычислительный слой (Trino, Spark, Flink), что обеспечивает гибкое горизонтальное масштабирование без крупных инфраструктурных вложений. Организационно вместо единого хранилища данных внедряется федеративная модель: автономные команды-владельцы дата-хабов самостоятельно разрабатывают и поддерживают дата-продукты, публикуя данные через формализованные дата-контракты. Это снимает нагрузку с центральной команды и ускоряет time-to-market для аналитических решений.
Ключевые принципы нового распределённого хранилища:
наблюдаемость — данные и потоки описаны и находятся под мониторингом;
компонуемость данных — можно объединять данные из различных хабов в одном запросе;
строгое разграничение ответственности — внедрение дата-контрактов;
отсутствие дублирования данных за счёт универсальности Trino.
Технологический стек построен на компонентах с открытым исходным кодом (Trino, Iceberg, MinIO, Airflow).
Если вам будет интересно узнать подробности про наш новый Lakehouse, то напишите, пожалуйста, об этом в комментариях. Мы сделаем про него отдельную статью.
Итак, в описанной среде нужно было построить SCD-2-таблицу с использованием SQL-движка Trino. Оказалось, что у нас пока нет универсального загрузчика для сборки таких таблиц, поэтому я реализовал свой алгоритм.
Общая логика сборки
Основная задача была в том, чтобы на вход получать очередной срез данных (состояние на расчётную дату), который рассчитывается по расписанию, и затем добавлять его в SCD-2-таблицу. А для обработки накопленных ранее срезов мы хотели предусмотреть возможность их последовательной загрузки на этапе первичного наполнения SCD-2-таблицы.
Чтобы сборка была прозрачной и поддерживаемой, весь ETL-процесс разделили на 7 простых последовательных шагов.
Шаг 1. Подготовка среза для обработки.
Шаг 2. Подготовка среза предыдущего состояния.
Шаг 3. Выявление новых записей.
Шаг 4. Выявление удалённых записей.
Шаг 5. Извлечение изменившихся записей для их обновления в целевой таблице.
Шаг 6. Извлечение изменившихся записей для их закрытия в целевой таблице.
Шаг 7. Сборка всех записей для вставки в целевую таблицу.
Шаг 8. Запись в целевую SCD-2-таблицу.
Каждый шаг — это отдельный SQL-запрос для Trino. Результаты работы на шагах 1-7 лучше сохранять во временные таблицы, а на шаге 8 сделать MERGE данных в целевую таблицу. Как показала практика, сохранение промежуточных результатов очень удобно для диагностики ошибок.
Для автоматизации запуска ETL-процесса по расписанию мы будем использовать Airflow 2.11.0.
Пройдёмся по шагам и кратко разберём наиболее важные моменты.
Шаги 1-2
На шагах 1 и 2 нам нужно добавить технические колонки в обрабатываемый срез данных. Для нас это будет результат регулярного расчета (last_slice в моем примере) или срез из исторической таблицы (hist_slice).
То же самое нужно сделать со снимком, отражающим предыдущее состояние. Его мы будем брать из нашей целевой SCD-2-таблицы.
Вот что нам нужно добавить:
valid_from_dttm— дата начала действия записи;valid_to_dttm— дата окончания действия записи;hashdiff_key— хеш-сумма для быстрого поиска изменившихся записей. Будем рассчитывать её на лету.
Чтобы предусмотреть возможность использования одного и того же скрипта для первичной и регулярной загрузок, добавим в шаблон запроса для шага 1 следующие параметры:
reload_flg— признак того, что мы запускаем первичную сборку или делаем загрузку истории;custom_dt— дата, за которую мы хотим взять срез из таблицы с рассчитанными ранее срезами. В случае регулярной загрузки — это будет текущая дата.
Эти параметры потом будут передаваться в оператор Airflow.
Вот как будут выглядеть наши скрипты для шагов 1 и 2.
Шаг 1. Подготовка среза для загрузки в целевую таблицу
DROP TABLE IF EXISTS catalog_name.schema_name.current_slice; CREATE TABLE catalog_name.schema_name.current_slice AS WITH last_slice AS ( SELECT customer_id, mobile_phone_flg, email_flg, sms_consent_flg, email_consent_flg, push_consent_flg, LOWER( TO_HEX( MD5( TO_UTF8( CONCAT_WS( '|', CAST(customer_id as VARCHAR), CAST(mobile_phone_flg as VARCHAR), CAST(email_flg as VARCHAR), CAST(sms_consent_flg as VARCHAR), CAST(email_consent_flg as VARCHAR), CAST(push_consent_flg as VARCHAR) ) ) ) ) ) as hashdiff_key, CAST(CAST(dataflow_dttm AS DATE) AS TIMESTAMP) AS valid_from_dttm, CAST('5999-01-01 00:00:00' AS TIMESTAMP) AS valid_to_dttm FROM catalog_name.schema_name.customer_consents WHERE CAST({{ params.reload_flg }} AS VARCHAR) = '0' ) , hist_slice AS ( SELECT customer_id, mobile_phone_flg, email_flg, sms_consent_flg, email_consent_flg, push_consent_flg, LOWER( TO_HEX( MD5( TO_UTF8( CONCAT_WS( '|', CAST(customer_id as VARCHAR), CAST(mobile_phone_flg as VARCHAR), CAST(email_flg as VARCHAR), CAST(sms_consent_flg as VARCHAR), CAST(email_consent_flg as VARCHAR), CAST(push_consent_flg as VARCHAR) ) ) ) ) ) as hashdiff_key, CAST(dataflow_dt AS TIMESTAMP) AS valid_from_dttm, CAST('5999-01-01 00:00:00' AS TIMESTAMP) AS valid_to_dttm FROM catalog_name.schema_name.customer_consents_sliced WHERE CAST({{ params.reload_flg }} AS VARCHAR) = '1' AND dataflow_dt = CAST('{{ params.custom_dt }}' AS DATE) ) SELECT customer_id, mobile_phone_flg, email_flg, sms_consent_flg, email_consent_flg, push_consent_flg, hashdiff_key, valid_from_dttm, valid_to_dttm FROM last_slice UNION ALL SELECT customer_id, mobile_phone_flg, email_flg, sms_consent_flg, email_consent_flg, push_consent_flg, hashdiff_key, valid_from_dttm, valid_to_dttm FROM hist_slice ;
Шаг 2. Подготовка среза предыдущего состояния
DROP TABLE IF EXISTS catalog_name.schema_name.previous_slice; CREATE TABLE catalog_name.schema_name.previous_slice AS SELECT customer_id, mobile_phone_flg, email_flg, sms_consent_flg, email_consent_flg, push_consent_flg, LOWER( TO_HEX( MD5( TO_UTF8( CONCAT_WS( '|', CAST(customer_id AS VARCHAR), CAST(mobile_phone_flg AS VARCHAR), CAST(email_flg AS VARCHAR), CAST(sms_consent_flg AS VARCHAR), CAST(email_consent_flg AS VARCHAR), CAST(push_consent_flg AS VARCHAR) ) ) ) ) ) AS hashdiff_key, valid_from_dttm, valid_to_dttm FROM catalog_name.schema_name.customer_consents_versioned WHERE valid_to_dttm = CAST('5999-01-01 00:00:00' AS TIMESTAMP) ;
На выходе мы получим два подготовленных среза данных:
срез для обработки (
current_slice);срез предыдущего состояния (
previous_slice).
Шаги 3-4
На шагах 3 и 4 мы делаем JOIN двух срезов из шага 1 по первичному ключу (customer_id в нашем примере) и сохраняем результаты в промежуточные таблицы. Первичный ключ может быть композитным. Если хотя бы в одной из колонок ключа есть пустые значения, то нужно иметь ввиду, что такие записи будут отброшены при соединении. Если вы не хотите этого допустить, то нужно обработать NULL-значения в каждой из колонок составного ключа, например, с помощью COALESCE. Еще вариант — объединить все колонки составного ключа и посчитать хеш, как мы это делали на шаге 1 для расчета поля hashdiff_key, а потом уже использовать этот ключ для JOIN.
Шаг 3. Новые записи относительно снимка предыдущего состояния
DROP TABLE IF EXISTS catalog_name.schema_name.appended_rows; CREATE TABLE catalog_name.schema_name.appended_rows AS SELECT cs.customer_id, cs.mobile_phone_flg, cs.email_flg, cs.sms_consent_flg, cs.email_consent_flg, cs.push_consent_flg, cs.valid_from_dttm, cs.valid_to_dttm FROM catalog_name.schema_name.current_slice cs LEFT JOIN catalog_name.schema_name.previous_slice ps ON cs.customer_id = ps.customer_id WHERE ps.customer_id IS NULL ;
Шаг 4. Удаленные записи
DROP TABLE IF EXISTS catalog_name.schema_name.deleted_rows; CREATE TABLE catalog_name.schema_name.deleted_rows AS SELECT ps.customer_id, ps.mobile_phone_flg, ps.email_flg, ps.sms_consent_flg, ps.email_consent_flg, ps.push_consent_flg, ps.valid_from_dttm, cj.valid_to_dttm FROM catalog_name.schema_name.previous_slice ps LEFT JOIN catalog_name.schema_name.current_slice cs ON ps.customer_id = cs.customer_id CROSS JOIN ( SELECT valid_from_dttm AS valid_to_dttm FROM catalog_name.schema_name.current_slice LIMIT 1 ) cj WHERE cs.customer_id IS NULL ;
Шаг 5
На шаге 5 мы также делаем JOIN по первичному ключу, но уже сравниваем значения в колонках hashdiff_key. Если они различаются, то берём обновлённое состояние из среза для последующего обновления данных в итоговой таблице.
Шаг 5. Изменившиеся записи для добавления
DROP TABLE IF EXISTS catalog_name.schema_name.updated_rows; CREATE TABLE catalog_name.schema_name.updated_rows AS SELECT cs.customer_id, cs.mobile_phone_flg, cs.email_flg, cs.sms_consent_flg, cs.email_consent_flg, cs.push_consent_flg, cs.valid_from_dttm, cs.valid_to_dttm FROM catalog_name.schema_name.current_slice cs LEFT JOIN catalog_name.schema_name.previous_slice ps ON cs.customer_id = ps.customer_id WHERE ps.hashdiff_key != cs.hashdiff_key ;
Шаг 6
На шаге 6 мы просто собираем все записи в общую таблицу перед тем, как делать MERGE в целевую таблицу.
Шаг 6. Изменившиеся записи для закрытия (обновление valid_to_dttm) в целевой таблице
DROP TABLE IF EXISTS catalog_name.schema_name.closed_rows; CREATE TABLE catalog_name.schema_name.closed_rows AS SELECT ps.customer_id, ps.mobile_phone_flg, ps.email_flg, ps.sms_consent_flg, ps.email_consent_flg, ps.push_consent_flg, ps.valid_from_dttm, cs.valid_from_dttm AS valid_to_dttm FROM catalog_name.schema_name.previous_slice ps LEFT JOIN catalog_name.schema_name.current_slice cs ON ps.customer_id = cs.customer_id WHERE ps.hashdiff_key != cs.hashdiff_key ;
Шаг 7
На данном шаге мы просто объединяем полученные записи на предыдущих этапах в общую временную таблицу.
Шаг 7. Сборка всех записей для вставки в целевую таблицу
DROP TABLE IF EXISTS catalog_name.schema_name.load_batch; CREATE TABLE catalog_name.schema_name.load_batch AS SELECT customer_id, mobile_phone_flg, email_flg, sms_consent_flg, email_consent_flg, push_consent_flg, valid_from_dttm, valid_to_dttm FROM catalog_name.schema_name.appended_rows UNION ALL SELECT customer_id, mobile_phone_flg, email_flg, sms_consent_flg, email_consent_flg, push_consent_flg, valid_from_dttm, valid_to_dttm FROM catalog_name.schema_name.deleted_rows UNION ALL SELECT customer_id, mobile_phone_flg, email_flg, sms_consent_flg, email_consent_flg, push_consent_flg, valid_from_dttm, valid_to_dttm FROM catalog_name.schema_name.updated_rows UNION ALL SELECT customer_id, mobile_phone_flg, email_flg, sms_consent_flg, email_consent_flg, push_consent_flg, valid_from_dttm, valid_to_dttm FROM catalog_name.schema_name.closed_rows ;
Шаг 8
На шаге 8 происходит то, чего нам сильно не хватало при работе с таблицами в Hive — MERGE данных. Благодаря переходу на Iceberg-таблицы автоматически обновятся все необходимые записи в целевой таблице.
Шаг 8. Merge в целевую таблицу
MERGE INTO catalog_name.schema_name.customer_consents_versioned AS trg USING catalog_name.schema_name.load_batch src ON src.customer_id = trg.customer_id AND src.valid_from_dttm = trg.valid_from_dttm WHEN MATCHED AND ( src.mobile_phone_flg IS DISTINCT FROM trg.mobile_phone_flg OR src.email_flg IS DISTINCT FROM trg.email_flg OR src.sms_consent_flg IS DISTINCT FROM trg.sms_consent_flg OR src.email_consent_flg IS DISTINCT FROM trg.email_consent_flg OR src.push_consent_flg IS DISTINCT FROM trg.push_consent_flg OR src.valid_to_dttm IS DISTINCT FROM trg.valid_to_dttm ) THEN UPDATE SET mobile_phone_flg = src.mobile_phone_flg, email_flg = src.email_flg, sms_consent_flg = src.sms_consent_flg, email_consent_flg = src.email_consent_flg, push_consent_flg = src.push_consent_flg, valid_to_dttm = src.valid_to_dttm, dataflow_dttm = DATE_TRUNC('SECOND', CURRENT_TIMESTAMP) WHEN NOT MATCHED THEN INSERT ( customer_id, mobile_phone_flg, email_flg, sms_consent_flg, email_consent_flg, push_consent_flg, valid_from_dttm, valid_to_dttm, dataflow_dttm ) VALUES ( src.customer_id, src.mobile_phone_flg, src.email_flg, src.sms_consent_flg, src.email_consent_flg, src.push_consent_flg, src.valid_from_dttm, src.valid_to_dttm, DATE_TRUNC('SECOND', CURRENT_TIMESTAMP) );
Разберём, как этот скрипт устроен.
В запросе мы используем конструкцию IS DISTINCT FROM вместо оператора '<>'. Это расширение SQL корректно обрабатывает NULL. Оно доступно в Trino и некоторых других СУБД, например, PostgreSQL. В блоке WHEN MATCHED с помощью этой конструкции мы проверяем, какие значения в колонках изменились, чтобы потом обновить только их.
Автоматизация через Airflow
Разобравшись с SQL-запросами, можно попробовать собрать SCD-2-таблицу из примера. Всё необходимое для этого вы найдёте в репозитории. Чтобы лучше понять, как работают SCD-2-таблицы вы можете запускать запросы итеративно. Сначала итерация 1 из insert_data.sql, потом все шаги из scd2_build.sql, наблюдая за тем, как накапливается история изменений в целевой таблице.
После этого у вас уже не должно остаться сомнений и вопросов, а значит, пора приступать к автоматизации ETL-процесса, например, с помощью Airflow.
В зависимости от того, какую версию Airflow вы используете, можно сделать это с помощью готового TrinoOperator или написать свой. Мы разработаем свой оператор для Airflow 2.11.0, в которой TrinoOperator пока отсутствует. Для этого достаточно разместить подготовленный модуль scd2_loader.py на Python в директорию plugins и настроить коннект к Trino.
Полный код оператора также есть в репозитории, а мы разберёмся с важными моментами и параметрами запуска:
trg_table— целевая таблица;src_table— таблица-источник;key_columns— колонки составного первичного ключа;eff_from— дата начала действия записей из таблицы источника. Обычно подходит дата загрузки данных в таблицу;wrk_schema— схема для временного размещения промежуточных таблиц с результатами;reload_flg— признак пересчёта (когда делаем обработку накопленных срезов, то выставляем его в 1);partition_column— колонка, по которой делаются партиции в таблице с накопленными срезами (нужна только при первичном наполнении приreload_flg = 1);custom_dt— дата, за которую берётся срез из таблицы с накопленными срезами (заполняется только приreload_flg = 1);conn_id— название подключения к Trino в Airflow.
Вот какие требования к источнику данных нужно учитывать при работе с данным оператором:
состав колонок и их типы в таблице-источнике и целевой SCD-2-таблице должны совпадать, за исключением технических полей (
valid_from_dttm,valid_to_dttm,dataflow_dttm);в таблице-источнике не должно быть дублей по первичному ключу;
в таблице-источнике не должно быть записей с
NULL-значениями в колонках составного первичного ключа (почему — смотрите пояснения к шагу 3 и 4).
В нашем операторе все рассмотренные выше SQL-запросы, были шаблонизированы. Код для выполнения в Trino генерируется на лету в зависимости от переданных параметров.
В оператор были сразу добавлены базовые проверки переданных параметров и таблиц, чтобы ещё до старта основной части оповестить пользователя об ошибках и не тратить ресурсы на бесполезные вычисления.
Остается только подготовить DAG, а дальше Airflow выполнит все шаги ETL-процесса сборки целевой таблицы автоматически.
Заключение
В итоге мы получили «машину времени», которая позволяет вернуться в любой момент жизни наших данных, а также посмотреть на то, как изменялись записи с момента их рождения. Для этого достаточно сделать фильтр по полям valid_from_dttm, valid_to_dttm, указав нужную нам дату (target_dttm) в прошлом:
SELECT * FROM target_scd_2_table WHERE valid_from_dttm <= target_dttm AND valid_to_dttm > target_dttm
Обратите внимание, что valid_to_dttm строго больше target_dttm для избежания получения дублей.
А для получения актуального среза на текущий момент времени достаточно всего одного фильтра:
SELECT * FROM target_scd_2_table WHERE valid_to_dttm = CAST('5999-01-01 00:00:00' AS TIMESTAMP)
Надеюсь, что теперь вы окончательно разобрались, как строить SCD-2-таблицы с использованием Spark или Trino SQL, чётко понимаете, когда их нужно использовать и сможете успешно их применять для удобного и экономичного хранения данных. А если вдруг остались вопросы или предложения, то обязательно пишите о них в комментариях. Буду рад научиться чему-то новому.