Привет! Меня зовут Роман Чечёткин, я разработчик в команде «Платформа коммуникаций» в Ozon Tech. Наша платформа предоставляет возможность другим командам отправлять различные сообщения в личные кабинеты пользователей.

Сегодня хочу рассказать о задаче, которая встала перед нами — долгосрочное хранение всех сообщений (смс, электронные письма, пуши, уведомления), которые пользователь получил от Ozon.

Почему мы, в принципе, должны хранить у себя данные? Некоторые коммуникации мы должны хранить не менее трёх лет по юридическим причинам. Для отслеживания этих коммуникаций мы создали специальный маркер.

Для начала мы выделили основные требования к функциональности:

  • в течение суток сервис должен обрабатывать десятки миллионов сообщений из Kafka;

  • нельзя терять данные в рамках сервиса, чтобы в случае проверок подтвердить соблюдение законодательства и избежать штрафов;

  • данные должны храниться в удобном, масштабируемом хранилище;

  • необходимо использовать холодное хранилище данных (в нашем случае мы будем обращаться за данными очень редко, выгрузка данных будет происходить по запросу — раз-два в месяц);

  • возможно получение коммуникаций по определённым фильтрам;

  • необходимо позаботиться о ресурсах, чтобы не потреблять много памяти для хранения.

Подготовка

При проектировании системы хранения больших объёмов холодных данных мы сразу исключили PostgreSQL по причине ненадобности в OLTP-функциональности. В нашем сценарии данные только пишутся один раз и больше не изменяются, поэтому ACID-гарантии и механизмы блокировок избыточны.

Исходя из условий, что нам нужно холодное хранилище для большого количества данных, которое будет легко масштабироваться и надёжно хранить наши данные, мы выбрали для таких целей Hadoop File System.

Среди преимуществ HDFS можно выделить:

  • распределённое хранение. Данные хранятся с коэффициентом репликации х3. За счёт этого получаем надёжное хранилище данных;

  • используется горизонтальное масштабирование вместо вертикального. Здесь мы получаем лёгкое масштабирование;

  • подходит для хранения больших файлов (терабайты, петабайты). НО! Не подходит для хранения множества маленьких файлов до 100 мб. У нас идёт обработка нескольких десятков миллионов сообщений в сутки. Юридически важной коммуникацией может быть каждая десятая;

  • один раз записал, много раз прочитал — это то, как мы взаимодействуем с нашим сервисом, записав один раз данные HDFS, мы можем множество раз читать и доставать данные, основываясь на входящих фильтрах;

  • HDFS схож с файловой системой на Unix-устройствах. Каждый проект в Ozon Tech, который использует HDFS, имеет свою директорию по типу /warehouse/<project_name>.

Так как HDFS является файловой системой, то мы будем работать с файлами. Стоит определиться, с помощью какого формата мы будем хранить данные в файлах. Мы можем хранить данные в любом формате, хоть в .txt, но рассмотрим самые популярные решения.

  1. Самое простое, что есть в мире, — это json-формат. Все мы понимаем, что json будет неэффективным вариантом, так как данные хранятся в текстовом виде. Отсюда мы получаем медленную скорость и большие итоговые размеры файлов.

  2. В рамках экосистемы Hadoop, где обрабатывают очень много данных, очень часто используют Parquet-формат — почему бы и нам его не использовать для хранения информации? Parquet — это бинарный колоночный формат файла, который поддерживает сжатие. Это позволяет утилизировать меньше памяти по сравнению с другими форматами на то же количество данных. Если сделать 1 млн одинаковых записей в форматах json и parquet, то разница в размере файлов будет х10.

Ссылка на демо

Размеры json- и parquet-файлов с 1 млн записей
Размеры json- и parquet-файлов с 1 млн записей

Но как нам потом получать данные? Кажется, что придётся ходить в HDFS, вытаскивать файлы, а потом делать поиск по содержимому файла. Звучит не очень обнадёживающе.

А как сделать фильтрацию? Тут на помощь приходит ClickHouse и его движок получения данных из HDFS. То есть мы просто указываем путь до файлов и получаем данные прямиком из HDFS. Сказка! Но вопрос, который может возникнуть: а нужен ли нам HDFS, если данные будем доставать из ClickHouse? Может, сразу сохранять данные в ClickHouse и выгружать оттуда?

