Утром заходим в дашборд выручки и видим нули по половине регионов. Пайплайн в Airflow отработал, тесты dbt test зелёные, в Sentry тишина. Через двадцать минут разборок выясняется простая вещь: бэкенд‑команда вчера выкатила релиз и поменяла значение order_status с awaiting_payment на pending_payment в API заказов. Схема таблицы не изменилась, тип поля тот же string, nullable тот же false. Изменился набор значений — и фильтр WHERE order_status = 'awaiting_payment' в витрине стал возвращать пустоту.

История, знакомая любой компании, где данные текут от продуктовых команд в общее хранилище. На один такой инцидент уходит полдня: найти причину, написать в Slack, согласовать фикс, обновить витрину, перезалить вчерашние данные. Через месяц повторяется на другой команде, через два — на третьей. К концу года дата‑инженеры тратят значимую долю времени на разбор «почему опять сломалось», а доверие бизнеса к аналитике падает быстрее, чем растут зарплаты.

Контракт данных — формальный способ договориться о структуре и поведении датасета между командой‑продюсером и командами‑консьюмерами. Разберём, как такой контракт устроен внутри, что в нём должно быть и как встроить его в существующий пайплайн без переписывания всей инфраструктуры.

Что такое контракт по факту

Контракт — это YAML или JSON‑файл, который лежит в репозитории команды‑продюсера рядом с кодом, генерирующим датасет. Формат свободный, но в индустрии понемногу устаканивается стандарт Open Data Contract Standard (ODCS) и формат от Data Contract CLI. Минимальный набор полей выглядит так:

dataContractSpecification: 1.1.0
id: orders.events.created
info:
  title: Order Created Events
  version: 2.1.0
  owner: team-checkout
  contact:
    email: checkout@example.com
    slack: "#checkout-team"

servers:
  production:
    type: kafka
    host: kafka.prod.internal:9092
    topic: orders.events.created.v2

terms:
  usage: |
    События создания заказа. Гарантируется доставка at-least-once.
    Дедупликация по полю event_id.
  noticePeriod: P3M
  
sla:
  availability: 99.5%
  retention: P90D
  freshnessMinutes: 5

models:
  order_created:
    type: object
    fields:
      event_id:
        type: string
        format: uuid
        required: true
        unique: true
      order_id:
        type: string
        required: true
        pattern: "^ORD-[0-9]{10}$"
      order_status:
        type: string
        required: true
        enum: [created, paid, cancelled, refunded]
      amount_minor_units:
        type: long
        required: true
        minimum: 0
      currency:
        type: string
        required: true
        enum: [RUB, USD, EUR]
      created_at:
        type: timestamp
        required: true

В этом куске зашита вся работа договорённости. Поле version: 2.1.0 — семантическая версия, по которой консьюмеры понимают, что менялось. noticePeriod: P3M означает, что продюсер обязан предупреждать о breaking changes за три месяца — это уже не техническая, а организационная договорённость, но она тоже часть контракта. enum для order_status— то самое место, через которое утром потёк дашборд из истории выше: если бэкенд хочет добавить pending_payment, ему нужно сначала обновить контракт, а это потребует ревью от команд‑консьюмеров.

Где контракт живёт и как обновляется

Контракт хранится в репозитории продюсера‑ там же, где код сервиса, который пишет эти данные. Не в отдельном репо с контрактами на всю компанию, не в Confluence, не в Notion. Рядом с кодом — потому что любое изменение схемы автоматически становится pull request'ом, который проходит через ревью и CI.

В CI запускаются проверки совместимости. Сравнивается новая версия контракта с предыдущей и определяется тип изменения:

  • Добавили опциональное поле — патч‑версия (2.1.0 → 2.1.1).

  • Добавили обязательное поле, но с дефолтом — минорная (2.1.0 → 2.2.0).

  • Удалили поле, поменяли тип, сузили enum — мажорная (2.1.0 → 3.0.0).

Мажорное изменение запускает отдельный воркфлоу: создаётся issue, в Slack уходит уведомление командам‑консьюмерам, начинается период депрекации старой версии. Старая версия не удаляется сразу, она живёт параллельно с новой минимум три месяца, чтобы консьюмеры успели мигрировать.

Для проверки совместимости удобно использовать datacontract-cli — open‑source CLI, который умеет сравнивать две версии и говорить, что именно изменилось:

