Привет! Меня зовут Роман Чечёткин, я разработчик в команде «Платформа коммуникаций» в Ozon Tech. Наша платформа предоставляет возможность другим командам отправлять различные сообщения в личные кабинеты пользователей.
Сегодня хочу рассказать о задаче, которая встала перед нами — долгосрочное хранение всех сообщений (смс, электронные письма, пуши, уведомления), которые пользователь получил от Ozon.
Почему мы, в принципе, должны хранить у себя данные? Некоторые коммуникации мы должны хранить не менее трёх лет по юридическим причинам. Для отслеживания этих коммуникаций мы создали специальный маркер.
Для начала мы выделили основные требования к функциональности:
в течение суток сервис должен обрабатывать десятки миллионов сообщений из Kafka;
нельзя терять данные в рамках сервиса, чтобы в случае проверок подтвердить соблюдение законодательства и избежать штрафов;
данные должны храниться в удобном, масштабируемом хранилище;
необходимо использовать холодное хранилище данных (в нашем случае мы будем обращаться за данными очень редко, выгрузка данных будет происходить по запросу — раз-два в месяц);
возможно получение коммуникаций по определённым фильтрам;
необходимо позаботиться о ресурсах, чтобы не потреблять много памяти для хранения.
Подготовка
При проектировании системы хранения больших объёмов холодных данных мы сразу исключили PostgreSQL по причине ненадобности в OLTP-функциональности. В нашем сценарии данные только пишутся один раз и больше не изменяются, поэтому ACID-гарантии и механизмы блокировок избыточны.
Исходя из условий, что нам нужно холодное хранилище для большого количества данных, которое будет легко масштабироваться и надёжно хранить наши данные, мы выбрали для таких целей Hadoop File System.
Среди преимуществ HDFS можно выделить:
распределённое хранение. Данные хранятся с коэффициентом репликации х3. За счёт этого получаем надёжное хранилище данных;
используется горизонтальное масштабирование вместо вертикального. Здесь мы получаем лёгкое масштабирование;
подходит для хранения больших файлов (терабайты, петабайты). НО! Не подходит для хранения множества маленьких файлов до 100 мб. У нас идёт обработка нескольких десятков миллионов сообщений в сутки. Юридически важной коммуникацией может быть каждая десятая;
один раз записал, много раз прочитал — это то, как мы взаимодействуем с нашим сервисом, записав один раз данные HDFS, мы можем множество раз читать и доставать данные, основываясь на входящих фильтрах;
HDFS схож с файловой системой на Unix-устройствах. Каждый проект в Ozon Tech, который использует HDFS, имеет свою директорию по типу /warehouse/<project_name>.
Так как HDFS является файловой системой, то мы будем работать с файлами. Стоит определиться, с помощью какого формата мы будем хранить данные в файлах. Мы можем хранить данные в любом формате, хоть в .txt, но рассмотрим самые популярные решения.
Самое простое, что есть в мире, — это json-формат. Все мы понимаем, что json будет неэффективным вариантом, так как данные хранятся в текстовом виде. Отсюда мы получаем медленную скорость и большие итоговые размеры файлов.
В рамках экосистемы Hadoop, где обрабатывают очень много данных, очень часто используют Parquet-формат — почему бы и нам его не использовать для хранения информации? Parquet — это бинарный колоночный формат файла, который поддерживает сжатие. Это позволяет утилизировать меньше памяти по сравнению с другими форматами на то же количество данных. Если сделать 1 млн одинаковых записей в форматах json и parquet, то разница в размере файлов будет х10.

