Утром заходим в дашборд выручки и видим нули по половине регионов. Пайплайн в Airflow отработал, тесты dbt test зелёные, в Sentry тишина. Через двадцать минут разборок выясняется простая вещь: бэкенд‑команда вчера выкатила релиз и поменяла значение order_status с awaiting_payment на pending_payment в API заказов. Схема таблицы не изменилась, тип поля тот же string, nullable тот же false. Изменился набор значений — и фильтр WHERE order_status = 'awaiting_payment' в витрине стал возвращать пустоту.
История, знакомая любой компании, где данные текут от продуктовых команд в общее хранилище. На один такой инцидент уходит полдня: найти причину, написать в Slack, согласовать фикс, обновить витрину, перезалить вчерашние данные. Через месяц повторяется на другой команде, через два — на третьей. К концу года дата‑инженеры тратят значимую долю времени на разбор «почему опять сломалось», а доверие бизнеса к аналитике падает быстрее, чем растут зарплаты.
Контракт данных — формальный способ договориться о структуре и поведении датасета между командой‑продюсером и командами‑консьюмерами. Разберём, как такой контракт устроен внутри, что в нём должно быть и как встроить его в существующий пайплайн без переписывания всей инфраструктуры.
Что такое контракт по факту
Контракт — это YAML или JSON‑файл, который лежит в репозитории команды‑продюсера рядом с кодом, генерирующим датасет. Формат свободный, но в индустрии понемногу устаканивается стандарт Open Data Contract Standard (ODCS) и формат от Data Contract CLI. Минимальный набор полей выглядит так:
dataContractSpecification: 1.1.0 id: orders.events.created info: title: Order Created Events version: 2.1.0 owner: team-checkout contact: email: checkout@example.com slack: "#checkout-team" servers: production: type: kafka host: kafka.prod.internal:9092 topic: orders.events.created.v2 terms: usage: | События создания заказа. Гарантируется доставка at-least-once. Дедупликация по полю event_id. noticePeriod: P3M sla: availability: 99.5% retention: P90D freshnessMinutes: 5 models: order_created: type: object fields: event_id: type: string format: uuid required: true unique: true order_id: type: string required: true pattern: "^ORD-[0-9]{10}$" order_status: type: string required: true enum: [created, paid, cancelled, refunded] amount_minor_units: type: long required: true minimum: 0 currency: type: string required: true enum: [RUB, USD, EUR] created_at: type: timestamp required: true
В этом куске зашита вся работа договорённости. Поле version: 2.1.0 — семантическая версия, по которой консьюмеры понимают, что менялось. noticePeriod: P3M означает, что продюсер обязан предупреждать о breaking changes за три месяца — это уже не техническая, а организационная договорённость, но она тоже часть контракта. enum для order_status— то самое место, через которое утром потёк дашборд из истории выше: если бэкенд хочет добавить pending_payment, ему нужно сначала обновить контракт, а это потребует ревью от команд‑консьюмеров.
Где контракт живёт и как обновляется
Контракт хранится в репозитории продюсера‑ там же, где код сервиса, который пишет эти данные. Не в отдельном репо с контрактами на всю компанию, не в Confluence, не в Notion. Рядом с кодом — потому что любое изменение схемы автоматически становится pull request'ом, который проходит через ревью и CI.
В CI запускаются проверки совместимости. Сравнивается новая версия контракта с предыдущей и определяется тип изменения:
Добавили опциональное поле — патч‑версия (2.1.0 → 2.1.1).
Добавили обязательное поле, но с дефолтом — минорная (2.1.0 → 2.2.0).
Удалили поле, поменяли тип, сузили enum — мажорная (2.1.0 → 3.0.0).
Мажорное изменение запускает отдельный воркфлоу: создаётся issue, в Slack уходит уведомление командам‑консьюмерам, начинается период депрекации старой версии. Старая версия не удаляется сразу, она живёт параллельно с новой минимум три месяца, чтобы консьюмеры успели мигрировать.
Для проверки совместимости удобно использовать datacontract-cli — open‑source CLI, который умеет сравнивать две версии и говорить, что именно изменилось:
$ datacontract diff orders.previous.yaml orders.current.yaml Breaking changes: - models.order_created.fields.order_status: enum value 'awaiting_payment' removed Non-breaking changes: - models.order_created.fields.discount_minor_units: field added (optional)
Этот же инструмент встраивается в GitHub Actions или GitLab CI и блокирует merge, если breaking changes есть, а версия не поднята мажорно.
Валидация на стороне продюсера
Контракт без проверки данных против него — мёртвая документация. Все её игнорируют, все её забывают обновлять, через полгода она расходится с реальностью. Чтобы такого не было, продюсер валидирует каждое событие в момент записи.
Для Kafka это удобно делать через Schema Registry — Avro или Protobuf схема генерируется из контракта, сериализатор отказывается отправлять сообщение, не соответствующее схеме. Для прямой записи в БД или для REST API подойдёт pydantic:
from pydantic import BaseModel, Field, field_validator from typing import Literal from datetime import datetime from decimal import Decimal from uuid import UUID import re class OrderCreated(BaseModel): event_id: UUID order_id: str order_status: Literal["created", "paid", "cancelled", "refunded"] amount_minor_units: int = Field(ge=0) currency: Literal["RUB", "USD", "EUR"] created_at: datetime @field_validator("order_id") @classmethod def validate_order_id_format(cls, v: str) -> str: if not re.match(r"^ORD-\d{10}$", v): raise ValueError(f"order_id must match ORD-NNNNNNNNNN, got {v}") return v def publish_order_event(raw_event: dict) -> None: try: validated = OrderCreated.model_validate(raw_event) except ValidationError as e: metrics.increment("contract.validation.failed", tags={"dataset": "orders.events.created"}) dead_letter_queue.send(raw_event, error=str(e)) logger.error("Contract violation", extra={"event": raw_event, "error": str(e)}) return kafka_producer.send("orders.events.created.v2", validated.model_dump_json()) metrics.increment("contract.validation.passed", tags={"dataset": "orders.events.created"})
Если бэкенд‑разработчик случайно отправит событие с order_status="pending_payment", pydantic его отрежет ещё до Kafka. Событие уйдёт в dead letter queue, метрика дернёт алерт, команда увидит проблему до того, как данные доедут до аналитического хранилища. Ловить ошибку нужно как можно ближе к источнику, а не в витрине через сутки.
В стриминговом сценарии тот же подход реализуется через Confluent Schema Registry для Avro или Buf Schema Registry для Protobuf. Брокер проверяет схему при публикации, и невалидное сообщение просто не доходит до топика.
Валидация на стороне консьюмера
Дата‑инженер не доверяет продюсеру даже после внедрения контрактов. Бывает, что валидация на стороне продюсера отключена в стейдже, бывает, что данные приехали из легаси‑системы без валидации, бывает, что кто‑то залил исторические данные напрямую в Kafka мимо сервиса. Поэтому на входе в свой пайплайн консьюмер проверяет схему ещё раз.
В dbt это делается через тесты в schema.yml:
version: 2 models: - name: stg_orders config: contract: enforced: true columns: - name: order_id data_type: string tests: - not_null - unique - dbt_utils.expression_is_true: expression: "regexp_like(order_id, '^ORD-[0-9]{10}$')" - name: order_status data_type: string tests: - not_null - accepted_values: values: [created, paid, cancelled, refunded] - name: amount_minor_units data_type: bigint tests: - not_null - dbt_utils.expression_is_true: expression: "amount_minor_units >= 0"
Опция contract: enforced: true в dbt включает проверку схемы материализованной модели против заявленной в YAML. Если в источнике появится новая колонка или поменяется тип, билд упадёт с понятной ошибкой, а не пройдёт молча, как было бы без контракта.
Для Spark и Flink подход тот же, только реализуется через Great Expectations или ручной assertSchema. Главное правило: упавшая проверка блокирует пайплайн, а не пишет warning в лог. Warning никто не читает, пайплайн с warning доезжает до витрин с битыми данными, и через две недели кто‑то спрашивает, почему отчёт показывает странное.
Что делать с breaking changes
Самая болезненная часть контрактов — это не их написание, а ситуация, когда продюсеру всё‑таки нужно сделать несовместимое изменение. Реальные сценарии: переход на новую платёжную систему с другим набором статусов, разделение поля address на структуру, удаление устаревшего поля.
Работающая практика — параллельное существование двух версий. Продюсер пишет в два топика одновременно: старый orders.events.created.v2 и новый orders.events.created.v3. Контракт v3 объявляется стабильным, контракт v2 помечается как deprecated с датой удаления через три месяца. В этот период консьюмеры мигрируют по своему графику. Через три месяца топик v2 отключается, его контракт переезжает в архив.
Для команд это означает дополнительную работу — поддерживать две версии записи. Но эта работа понятная и ограниченная по времени.
Что меняется в работе команд
Контракт — это не только техническая штука, это смена культуры. До контрактов продьюсер мог думать «я владею своим сервисом, делаю с ним что хочу». После — «у меня есть консьюмеры с SLA, они опираются на мои данные при принятии бизнес‑решений, и я несу за это ответственность».
В практическом плане появляется несколько вещей.
У каждого датасета есть владелец с почтой и каналом в Slack — раньше владельца искали по git blame. Изменения схемы обсуждаются на ревью с участием консьюмеров — раньше выкатывались тихо. У дата‑команды есть формальное основание блокировать релиз продуктовой команды, если он ломает контракт, — раньше приходилось договариваться по личным каналам.
При этом контракт не отменяет dbt‑тесты, не заменяет Great Expectations и не делает Schema Registry ненужным. Это слой выше — про договорённости между командами, а не про техническую валидацию. Хорошая дата‑платформа использует всё вместе: контракты для договорённостей, Schema Registry для типов и совместимости на бинарном уровне, dbt‑тесты для бизнес‑правил в витринах, Great Expectations для статистических проверок данных.
С чего начать внедрение
Внедрять контракты сразу по всей компании — плохая идея. Это месяцы планирования, сопротивление команд, политические разговоры на уровне CTO. Лучше работает другой подход: взять один критичный датасет, на котором уже горели, и сделать контракт для него.
Критерии выбора первого датасета: его данные попадают в финансовые или регуляторные отчёты, его ломали хотя бы раз за последние полгода, у него есть владелец, готовый сотрудничать. Дальше — написать YAML, договориться с продюсером о валидации на запись, добавить проверки в свой пайплайн, прогнать неделю и посмотреть, ловит ли валидация что‑то полезное.
Если за неделю валидация поймала хотя бы одно несоответствие — это уже окупает время на внедрение. Дальше масштабируется органически: команды видят пример, копируют структуру, через полгода в репозитории лежит десяток контрактов, через год — это рабочая практика.
Итого
Контракт данных — это YAML‑файл с описанием схемы, версией, SLA и владельцем, лежащий рядом с кодом продюсера. Он валидируется при записи на стороне продюсера через pydantic или Schema Registry, валидируется при чтении на стороне консьюмера через dbt‑тесты или Great Expectations, обновляется через pull request с автоматической проверкой совместимости и привязан к семантическому версионированию с правилами депрекации.
Технически контракт собирается за день для одного датасета. Организационно — становится рабочей практикой за полгода‑год, если есть поддержка со стороны команды дата‑платформы и хотя бы одного продуктового тимлида. Без поддержки контракты превращаются в очередной бюрократический слой и тихо умирают.

Хотите глубже разобраться в архитектуре данных, DWH, Data Lake, Data Lakehouse и пайплайнах? На курсе «Инженер данных» эти темы разбирают на практических задачах, с которыми дата‑инженеры сталкиваются в реальных проектах.
Перед стартом курса можно прийти на бесплатные открытые вебинары — это живые онлайн‑занятия с преподавателями, где можно познакомиться с форматом обучения, задать вопросы и разобрать практические кейсы.
3 июня в 20:00 — «Децентрализованная революция в управлении данными: Data Mesh и его четыре принципа». Записаться
Разберем, зачем компаниям Data Mesh, как он меняет подход к управлению данными и на каких принципах строится такая архитектура.17 июня в 20:00 — «DWH, Data Lake и Data Lakehouse: архитектурные различия и практическое применение». Записаться
Сравним три подхода к хранению и обработке данных: где лучше использовать DWH, когда нужен Data Lake и в каких случаях подходит Data Lakehouse.
Больше бесплатных открытых уроков июня — в дайджесте OTUS. Там собрали вебинары по разным IT‑направлениям: от инженерии данных и архитектуры до разработки, аналитики, инфраструктуры и управления