Ключевым фактором является разделение storage и compute. В HDFS мы будем хранить данные, а ClickHouse будет обрабатывать их по запросу. Это позволит нам масштабировать вычисления независимо от хранилища.

Почему мы считаем, что ClickHouse является просто compute и не хранит данные? Все дело в том, что при чтении данных движок HDFS работает как внешний источник, и фактически ClickHouse не управляет хранением этих данных напрямую. Он лишь выполняет вычисления на лету, читая файлы из HDFS по мере необходимости.

Формат Parquet

Для начала надо разобраться подробнее с форматом Parquet. Ниже представлена схема parquet-файла. Помимо колоночного хранения данных, в parquet-файлах есть Row Groups, которые позволяют разделить данные по подгруппам, и когда мы будем получать данные на основе определенных показателей, то можно будет пропустить любую row group, если она не подходит по условиям.

В примере ниже мы хотим получить все значения T-Shirt. Так как Row Group 2 не имеет таких значений, то эта подгруппа будет проигнорирована, следовательно, будет сделано меньше операций.

все row groups
все row groups
проигнорирована row group 2
проигнорирована row group 2

ClickHouse при сканировании данных из Parquet-файлов в HDFS умеет эффективно использовать метаинформацию, содержащуюся в файле, включая статистику по каждому Row Group. В частности, каждая Row Group содержит минимальные и максимальные значения по каждому столбцу, что позволяет ClickHouse заранее определить, стоит ли читать конкретную группу строк в контексте заданного фильтра.

Возвращаясь к примеру: если мы выполняем запрос с условием