$ datacontract diff orders.previous.yaml orders.current.yaml
Breaking changes:
  - models.order_created.fields.order_status: enum value 'awaiting_payment' removed
Non-breaking changes:
  - models.order_created.fields.discount_minor_units: field added (optional)

Этот же инструмент встраивается в GitHub Actions или GitLab CI и блокирует merge, если breaking changes есть, а версия не поднята мажорно.

Валидация на стороне продюсера

Контракт без проверки данных против него — мёртвая документация. Все её игнорируют, все её забывают обновлять, через полгода она расходится с реальностью. Чтобы такого не было, продюсер валидирует каждое событие в момент записи.

Для Kafka это удобно делать через Schema Registry — Avro или Protobuf схема генерируется из контракта, сериализатор отказывается отправлять сообщение, не соответствующее схеме. Для прямой записи в БД или для REST API подойдёт pydantic:

from pydantic import BaseModel, Field, field_validator
from typing import Literal
from datetime import datetime
from decimal import Decimal
from uuid import UUID
import re

class OrderCreated(BaseModel):
    event_id: UUID
    order_id: str
    order_status: Literal["created", "paid", "cancelled", "refunded"]
    amount_minor_units: int = Field(ge=0)
    currency: Literal["RUB", "USD", "EUR"]
    created_at: datetime
    
    @field_validator("order_id")
    @classmethod
    def validate_order_id_format(cls, v: str) -> str:
        if not re.match(r"^ORD-\d{10}$", v):
            raise ValueError(f"order_id must match ORD-NNNNNNNNNN, got {v}")
        return v


def publish_order_event(raw_event: dict) -> None:
    try:
        validated = OrderCreated.model_validate(raw_event)
    except ValidationError as e:
        metrics.increment("contract.validation.failed", tags={"dataset": "orders.events.created"})
        dead_letter_queue.send(raw_event, error=str(e))
        logger.error("Contract violation", extra={"event": raw_event, "error": str(e)})
        return
    
    kafka_producer.send("orders.events.created.v2", validated.model_dump_json())
    metrics.increment("contract.validation.passed", tags={"dataset": "orders.events.created"})

Если бэкенд‑разработчик случайно отправит событие с order_status="pending_payment", pydantic его отрежет ещё до Kafka. Событие уйдёт в dead letter queue, метрика дернёт алерт, команда увидит проблему до того, как данные доедут до аналитического хранилища. Ловить ошибку нужно как можно ближе к источнику, а не в витрине через сутки.

В стриминговом сценарии тот же подход реализуется через Confluent Schema Registry для Avro или Buf Schema Registry для Protobuf. Брокер проверяет схему при публикации, и невалидное сообщение просто не доходит до топика.

Валидация на стороне консьюмера

Дата‑инженер не доверяет продюсеру даже после внедрения контрактов. Бывает, что валидация на стороне продюсера отключена в стейдже, бывает, что данные приехали из легаси‑системы без валидации, бывает, что кто‑то залил исторические данные напрямую в Kafka мимо сервиса. Поэтому на входе в свой пайплайн консьюмер проверяет схему ещё раз.

В dbt это делается через тесты в schema.yml:

version: 2
models:
  - name: stg_orders
    config:
      contract:
        enforced: true
    columns:
      - name: order_id
        data_type: string
        tests:
          - not_null
          - unique
          - dbt_utils.expression_is_true:
              expression: "regexp_like(order_id, '^ORD-[0-9]{10}$')"
      - name: order_status
        data_type: string
        tests:
          - not_null
          - accepted_values:
              values: [created, paid, cancelled, refunded]
      - name: amount_minor_units
        data_type: bigint
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "amount_minor_units >= 0"

Опция contract: enforced: true в dbt включает проверку схемы материализованной модели против заявленной в YAML. Если в источнике появится новая колонка или поменяется тип, билд упадёт с понятной ошибкой, а не пройдёт молча, как было бы без контракта.

Для Spark и Flink подход тот же, только реализуется через Great Expectations или ручной assertSchema. Главное правило: упавшая проверка блокирует пайплайн, а не пишет warning в лог. Warning никто не читает, пайплайн с warning доезжает до витрин с битыми данными, и через две недели кто‑то спрашивает, почему отчёт показывает странное.

Что делать с breaking changes

Самая болезненная часть контрактов — это не их написание, а ситуация, когда продюсеру всё‑таки нужно сделать несовместимое изменение. Реальные сценарии: переход на новую платёжную систему с другим набором статусов, разделение поля address на структуру, удаление устаревшего поля.

