
Установка и эксплуатация приложений Spark в облаке зачастую становятся препятствием для дата-инженеров (Data Engineer, DE): сложная работа с Helm-конфигурациями отвлекает внимание от анализа данных и замедляет подготовку среды. Но полностью отказываться от Spark зачастую нерационально, поэтому многие команды стремятся найти свое решение для обхода существующих сложностей.
Привет, Хабр. Меня зовут Юрий Орлов. Я руководитель команды разработки ML Platform в VK Tech. В этой статье я расскажу о том, как мы автоматизировали развертывание Spark в облаке и создали клиент на Python, который снижает требования к знаниям в области DevOps и Kubernetes, необходимым для начала работы со Spark.
О Spark и существующих сложностях
Apache Spark — фреймворк для быстрой и эффективной обработки больших объемов данных (Big Data).
Со Spark работают разные специалисты: разработчики, аналитики данных, ML-специалисты и дата-инженеры. При этом дата-инженеры решают с помощью Spark большой пул задач, среди которых:
Сбор и обработка данных (ETL). Извлечение, трансформация и загрузка данных из множества источников в централизованное хранилище.
Анализ и агрегирование данных. Вычисление статистики, создание отчетов и визуализаций на основе большого объема данных.
Поточная обработка данных (Streaming). Быстрое реагирование на события путем обработки данных в реальном времени.
Оптимизация вычислительных процессов. Повышение скорости и эффективности обработки данных за счет параллельных и распределенных вычислений.
Развертывание и мониторинг инфраструктуры. Установка, конфигурирование и поддержание стабильности работы Spark-кластеров.
Вместе с тем для дата-инженеров работа со Spark не всегда проста. Так, запуск Spark в Kubernetes требует создания YAML и компетенций в DevOps. Из-за этого DE вынуждены погружаться в особенности реализации YAML, kubernetes, взаимодействия с компонентами Spark и прочих ненужных инфраструктурных нюансов. Это повышает порог входа в работу с инструментом и значительно сказывается на производительности команд.
Наше решение
Чтобы преодолеть существующие сложности работы со Spark, мы с командой разработали Python-клиент, который скрывает «под капотом» всю механику работы с kubernetes и конфигами. Таким образом, при использовании данного клиента spark job запускается прямо из python и airflow, а инженеры могут писать пайплайны, а не YAML.
Сам Python-клиент устанавливается в Notebook Jupyterhub и хранит в себе множество настроек для интеграции с Kubernetes-кластером, на котором установлен Apache Spark с Spark-оператором.