Но как нам потом получать данные? Кажется, что придётся ходить в HDFS, вытаскивать файлы, а потом делать поиск по содержимому файла. Звучит не очень обнадёживающе.
А как сделать фильтрацию? Тут на помощь приходит ClickHouse и его движок получения данных из HDFS. То есть мы просто указываем путь до файлов и получаем данные прямиком из HDFS. Сказка! Но вопрос, который может возникнуть: а нужен ли нам HDFS, если данные будем доставать из ClickHouse? Может, сразу сохранять данные в ClickHouse и выгружать оттуда?
Ключевым фактором является разделение storage и compute. В HDFS мы будем хранить данные, а ClickHouse будет обрабатывать их по запросу. Это позволит нам масштабировать вычисления независимо от хранилища.
Почему мы считаем, что ClickHouse является просто compute и не хранит данные? Все дело в том, что при чтении данных движок HDFS работает как внешний источник, и фактически ClickHouse не управляет хранением этих данных напрямую. Он лишь выполняет вычисления на лету, читая файлы из HDFS по мере необходимости.
Формат Parquet
Для начала надо разобраться подробнее с форматом Parquet. Ниже представлена схема parquet-файла. Помимо колоночного хранения данных, в parquet-файлах есть Row Groups, которые позволяют разделить данные по подгруппам, и когда мы будем получать данные на основе определенных показателей, то можно будет пропустить любую row group, если она не подходит по условиям.
В примере ниже мы хотим получить все значения T-Shirt. Так как Row Group 2 не имеет таких значений, то эта подгруппа будет проигнорирована, следовательно, будет сделано меньше операций.