WHERE product = 'T-Shirt'`

и одна из Row Group по столбцу product имеет статистику min='Socks',max='Socks', то ClickHouse, анализируя эту метаинформацию, поймёт, что ни одна строка из этой группы не удовлетворяет условию — и полностью пропустит её при сканировании. Это существенно сокращает объём обрабатываемых данных и повышает производительность запроса.

Такая стратегия называется predicate push down, и она особенно эффективна на больших объёмах данных, где Parquet-файлы разбиты на десятки или сотни Row Goups. А вот сама настройка input_format_parquet_filter_push_down в документации ClickHouse

cхема parquet файла
cхема parquet файла

Разработка

Мы выбрали для реализации именно коллаборацию из HDFS и ClickHouse.

Как выглядит флоу работы сервиса? Наш сервис читает топик Kafka, проверяет, является ли коммуникация юридически важной, если да, то сервис пишет запись в Parquet-файл.

флоу работы сервиса
флоу работы сервиса

Кажется, что здесь максимально примитивная функциональность, как здесь можно что-то сломать? Но как только приступили к реализации сервиса, мы начали сталкиваться с вопросами.

Корректность файлов

Сперва нам надо было разобраться, какие Parquet-файлы будут валидными? Что я имею в виду под словом «валидные»? Если мы можем получать данные в ClickHouse без проблем, значит Parquet-файл валиден.

Попытка #1

Для начала определимся с HDFS и директориями. У нас есть директория в HDFS /warehouse/hdfs-exporter/prod/.

ClickHouse настроен на просмотр всех Parquet-файлов по этому пути ENGINE=HDFS('hdfs://url:8020/warehouse/hdfs-exporter/prod/*.parquet', 'Parquet')

Мы пишем тестовые файлы в директорию prod по принципу:

  1. открываем соединение с HDFS;

  2. создаём Parquet-файл, куда будем писать;

  3. пишем данные в файл;

  4. смотрим конечные данные в ClickHouse.

В процессе тестирования выяснилось, что если пишем в наш Parquet-файл и хотим уже из него извлечь данные, то ClickHouse не сможет отдавать данные и выдаст ошибку об отсутствии футера. Футер указывает, что файл — формата Parquet. Это реализовано с помощью 4 байт и записи «PAR1» в конце Parquet-файла.

футер parquet-файла
футер parquet-файла

Вывод: нужно обязательно проставлять конечные штампы в виде футера в Parquet-файлах.

Попытка #2

Когда мы поняли, что наличие у файлов футеров обязательно, мы пошли дальше. Сделали механизм, при котором сервис пишет данные в файл определённое время, по истечении которого запись прекращается, файл закрывается на запись, устанавливаем в файле футер и после всего открываем новый файл.

На данный момент у нас есть директория все та же директория /warehouse/hdfs-exporter/prod/.

Что мы собираемся делать с файлами:

  1. открываем соединение с HDFS;

  2. создаём Parquet-файл, куда будем писать;

  3. пишем определённое время данные;

  4. прекращаем запись;

  5. закрываем файл, ставим футер в файле;

  6. открываем новый файл по соседству в той же директории;

  7. смотрим конечные данные в ClickHouse, когда у нас на руках: один уже закрытый файл и открытый файл, доступный для записи.

Путь до директории, где ClickHouse ищет файлы, остался тем же. При таком подходе мы также получили ошибку. В данном случае ClickHouse не может читать из директории, где одновременно лежат открытые и закрытые файлы.

Вывод: у нас должно быть место только с закрытыми файлами.

Попытка #3

Поняв, что нам надо хранить закрытые файлы в одном месте, мы пришли к такому решению: у нас будет дополнительная директория /warehouse/hdfs-exporter/prod/closed, куда будем переносить файлы после закрытия.

Получаем такой флоу работы сервиса:

  1. открываем соединение с HDFS;

  2. создаём Parquet-файл, куда будем писать;

  3. пишем определённое время данные;

  4. прекращаем запись;

  5. закрываем файл, ставим футер в файле;

  6. переносим файл из /warehouse/hdfs-exporter/prod в /warehouse/hdfs-exporter/prod/closed;

  7. создаем новый файл в /warehouse/hdfs-exporter/prod;

  8. смотрим конечные данные в ClickHouse, обновив путь, откуда ClickHouse будет брать данные на ENGINE=HDFS('hdfs://url:8020/warehouse/hdfs-exporter/prod/closed/*.parquet', 'Parquet').

При таком подходе мы увидели итоговые данные из таблички в ClickHouse.

Кажется, всё прекрасно, механизм работает, данные получаем, что может пойти не так?

Ох уж эта сеть

При длительном мониторинге сервиса с последним подходом для решения задачи мы начали наблюдать ошибки, которые вызывались из-за проблем нестабильной сети.

Какого рода были ошибки:

  • невозможность создать файл в HDFS из-за broken pipe;

  • невозможность закрыть файл из-за broken pipe;

  • невозможность перенести закрытый файл в директорию closed из-за broken pipe.

Что первое пришло в голову, чтобы починить такие ошибки сети в случае чего? Делать ретраи. Но ретраи помогали очень редко, потому что раз какая-то проблема в сети, то и в рамках ретраев ничего не менялось и отдавались те же ошибки broken pipe.

func (e *EventHandler) moveParquetFileWithRetries(src string) error {
	if err := retry.Do(
		func() error {
			return e.moveParquetFile(src)
		},
		retry.Attempts(e.app.GetValue(e.ctx, config.HdfsRetryAttemps).Uint()),
		retry.Delay(e.app.GetValue(e.ctx, config.HdfsRetryTimeout).Duration()),
		retry.DelayType(retry.BackOffDelay),
		retry.OnRetry(func(n uint, err error) {
			logger.Warnf(e.ctx, "move %s parquet file to dir 'closed' hdfs; retry #%d: %v", src, n+1, err)
		}),
	); err != nil {
		return err
	}
	return nil
}

Грустно, больно, неприятно.

В один из вечеров, когда гулял по городу, в голову пришло решение — а для чего работать с файлами удалённо в HDSF? Почему я не могу просто работать с файлом локально, а потом отправить его в HDFS. Мы будем в HDFS иметь только закрытые файлы, не будем ловить ошибки сети при переносе и закрытии файлов.

Попытка #4

Глобально мы переносим работу с файлами на локальный диск с удалённого HDFS. Это было просто, так как для работы с Parquet-файлами мы использовали библиотеку github.com/xitongsys/parquet-go, которая идёт с дополнительной библиотекой github.com/xitongsys/parquet-go-source, где реализованы разные коннекторы для работы с разными хранилищами. Наша миграция выглядела так:

// БЫЛО
pf, err := hdfs.NewHdfsFileWriter(path)
if err != nil {
		return fmt.Errorf(«failed to create file: %v», err)
}

// СТАЛО
pf, err := localparquet.NewLocalFileWriter(path)
if err != nil {
		return fmt.Errorf(«failed to create file: %v», err)
}

Мы поменяли коннектор с HDFS на локальное хранилище, а также добавили функциональность для копирования локальных файлов на удалённый HDFS. Для работы с HDFS использовали библиотеку, в основе которой лежит форк либы github.com/colinmarc/hdfs. В форке реализованы дополнительные логи, трейсы, чтобы получать больше информации по взаимодействию с HDFS.

Проблемы с получением данных

Наконец мы настроили запись данных в файлы, переоткрытие этих файлов, отправку на удалённый HDFS. Всё прекрасно работает, сервис крутится, Kafka читается, количество файлов в HDFS постепенно увеличивается, теперь только идти в ClickHouse и доставать нужные данные. Но тут произошло непредвиденное:

Received exception from server (version 24.8.6):
Code: 497. DB::Exception: 
DB::Exception: Cannot list directory /warehouse/comms-hdfs-exporter/prod/closed
AccessControlException: 
This is most likely due insufficient credentials or malicious interactions. 
(ACCESS_DENIED)

Грубо говоря, у нас нет доступа до HDFS из ClickHouse.

Для контекста: HDFS в Ozon Tech стоит под Kerberos, то есть всем, кто работает с HDFS, нужно авторизоваться с помощью keytab-файла. В ClickHouse как раз указано, как настроить Kerberos для HDFS-движка.

Но почему-то нам ClickHouse не отдавал данные, ссылаясь на то, что ClickHouse не мог пройти аутентификацию в HDFS.

Как работает процесс аутентификации Kerberos с keytab-файлом?

Если кратко, мы можем предоставить свой логин и пароль, чтобы пройти аутентификацию.

kinit <user_name> # потом вводится пароль

Мы можем создать keytab-файл. При помощи keytab-файла вы можете автоматизировать процесс аутентификации Kerberos, не указывая пароль в открытом виде или вводя его каждый раз. Если брать пример из жизни, то вместо того, чтобы каждый раз представляться на парковке охраннику, вы просто сделаете пропуск-удостоверение и будете показывать только его.

# Запустите команду ktutil без аргументов, чтобы начать интерактивную сессию.
ktutil
# Добавление записи с паролем:
ktutil: addent -password -p <user_name> -k 1 -e RC4-HMAC
Password for <user_name>:
# Сохранение keytab-файла:
ktutil: wkt <user_name>.keytab
# Завершение сессии ktutil:
ktutil: q
 
# Последующую авторизацию можно выполнять через keytab.
kinit -kt <user_name>.keytab <user_name>

При успешной аутентификации в системе создается кэш-тикет, который содержит информацию по сессии, если говорить очень обобщенно.

результат klist
результат klist

Здесь можно увидеть, что срок сессии — 10 часов. Получается, надо рефрешиться, чтобы сессия продолжалась.

Раз мы получаем от ClickHouse ошибку ACCESS_DENIED, то у него не получается рефрешнуть кэш-тикет, используя keytab-файл. Возник вопрос: почему? Ответ оказался одним из самых первых в выдаче браузера:

clickhouse не умеет в рефреш
clickhouse не умеет в рефреш

Сначала мы использовали костыль для решения проблемы — стало просто запустить крон джобы на нодах ClickHouse, которая будет рефрешить тикет, используя команду ниже:

kinit -R 

После такого костыля мы ни разу не словили ACCESS_DENIED от ClickHouse.

UPD. Пока я писал статью, добрый человек сделал ПР, который рефрешит тикет. Этот ПР успешно замержили. У меня потекла мужская скупая слеза.

Итоги

На данный момент у нас без ошибок отрабатывает ClickHouse и успешно сохраняются Parquet-файлы в HDFS. Всё, что нам остаётся, — мониторить и искать места, где можно что-то улучшить.

Впереди я вижу задачи в виде пережатия Parquet-файлов для оптимизации сканирования по файлам. Буду ли я делать это на Go или буду постигать Data Engineering, пока не могу сказать, надеюсь, это выльется в отдельную статью.

Для себя я сделал вывод, что иногда правильное решение лежит на поверхности и не стоит изобретать велосипед.

Комментарии (0)