Работающая практика — параллельное существование двух версий. Продюсер пишет в два топика одновременно: старый orders.events.created.v2 и новый orders.events.created.v3. Контракт v3 объявляется стабильным, контракт v2 помечается как deprecated с датой удаления через три месяца. В этот период консьюмеры мигрируют по своему графику. Через три месяца топик v2 отключается, его контракт переезжает в архив.

Для команд это означает дополнительную работу — поддерживать две версии записи. Но эта работа понятная и ограниченная по времени.

Что меняется в работе команд

Контракт — это не только техническая штука, это смена культуры. До контрактов продьюсер мог думать «я владею своим сервисом, делаю с ним что хочу». После — «у меня есть консьюмеры с SLA, они опираются на мои данные при принятии бизнес‑решений, и я несу за это ответственность».

В практическом плане появляется несколько вещей.

У каждого датасета есть владелец с почтой и каналом в Slack — раньше владельца искали по git blame. Изменения схемы обсуждаются на ревью с участием консьюмеров — раньше выкатывались тихо. У дата‑команды есть формальное основание блокировать релиз продуктовой команды, если он ломает контракт, — раньше приходилось договариваться по личным каналам.

При этом контракт не отменяет dbt‑тесты, не заменяет Great Expectations и не делает Schema Registry ненужным. Это слой выше — про договорённости между командами, а не про техническую валидацию. Хорошая дата‑платформа использует всё вместе: контракты для договорённостей, Schema Registry для типов и совместимости на бинарном уровне, dbt‑тесты для бизнес‑правил в витринах, Great Expectations для статистических проверок данных.

С чего начать внедрение

Внедрять контракты сразу по всей компании — плохая идея. Это месяцы планирования, сопротивление команд, политические разговоры на уровне CTO. Лучше работает другой подход: взять один критичный датасет, на котором уже горели, и сделать контракт для него.

Критерии выбора первого датасета: его данные попадают в финансовые или регуляторные отчёты, его ломали хотя бы раз за последние полгода, у него есть владелец, готовый сотрудничать. Дальше — написать YAML, договориться с продюсером о валидации на запись, добавить проверки в свой пайплайн, прогнать неделю и посмотреть, ловит ли валидация что‑то полезное.

Если за неделю валидация поймала хотя бы одно несоответствие — это уже окупает время на внедрение. Дальше масштабируется органически: команды видят пример, копируют структуру, через полгода в репозитории лежит десяток контрактов, через год — это рабочая практика.

Итого

Контракт данных — это YAML‑файл с описанием схемы, версией, SLA и владельцем, лежащий рядом с кодом продюсера. Он валидируется при записи на стороне продюсера через pydantic или Schema Registry, валидируется при чтении на стороне консьюмера через dbt‑тесты или Great Expectations, обновляется через pull request с автоматической проверкой совместимости и привязан к семантическому версионированию с правилами депрекации.

Технически контракт собирается за день для одного датасета. Организационно — становится рабочей практикой за полгода‑год, если есть поддержка со стороны команды дата‑платформы и хотя бы одного продуктового тимлида. Без поддержки контракты превращаются в очередной бюрократический слой и тихо умирают.

Хотите глубже разобраться в архитектуре данных, DWH, Data Lake, Data Lakehouse и пайплайнах? На курсе «Инженер данных» эти темы разбирают на практических задачах, с которыми дата‑инженеры сталкиваются в реальных проектах.

Перед стартом курса можно прийти на бесплатные открытые вебинары — это живые онлайн‑занятия с преподавателями, где можно познакомиться с форматом обучения, задать вопросы и разобрать практические кейсы.

  • 3 июня в 20:00 — «Децентрализованная революция в управлении данными: Data Mesh и его четыре принципа». Записаться
    Разберем, зачем компаниям Data Mesh, как он меняет подход к управлению данными и на каких принципах строится такая архитектура.

  • 17 июня в 20:00 — «DWH, Data Lake и Data Lakehouse: архитектурные различия и практическое применение». Записаться
    Сравним три подхода к хранению и обработке данных: где лучше использовать DWH, когда нужен Data Lake и в каких случаях подходит Data Lakehouse.

Больше бесплатных открытых уроков июня — в дайджесте OTUS. Там собрали вебинары по разным IT‑направлениям: от инженерии данных и архитектуры до разработки, аналитики, инфраструктуры и управления

Комментарии (0)