Таким образом мы избавляем дата-инженеров от необходимости выполнения множества низкоуровневых операций: от подключения к Kubernetes до указания дополнительных переменных в окружениях.
Сравнение методов запуска Spark
Чтобы понять, насколько удобной получилась наша реализация, сравним ее с другими способами запуска Spark.
Запуск Spark c YARN
Для начала рассмотрим небольшой фрагмент кода, касающийся YARN.
from pyspark.sql import SparkSession
# Создаем SparkSession с YARN как cluster manager
spark = SparkSession.builder \
.appName("WordCountOnYARN") \
.master("yarn") \
.getOrCreate()
lines = spark.read.text("hdfs:///data/input.txt").rdd.map(lambda r: r[0])
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
word_counts.saveAsTextFile("hdfs:///data/output")
spark.stop()
Здесь, кажется, все просто:
создаем экземпляр сессии;
пишем нужный код;
подключаемся к YARN, после чего начинается обработка исходных данных и их преобразование в нужный вид.
Но в действительности у такой реализации есть несколько нюансов.
Необходимость HADOOP-кластера с файловой системой HDFS. Это большой комбайн, который нужно не только запустить, но и поддерживать.
Сложность поддержки YARN-кластера. Необходима поддержка YARN — отдельного кластерного компонента, который также разворачивается в Kubernetes и имеет собственные тонкости, многочисленные настройки и потенциальные трудности.
Недостаточная гибкость настроек. В настройках есть определенные ограничения.
Таким образом, для полноценной работы с такой реализацией потребуется привлечение как минимум одного DevOps-специалиста. В противном случае дата-инженеры будут вынуждены тратить время на настройку работы с внешними клиентами (например, airflow).
Запуск spark в k8s
В случае Kubernetes сразу появляется зависимость от элементов k8s, в первую очередь — от манифестов и правил их составления.
Причем настройка взаимосвязей между компонентами и внесение правок в манифесты — дело непростое. Для наглядности разберем небольшой фрагмент манифеста:
{ "apiVersion":"sparkoperator.k8s.io/v1beta2",
"kind":"SparkApplication",
"metadata":{
"name":"job-37666432",
"namespace":"sparkcluster-tnz2g7qn"
},
"spec":{
"driver":{
"affinity":{
"nodeAffinity":{
"requiredDuringSchedulingIgnoredDuringExecution":{
"nodeSelectorTerms":[
{
"matchExpressions":[
{
"key":"data.vkcs.cloud/nodegroup",
"operator":"In",
"values":[
"sparkjobs-tnz2g7qn"
]}]}]}}},
"annotations":{
"data.vkcs.cloud/log-filter":"sparkjob"
},
"coreLimit":"1200m",
"cores":1,
"env":[{
"name":"AWS_ACCESS_KEY_ID","valueFrom":{
При изучении манифеста становится очевидно, что даже для запуска маленькой простой джобы в Spark нужно:
описывать YAML-манифесты;
знать детали kubernetes, понимать, где и какие разделы располагаются.
Все это приводит к тому, что дата-инженеры вынуждены сначала изучать и помнить детали инфраструктуры, а уже потом приступать к написанию запросов и созданию пайплайнов. При этом даже незначительные изменения в пайплайнах занимают дополнительное время, поскольку каждое обновление конфигурации требует соответствующих исправлений в манифестах Kubernetes.
Запуск spark job через наш Python-клиент
Теперь к тому, как запускается spark job в нашей реализации. Для примера рассмотрим задачу конвертации набора данных из формата CSV в Parquet посредством Spark.
Здесь все просто:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, IntegerType, DecimalType, StringType
from pyspark.sql.functions import StructType
def check_files_exists(spark: SparkSession, input_s3_path: str):
print(f"Checking input path: {input_s3_path}/customer.tbl")
try:
temp_df = spark.read.text(f"{input_s3_path}/customer.tbl").limit(1)
count = temp_df.count()
print(f"File accessible, found {count} line(s)")
except Exception as e:
print(f"Cannot access file: {e}")
raise
def convert_customer():
input_s3_path = os.environ.get('INPUT_S3_PATH')
output_s3_path = os.environ.get('OUTPUT_S3_PATH')
scale_factor = os.environ.get('SCALE_FACTOR')
customer_schema = StructType([
StructField("c_custkey", IntegerType(), True),
StructField("c_name", StringType(), True),
StructField("c_address", StringType(), True),
StructField("c_nationkey", IntegerType(), True),
StructField("c_phone", StringType(), True),
StructField("c_acctbal", DecimalType(15, 2), True),
StructField("c_mktsegment", StringType(), True),
StructField("c_comment", StringType(), True)
])
spark = SparkSession.builder \
.appName(f"Convert_Customer_SF{scale_factor}") \
.getOrCreate()
Помимо этой джобы, мы создаем еще дополнительный Python-файл, который будет ей управлять.
import logging
import uuid
from mlplatform_client.v2 import BasicAuth, SparkCluster
from mlplatform_client.v2.utils import wait_job_running, wait_job_succeeded
from mlplatform_client.v2.clients.spark import SparkClient
log = logging.getLogger()
ML_PLATFORM_HOST = "https://spark.tnz2g7qn.data.bizmrg.com"
KEYSTONE_USERNAME="uniq_user"
KEYSTONE_PASSWORD="pass"
def run():
cluster = SparkCluster(
SparkClient(
host=ML_PLATFORM_HOST,
auth=BasicAuth(username=KEYSTONE_USERNAME, password=KEYSTONE_PASSWORD),
skip_tls_verify=False
)
)
job_name = f"job-{str(uuid.uuid4()).encode().hex()[:8]}"
manifest = cluster.jobs.get_default_manifest(job_name)
manifest.set_executor_settings({"instances": 1, "cores": 1})
job = cluster.jobs.submit_pyjob(manifest, pyfile="jobs/test_job.py")
wait_job_running(job, delay=8)
log.info(job.info())
wait_job_succeeded(job, delay=8)
log.info(job.logs())
if name == "__main__":
run()
После этого остается только запустить Python-файл, и джоба начнет выполняться — ничего больше от дата-инженера не требуется.
Причем удобство метода не ограничивается простыми пайплайнами. Так, если нужно изменить какие-либо настройки (например, число экземпляров, количество выделяемых ядер CPU, размер оперативной памяти, число partition'ов), достаточно добавить в код еще один метод.
Например, возьмем уже рассмотренный ранее код:
cluster = SparkCluster(
SparkClient(
host=ML_PLATFORM_HOST,
auth=BasicAuth(username=KEYSTONE_USERNAME, password=KEYSTONE_PASSWORD),
skip_tls_verify=False
)
)
job_name = f"job-{str(uuid.uuid4()).encode().hex()[:8]}"
manifest = cluster.jobs.get_default_manifest(job_name)
manifest.set_executor_settings({"instances": 1, "cores": 1})
job = cluster.jobs.submit_pyjob(manifest, pyfile="jobs/test_job.py")
wait_job_running(job, delay=8)
log.info(job.info())
wait_job_succeeded(job, delay=8)
log.info(job.logs())
Чтобы внести в него такой параметр, как число партишенов, достаточно добавить еще один метод:
manifest.set_spark_conf({
"spark.sql.shuffle.partitions": "600"
})
То есть не надо вручную вносить изменения в манифесты, следить за правильностью отступов и другими нюансами — все сводится к указанию одного дополнительного параметра.
Таким образом, в нашей реализации дата-инженеры получают:
простой запуск из python (YAML и знание kubernetes не нужно);
понятный интерфейс для работы — все управление можно осуществлять с помощью одной библиотеки.
Пример более сложных кейсов
Мы рассмотрели простой пример, когда запускается одна джоба. Но на практике такие простые задачи встречаются редко. Поэтому немного усложним и посмотрим, как наш Python-клиент будет работать в кейсах, приближенных к реальным.
Для начала примем условие, что:
джобу необходимо запускать определенное количество раз;
требуется автоматизированное расписание запуска джобы и контроль зависимостей;
важно иметь возможность легко править код и параметры;
нужен прозрачный контроль выполнения задач.
Саму задачу оставляем прежней — преобразовать CSV в parquet с помощью Spark.
Для автоматизации воспользуемся Airflow.
Что ж, перейдем к обзору алгоритма.
Примечание: В рамках примера будем использовать Spark 3.5.1 и Airflow 2.7.1.
Итак, рассмотрим пример DAG в Airflow. Первым делом берем уже упомянутую задачу, которая конвертирует CSV в parquet.
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, IntegerType, DecimalType, StringType
from pyspark.sql.functions import StructType
def check_files_exists(spark: SparkSession, input_s3_path: str):
print(f"Checking input path: {input_s3_path}/customer.tbl")
try:
temp_df = spark.read.text(f"{input_s3_path}/customer.tbl").limit(1)
count = temp_df.count()
print(f"File accessible, found {count} line(s)")
except Exception as e:
print(f"Cannot access file: {e}")
raise
def convert_customer():
input_s3_path = os.environ.get('INPUT_S3_PATH')
output_s3_path = os.environ.get('OUTPUT_S3_PATH')
scale_factor = os.environ.get('SCALE_FACTOR')
customer_schema = StructType([
StructField("c_custkey", IntegerType(), True),
StructField("c_name", StringType(), True),
StructField("c_address", StringType(), True),
StructField("c_nationkey", IntegerType(), True),
StructField("c_phone", StringType(), True),
StructField("c_acctbal", DecimalType(15, 2), True),
StructField("c_mktsegment", StringType(), True),
StructField("c_comment", StringType(), True)
])
spark = SparkSession.builder \
.appName(f"Convert_Customer_SF{scale_factor}") \
.getOrCreate()
Далее мы на Python составляем типовой DAG для Airflow.
with DAG("csv-to-parquet",
default_args=DEFAULT_ARGS,
schedule=None,
catchup=False,
dagrun_timeout=timedelta(hours=16),
tags=[DEFAULT_ARGS["owner"]]
) as dag:
def check_environ_params(env_vars, **context):
os.environ.update(env_vars)
required_vars = ["ML_PLATFORM_HOST", "KEYSTONE_USERNAME",
"KEYSTONE_PASSWORD", "S3_ENDPOINT",
"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY",
"AWS_REGION", "BUCKET_NAME"]
for var in required_vars:
if var not in os.environ:
raise ValueError(f"Required environment variable {var} is missing")
if not os.environ[var]:
raise ValueError(f"Environment variable {var} is empty")
for name, value in os.environ.items():
print(f"{name}: {value}")
Следом обе джобы складываем в S3, к которому подключен этот DAG.

Туда же загружаем Python-файл с DAG.

Далее просто переходим в Airflow, где уже будет доступен загруженный DAG. Находим его и нажимаем Play.

На этом подготовка завершена: сразу после в Airflow можно отслеживать процесс выполнения задач и статус их завершения («успешно» или «неудачно»).

Помимо этого, здесь же через интерфейс Airflow можно получить подробный отчет о выполнении каждой задачи: чтобы увидеть всю информацию о задаче, достаточно перейти во вкладку с логами.

К выводам
Безусловно, мы не первооткрыватели этого решения — подобная схема несколько раньше уже была реализована в сервисе Apache Livy. Вместе с тем в Livy есть несколько значительных недостатков — например:
надо поднимать и поддерживать отдельный микросервис Livy;
Livy взаимодействует непосредственно с мастер-нодой и воркер-нодами Spark, обходя Spark-оператор.
Реализовав свою схему, мы не просто исключили эти требования — мы сделали решение, с которым дата-инженеры могут запускать код PySpark напрямую из скриптов Python или создавать автоматизированные конвейеры без погружения в инфраструктурные детали. Это сокращает время от старта написания кода до выполнения задач и повышает продуктивность команд, работающих с большими данными в облачной среде.
А как со Spark работают дата-инженеры в вашей команде? Делитесь опытом.
eigrad
В озоне лет 5 назад выпиливал из кода подобный велосипед (правда в том месте все ещё был yarn), за счёт сильной связанности он усложнил переход на python3. Там тоже была обвязка, которая дублировала существующую стандартную функциональность. Чтобы избавиться от такой ситуации в будущем вводил декларативный подход - отдельно описывалось что где как и когда запускать, а в коде оставалась только реализация бизнес логики - датафреймы на вход -> датафреймы на выход (вход опционально, если источник внешний). На текущем месте радуюсь что все необходимые подобные абстракции уже реализованы в dagster, а Spark-over-k8s запускается в client mode (если хочется) без танцев с бубном (никаких yaml без строгой необходимости, вся нутрянка определяющая окружение защита в конфиге этого окружения, и ничего описанного в статье велосипедить не нужно). Но это конечно требует экспертизы и не отменяет большой работы по настройке.
Джоба которая пытается контролировать окружение - антипаттерн. Окружение должно запускать джобу и делать чтобы ей было хорошо, а не наоборот.
Но в целом то конечно пофиг, особенно если загрузка кода через бакет, а не git... Удобно наверное. Если в данных условиях процессы работают - то хорошо и все молодцы.
А Livy разве не потерял актуальность с появлением Spark Connect?