ClickHouse при сканировании данных из Parquet-файлов в HDFS умеет эффективно использовать метаинформацию, содержащуюся в файле, включая статистику по каждому Row Group. В частности, каждая Row Group содержит минимальные и максимальные значения по каждому столбцу, что позволяет ClickHouse заранее определить, стоит ли читать конкретную группу строк в контексте заданного фильтра.
Возвращаясь к примеру: если мы выполняем запрос с условием
WHERE product = 'T-Shirt'`
и одна из Row Group по столбцу product
имеет статистику min='Socks',max='Socks'
, то ClickHouse, анализируя эту метаинформацию, поймёт, что ни одна строка из этой группы не удовлетворяет условию — и полностью пропустит её при сканировании. Это существенно сокращает объём обрабатываемых данных и повышает производительность запроса.
Такая стратегия называется predicate push down, и она особенно эффективна на больших объёмах данных, где Parquet-файлы разбиты на десятки или сотни Row Goups. А вот сама настройка input_format_parquet_filter_push_down
в документации ClickHouse

Разработка
Мы выбрали для реализации именно коллаборацию из HDFS и ClickHouse.
Как выглядит флоу работы сервиса? Наш сервис читает топик Kafka, проверяет, является ли коммуникация юридически важной, если да, то сервис пишет запись в Parquet-файл.

Кажется, что здесь максимально примитивная функциональность, как здесь можно что-то сломать? Но как только приступили к реализации сервиса, мы начали сталкиваться с вопросами.
Корректность файлов
Сперва нам надо было разобраться, какие Parquet-файлы будут валидными? Что я имею в виду под словом «валидные»? Если мы можем получать данные в ClickHouse без проблем, значит Parquet-файл валиден.
Попытка #1
Для начала определимся с HDFS и директориями. У нас есть директория в HDFS /warehouse/hdfs-exporter/prod/
.
ClickHouse настроен на просмотр всех Parquet-файлов по этому пути ENGINE=HDFS('hdfs://url:8020/warehouse/hdfs-exporter/prod/*.parquet', 'Parquet')
Мы пишем тестовые файлы в директорию prod по принципу:
открываем соединение с HDFS;
создаём Parquet-файл, куда будем писать;
пишем данные в файл;
смотрим конечные данные в ClickHouse.
В процессе тестирования выяснилось, что если пишем в наш Parquet-файл и хотим уже из него извлечь данные, то ClickHouse не сможет отдавать данные и выдаст ошибку об отсутствии футера. Футер указывает, что файл — формата Parquet. Это реализовано с помощью 4 байт и записи «PAR1» в конце Parquet-файла.

Вывод: нужно обязательно проставлять конечные штампы в виде футера в Parquet-файлах.
Попытка #2
Когда мы поняли, что наличие у файлов футеров обязательно, мы пошли дальше. Сделали механизм, при котором сервис пишет данные в файл определённое время, по истечении которого запись прекращается, файл закрывается на запись, устанавливаем в файле футер и после всего открываем новый файл.
На данный момент у нас есть директория все та же директория /warehouse/hdfs-exporter/prod/
.
Что мы собираемся делать с файлами:
открываем соединение с HDFS;
создаём Parquet-файл, куда будем писать;
пишем определённое время данные;
прекращаем запись;
закрываем файл, ставим футер в файле;
открываем новый файл по соседству в той же директории;
смотрим конечные данные в ClickHouse, когда у нас на руках: один уже закрытый файл и открытый файл, доступный для записи.
Путь до директории, где ClickHouse ищет файлы, остался тем же. При таком подходе мы также получили ошибку. В данном случае ClickHouse не может читать из директории, где одновременно лежат открытые и закрытые файлы.
Вывод: у нас должно быть место только с закрытыми файлами.
Попытка #3
Поняв, что нам надо хранить закрытые файлы в одном месте, мы пришли к такому решению: у нас будет дополнительная директория /warehouse/hdfs-exporter/prod/closed
, куда будем переносить файлы после закрытия.
Получаем такой флоу работы сервиса:
открываем соединение с HDFS;
создаём Parquet-файл, куда будем писать;
пишем определённое время данные;
прекращаем запись;
закрываем файл, ставим футер в файле;
переносим файл из
/warehouse/hdfs-exporter/prod
в/warehouse/hdfs-exporter/prod/closed
;создаем новый файл в
/warehouse/hdfs-exporter/prod
;смотрим конечные данные в ClickHouse, обновив путь, откуда ClickHouse будет брать данные на
ENGINE=HDFS('hdfs://url:8020/warehouse/hdfs-exporter/prod/closed/*.parquet', 'Parquet')
.
При таком подходе мы увидели итоговые данные из таблички в ClickHouse.
Кажется, всё прекрасно, механизм работает, данные получаем, что может пойти не так?
Ох уж эта сеть
При длительном мониторинге сервиса с последним подходом для решения задачи мы начали наблюдать ошибки, которые вызывались из-за проблем нестабильной сети.
Какого рода были ошибки:
невозможность создать файл в HDFS из-за broken pipe;
невозможность закрыть файл из-за broken pipe;
невозможность перенести закрытый файл в директорию closed из-за broken pipe.
Что первое пришло в голову, чтобы починить такие ошибки сети в случае чего? Делать ретраи. Но ретраи помогали очень редко, потому что раз какая-то проблема в сети, то и в рамках ретраев ничего не менялось и отдавались те же ошибки broken pipe.
func (e *EventHandler) moveParquetFileWithRetries(src string) error {
if err := retry.Do(
func() error {
return e.moveParquetFile(src)
},
retry.Attempts(e.app.GetValue(e.ctx, config.HdfsRetryAttemps).Uint()),
retry.Delay(e.app.GetValue(e.ctx, config.HdfsRetryTimeout).Duration()),
retry.DelayType(retry.BackOffDelay),
retry.OnRetry(func(n uint, err error) {
logger.Warnf(e.ctx, "move %s parquet file to dir 'closed' hdfs; retry #%d: %v", src, n+1, err)
}),
); err != nil {
return err
}
return nil
}
Грустно, больно, неприятно.
В один из вечеров, когда гулял по городу, в голову пришло решение — а для чего работать с файлами удалённо в HDSF? Почему я не могу просто работать с файлом локально, а потом отправить его в HDFS. Мы будем в HDFS иметь только закрытые файлы, не будем ловить ошибки сети при переносе и закрытии файлов.
Попытка #4
Глобально мы переносим работу с файлами на локальный диск с удалённого HDFS. Это было просто, так как для работы с Parquet-файлами мы использовали библиотеку github.com/xitongsys/parquet-go
, которая идёт с дополнительной библиотекой github.com/xitongsys/parquet-go-source
, где реализованы разные коннекторы для работы с разными хранилищами. Наша миграция выглядела так:
// БЫЛО
pf, err := hdfs.NewHdfsFileWriter(path)
if err != nil {
return fmt.Errorf(«failed to create file: %v», err)
}
// СТАЛО
pf, err := localparquet.NewLocalFileWriter(path)
if err != nil {
return fmt.Errorf(«failed to create file: %v», err)
}
Мы поменяли коннектор с HDFS на локальное хранилище, а также добавили функциональность для копирования локальных файлов на удалённый HDFS. Для работы с HDFS использовали библиотеку, в основе которой лежит форк либы github.com/colinmarc/hdfs
. В форке реализованы дополнительные логи, трейсы, чтобы получать больше информации по взаимодействию с HDFS.
Проблемы с получением данных
Наконец мы настроили запись данных в файлы, переоткрытие этих файлов, отправку на удалённый HDFS. Всё прекрасно работает, сервис крутится, Kafka читается, количество файлов в HDFS постепенно увеличивается, теперь только идти в ClickHouse и доставать нужные данные. Но тут произошло непредвиденное:
Received exception from server (version 24.8.6):
Code: 497. DB::Exception:
DB::Exception: Cannot list directory /warehouse/comms-hdfs-exporter/prod/closed
AccessControlException:
This is most likely due insufficient credentials or malicious interactions.
(ACCESS_DENIED)
Грубо говоря, у нас нет доступа до HDFS из ClickHouse.
Для контекста: HDFS в Ozon Tech стоит под Kerberos, то есть всем, кто работает с HDFS, нужно авторизоваться с помощью keytab-файла. В ClickHouse как раз указано, как настроить Kerberos для HDFS-движка.
Но почему-то нам ClickHouse не отдавал данные, ссылаясь на то, что ClickHouse не мог пройти аутентификацию в HDFS.
Как работает процесс аутентификации Kerberos с keytab-файлом?
Если кратко, мы можем предоставить свой логин и пароль, чтобы пройти аутентификацию.
kinit <user_name> # потом вводится пароль
Мы можем создать keytab-файл. При помощи keytab-файла вы можете автоматизировать процесс аутентификации Kerberos, не указывая пароль в открытом виде или вводя его каждый раз. Если брать пример из жизни, то вместо того, чтобы каждый раз представляться на парковке охраннику, вы просто сделаете пропуск-удостоверение и будете показывать только его.
# Запустите команду ktutil без аргументов, чтобы начать интерактивную сессию.
ktutil
# Добавление записи с паролем:
ktutil: addent -password -p <user_name> -k 1 -e RC4-HMAC
Password for <user_name>:
# Сохранение keytab-файла:
ktutil: wkt <user_name>.keytab
# Завершение сессии ktutil:
ktutil: q
# Последующую авторизацию можно выполнять через keytab.
kinit -kt <user_name>.keytab <user_name>
При успешной аутентификации в системе создается кэш-тикет, который содержит информацию по сессии, если говорить очень обобщенно.

Здесь можно увидеть, что срок сессии — 10 часов. Получается, надо рефрешиться, чтобы сессия продолжалась.
Раз мы получаем от ClickHouse ошибку ACCESS_DENIED, то у него не получается рефрешнуть кэш-тикет, используя keytab-файл. Возник вопрос: почему? Ответ оказался одним из самых первых в выдаче браузера:

Сначала мы использовали костыль для решения проблемы — стало просто запустить крон джобы на нодах ClickHouse, которая будет рефрешить тикет, используя команду ниже:
kinit -R
После такого костыля мы ни разу не словили ACCESS_DENIED
от ClickHouse.
UPD. Пока я писал статью, добрый человек сделал ПР, который рефрешит тикет. Этот ПР успешно замержили. У меня потекла мужская скупая слеза.
Итоги
На данный момент у нас без ошибок отрабатывает ClickHouse и успешно сохраняются Parquet-файлы в HDFS. Всё, что нам остаётся, — мониторить и искать места, где можно что-то улучшить.
Впереди я вижу задачи в виде пережатия Parquet-файлов для оптимизации сканирования по файлам. Буду ли я делать это на Go или буду постигать Data Engineering, пока не могу сказать, надеюсь, это выльется в отдельную статью.
Для себя я сделал вывод, что иногда правильное решение лежит на поверхности и не стоит изобретать велосипед.