Продолжение статьи https://habr.com/ru/articles/1028290/

Твой ии-агент мог бы сыграть в сериале "Кибердеревня"
Твой ии-агент мог бы сыграть в сериале "Кибердеревня"

В прошлой серии

Мы поняли, что скелетом агента является долговременное состояние (durable state). Именно оно должно позволить ответить на скучные, но жизненно важные вопросы: какой ход активен, какой шаг уже выполнен, какой job держит lease, какой файл был исходным, какой результат можно выдать пользователю, какое подтверждение еще действительно.

В первой части мы разложили durable state на ход агента, шаг плана и событие. У нас появились такие сущности, как AgentTurn, AgentPlanItem, AgentEvent, и агент уже перестает быть нервным генератором текста, который живет ровно до первого рестарта процесса.

Но трех таблиц мало. Нужны еще разрешения, состояние диалоги/сессии, состояние проекта, фоновые задачи, механизм обработки фоновых задач (lease), счетчик и политика повторов, закладка событий (event cursor) и санитарная обработка payload-ов (payload sanitizer).

Что добавляется после AgentTurn, AgentPlanItem и AgentEvent

Минимальный набор первой части можно расширить так:

Сущность

Что хранит

Зачем нужна

ApprovalGrant

Выданное пользователем разрешение

Не спрашивать повторно одно и то же действие в рамках допустимого scope

SessionContext

Активный turn, профиль агента, краткую историю, pending approval

Восстановить диалог и текущую сцену сессии

ProjectContext

Активный проект, файлы, настройки, текущую операцию

Не дать двум тяжелым операциям одновременно менять один проект

BackgroundJob

Длинную операцию вне HTTP-запроса

Например, парсинг, workbook-операции, retry, progress, cancellation

WorkerHeartbeat

Присутствие и занятость исполнителя

Отличить долгую работу от умершего worker-а

Durable payload policy

Правила сохранения payload-ов

Не складывать base64, секреты и гигантские строки в event log

ApprovalGrant: подтверждение тоже должно быть durable

Approval - это юридически важная запись о том, что пользователь разрешил действие с конкретным scope. Если подтверждение живет только в памяти процесса, то после рестарта агент снова спросит то же самое или, что хуже, решит продолжить без понятного основания.

Разрешения привязаны к session_id, project_id, tool_name, mode, scope и expires_at. Это правильная форма: подтверждение не становится вечным. Пользователь мог разрешить править файлы на этом проекте, но это не значит, что агент получил право трогать все проекты, все файлы и все будущие операции.

Хороший approval grant должен быть узким. В идеале scope описывает не человеческую фразу «можно», а машинно проверяемые границы: project_id, tool_name, режим only_missing, срок действия. Тогда executor может принять решение без повторного похода к LLM.

Класс AprrovalGrant, она же таблица для хранения разрешений (прав доступа) на выполнение определенных действий или использование инструментов в какой-то системе, может выглядеть следующим образом

class ApprovalGrant(Base):
    __tablename__ = "approval_grants"
 
    grant_id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    session_id: Mapped[str] = mapped_column(String(200), index=True)
    project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
    tool_name: Mapped[str] = mapped_column(String(120), index=True)
    mode: Mapped[str] = mapped_column(String(40), index=True)
    scope: Mapped[dict] = mapped_column(JSON, default=dict)
    reason: Mapped[str | None] = mapped_column(Text, nullable=True)
    expires_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

Разбор полей (колонок):

  • grant_id: Уникальный идентификатор каждой записи (UUID). Генерируется автоматически.

  • session_id: ID сессии пользователя. Позволяет понять, в рамках какого сеанса выдано разрешение.

  • project_id: Связь с конкретным проектом (может быть пустым).

  • tool_name: Название инструмента, к которому запрашивается доступ (например, "база_данных", "отправка_email").

  • mode: Режим доступа (например, "чтение", "запись" или "админ").

  • scope: Дополнительные параметры в формате JSON. Позволяет хранить сложные настройки доступа в виде словаря.

  • reason: Текстовое описание того, зачем это разрешение было выдано.

  • expires_at: Срок годности разрешения. Если время вышло, доступ аннулируется.

  • created_at: Время создания записи (автоматически ставится текущее время UTC).

SessionContext: агент должен помнить, где он находится

Пока у Гэндальфа не было Session Context, он был серым
Пока у Гэндальфа не было Session Context, он был серым

SessionContext — это durable-состояние диалога. Не transcript, не полный лог сообщений и не “вся память агента”, а компактная техническая карточка текущей сессии.

Если AgentTurn отвечает на вопрос “какой запрос сейчас выполняется”, то SessionContext отвечает на вопрос “в какой сцене находится пользователь и агент”.

Например:

  • какой turn_id сейчас активен;

  • есть ли незавершенное подтверждение;

  • какой проект открыт;

  • какой профиль агента выбран;

  • какой краткий summary уже построен;

  • с какого события UI нужно продолжить чтение после reconnect;

  • какие операции сейчас нельзя запускать параллельно.

То есть SessionContext нужен не для философской “памяти”, а для скучной инженерной магии: закрыли вкладку, обновили страницу, перезапустили backend, worker умер, пользователь вернулся через час — и система все еще понимает, что происходит.

Примерная структура:

class SessionContext(Base):
    __tablename__ = "session_contexts"

    session_id: Mapped[str] = mapped_column(String(200), primary_key=True)
    user_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
    project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    active_turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
    active_job_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    status: Mapped[str] = mapped_column(String(40), default="idle", index=True)
    agent_profile: Mapped[str] = mapped_column(String(80), default="default")

    summary: Mapped[str | None] = mapped_column(Text, nullable=True)

    pending_approval: Mapped[dict | None] = mapped_column(JSON, nullable=True)

    event_cursor: Mapped[int] = mapped_column(default=0)
    context_version: Mapped[int] = mapped_column(default=1)

    last_user_message_at: Mapped[datetime | None] = mapped_column(nullable=True)
    last_agent_event_at: Mapped[datetime | None] = mapped_column(nullable=True)

    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

Поле

Что хранит

Зачем нужно

session_id

ID сессии

Главный ключ состояния диалога

user_id

Пользователь

Изоляция сессий и прав

active_project_id

Текущий проект

Понимать рабочий контекст

active_turn_id

Текущий ход агента

Восстановить незавершенный turn

agent_profile

Режим/персона/настройки агента

Например, “код-ревьюер”, “переводчик”, “аналитик”

summary

Сжатая история

Не тащить весь transcript в каждый prompt

pending_approval

Ожидаемое подтверждение

Не потерять confirm после рестарта

event_cursor

Последнее доставленное событие

Догнать UI после reconnect

status

active, waiting_user, running, idle

Быстро понять состояние сессии

updated_at

Время обновления

Отладка, TTL, чистка старых сессий

Важно: SessionContext не должен превращаться в помойку. В него не надо складывать весь prompt, все ответы модели, base64 файлов и простыню traceback-ов. Для этого есть AgentEvent, файловое хранилище, blob storage и отдельные job-таблицы.

Хороший SessionContext маленький, скучный и восстанавливаемый.

ProjectContext: агент не должен пилить один проект двумя руками одновременно

Еще одна сущность, которая быстро становится необходимой, — ProjectContext.

Сессия отвечает за диалог. Проект отвечает за рабочую область.

Пользователь может открыть один проект в нескольких вкладках,, потом еще что-то сделать, потом нажать “повторить”. Если система не хранит durable-состояние проекта, два job-а могут одновременно начать менять одни и те же файлы.

И получится не AI-agent, а кибердеревенский комбайн, который одной рукой чинит забор, второй рукой уже его сносит.

сlass ProjectContext(Base):
    __tablename__ = "project_contexts"

    project_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    owner_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    active_operation_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    operation_lock: Mapped[dict | None] = mapped_column(JSON, nullable=True)

    latest_output_file_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    settings: Mapped[dict | None] = mapped_column(JSON, nullable=True)

    status: Mapped[str] = mapped_column(String(40), default="idle", index=True)

    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

ProjectContext хранит:

Поле

Что хранит

project_id

ID проекта

owner_id

Владелец

active_operation_id

Текущая тяжелая операция

operation_lock

Мягкая блокировка проекта

latest_output_file_id

Последний результат

settings

Настройки проекта

status

idle, processing, needs_review, failed

updated_at

Последнее изменение

Это не обязательно жесткий database lock. Чаще достаточно прикладной блокировки: “в этом проекте уже идет операция типа workbook_write, вторую такую же не запускаем”.

Например, можно разрешить читать файл и строить preview, но запретить одновременно две операции, которые пишут результат в один и тот же output slot.

BackgroundJob: долгая работа не должна жить внутри HTTP-запроса

Твой агент запустил бэкграунд джоб
Твой агент запустил бэкграунд джоб

Если операция может занять больше пары секунд, она должна стать job-ом.

HTTP-запрос может принять задачу, проверить права, создать AgentTurn, положить BackgroundJob в очередь и вернуть пользователю состояние: “задача принята”. А дальше работает worker.

Класс джоб будет таким

class Job(Base): tablename = “jobs”

class Job(Base):
    __tablename__ = "operation_jobs"

    job_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)

    turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)
    project_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    type: Mapped[str] = mapped_column(String(80), index=True)
    status: Mapped[str] = mapped_column(String(40), default="queued", index=True)

    attempt: Mapped[int] = mapped_column(default=0)
    max_attempts: Mapped[int] = mapped_column(default=3)

    next_attempt_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True)

    lease_owner: Mapped[str | None] = mapped_column(String(200), nullable=True, index=True)
    lease_expires_at: Mapped[datetime | None] = mapped_column(nullable=True, index=True)

    progress_seq: Mapped[int] = mapped_column(default=0)

    input: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    output: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    error: Mapped[dict | None] = mapped_column(JSON, nullable=True)

    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

Поле

Что хранит

job_id

ID задачи

turn_id

К какому turn относится

project_id

В каком проекте выполняется

type

translate, parse_workbook, render, export

status

queued, running, completed, failed, cancelled

attempt

Номер попытки

max_attempts

Лимит повторов

next_attempt_at

Когда можно retry

lease_owner

Какой worker забрал задачу

lease_expires_at

Когда lease протухает

progress_seq

Монотонный номер progress-события

input

Санитизированный input

output

Ссылка на результат

error

Классифицированная ошибка

Ключевой момент — worker не просто берет задачу. Он атомарно claim-ит ее:

UPDATE background_jobs SET status = 'running', lease_owner = :worker_id, lease_expires_at = :now + interval '2 minutes' WHERE job_id = :job_id AND status = 'queued' AND (next_attempt_at IS NULL OR next_attempt_at <= :now);

Если обновилась одна строка — worker владеет задачей. Если ноль строк — кто-то уже забрал.

Lease нужен, потому что worker может умереть. Не “вернуть ошибку”, не “аккуратно завершиться”, а просто исчезнуть. После истечения lease_expires_at другой worker может подобрать задачу и продолжить или перезапустить ее с учетом idempotency.

EventCursor: UI должен догонять события, а не молиться на websocket

Live stream — это приятно, но websocket не является durable state.

Пользователь закрыл ноутбук, сеть моргнула, вкладка перезагрузилась. Если события жили только в памяти процесса, прогресс потерян. Поэтому UI должен читать события из AgentEvent по cursor-у.

Условный сценарий:

  1. UI подписался на события turn-а.

  2. Получил события до event_seq = 42.

  3. Соединение оборвалось.

  4. UI reconnect-ится и говорит: “дай события после 42”.

  5. Backend читает durable event log и отдает 43, 44, 45....

Так интерфейс перестает зависеть от идеальной сети.

class EventCursor(Base):
    __tablename__ = "event_cursors"

    cursor_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)

    session_id: Mapped[str] = mapped_column(String(200), index=True)
    turn_id: Mapped[uuid.UUID | None] = mapped_column(nullable=True, index=True)

    consumer_id: Mapped[str] = mapped_column(String(200), index=True)

    last_event_seq: Mapped[int] = mapped_column(BigInteger, default=0)

    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    __table_args__ = (
        UniqueConstraint(
            "session_id",
            "turn_id",
            "consumer_id",
            name="uq_event_cursor_consumer",
        ),
    )

event_seq лучше делать монотонным внутри turn_id или session_id. Не надо использовать только timestamp: у двух событий может быть одинаковое время, а порядок все равно важен.

Управление агентом с ходами и состояниями чем-то напоминает пошаговую стратегию
Управление агентом с ходами и состояниями чем-то напоминает пошаговую стратегию

Durable payload policy: event log не мусорный бак

Отдельно стоит прописать политику payload-ов.

Почти в каждом агенте рано или поздно появляется соблазн: “давайте просто положим весь JSON в event payload”. Через месяц в event log лежат base64-файлы, токены доступа, гигантские HTML-страницы, персональные данные и ответы модели на 300 килобайт.

Правило простое: durable event должен хранить факт, ссылку и короткий summary, а не весь мир.

Плохо:

{ "event_type": "file_processed", "payload": { "file_base64": "UEsDBBQAAAA...", "openai_api_key": "sk-...", "full_html": "<html>..." } }

Хорошо:

{ "event_type": "file_processed", "payload": { "file_id": "file_123", "mime_type": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "rows_count": 1842, "result_file_id": "file_456" } }

Payload sanitizer должен вырезать:

  • секреты;

  • base64 и бинарные данные;

  • слишком длинные строки;

  • полные prompt-ы без необходимости;

  • персональные данные, если они не нужны для восстановления процесса.

Идеально, если sanitizer применяется централизованно перед записью AgentEvent, а не “по договоренности между разработчиками”. Договоренность живет до первого пятничного hotfix-а.

Все это ложится в один централизованный слой перед добавлением AgentEvent

import json
import re
from typing import Any


SECRET_KEY_RE = re.compile(
    r"(api[_-]?key|token|secret|password|authorization|cookie|access[_-]?token|refresh[_-]?token)",
    re.IGNORECASE,
)

BASE64_RE = re.compile(r"^[A-Za-z0-9+/]+={0,2}$")


class EventPayloadSanitizer:
    max_depth = 8
    max_string_length = 2_000
    max_list_items = 100
    max_payload_bytes = 32_000

    def sanitize(self, payload: dict[str, Any] | None) -> dict[str, Any] | None:
        if payload is None:
            return None

        sanitized = self._sanitize_value(payload, depth=0)

        encoded = json.dumps(sanitized, ensure_ascii=False, default=str)
        if len(encoded.encode("utf-8")) > self.max_payload_bytes:
            return {
                "summary": "payload_too_large",
                "original_size_bytes": len(encoded.encode("utf-8")),
            }

        return sanitized

    def _sanitize_value(self, value: Any, depth: int) -> Any:
        if depth > self.max_depth:
            return "[redacted:max_depth]"

        if isinstance(value, dict):
            result = {}

            for key, item in value.items():
                key_str = str(key)

                if SECRET_KEY_RE.search(key_str):
                    result[key_str] = "[redacted:secret]"
                    continue

                result[key_str] = self._sanitize_value(item, depth + 1)

            return result

        if isinstance(value, list):
            items = value[: self.max_list_items]
            result = [self._sanitize_value(item, depth + 1) for item in items]

            if len(value) > self.max_list_items:
                result.append(f"[truncated:{len(value) - self.max_list_items}_items]")

            return result

        if isinstance(value, str):
            return self._sanitize_string(value)

        return value

    def _sanitize_string(self, value: str) -> str:
        if self._looks_like_base64(value):
            return "[redacted:base64]"

        if len(value) > self.max_string_length:
            return value[: self.max_string_length] + f"...[truncated:{len(value)} chars]"

        return value

    def _looks_like_base64(self, value: str) -> bool:
        compact = value.strip()

        if len(compact) < 256:
            return False

        if len(compact) % 4 != 0:
            return False

        return bool(BASE64_RE.fullmatch(compact))

Как выглядит путь одного запроса

Соберем все вместе на сценарии: пользователь просит, например, обработать файл и применить проектные настройки.

  • API принимает запрос и нормализует session_id. Если есть client_turn_id или idempotency_key, система проверяет, не запускали ли такой turn раньше.

  • AgentService создает AgentTurn и строит TurnPlan. Шаги плана попадают в AgentPlanItem.

  • ApprovalService оценивает риск. Если нужен confirm, в SessionContext сохраняется pending_approval, а в agent_events появляется approval_requested.

  • Пользователь подтверждает. Система создает ApprovalGrant с ограниченным scope и снимает pending_approval.

  • Executor запускает шаг. Для долгой операции создается TranslateJob или WorkbookJob, а turn получает события tool_started и job_queued.

  • Worker атомарно claim-ит job, выставляет lease_owner и lease_expires_at, обновляет progress_seq и публикует progress.

  • UI читает live stream. Если вкладка закрылась, после reconnect он догоняет события из БД.

  • При успехе job получает completed и output_file_id, проект обновляет last_translated_file_id, turn получает финальное событие.

  • При retryable-ошибке job возвращается в queued с next_attempt_at. При terminal-ошибке сохраняются error и классификация отказа.

В коде контракт между слоями можно выразить вот так

class AgentRequestHandler:
    def handle(self, request: AgentRequest) -> AgentTurn:
        session_id = normalize_session_id(request.session_id)

        with self.db.transaction():
            existing_turn = self.turns.find_by_idempotency_key(
                session_id=session_id,
                idempotency_key=request.idempotency_key or request.client_turn_id,
            )
            if existing_turn is not None:
                return existing_turn

            turn = self.agent_service.create_turn(
                session_id=session_id,
                project_id=request.project_id,
                user_input=request.input,
                idempotency_key=request.idempotency_key,
                client_turn_id=request.client_turn_id,
            )

            plan = self.agent_service.build_plan(turn)
            self.agent_service.save_plan_items(turn, plan)

            approval = self.approval_service.assess(turn, plan)

            if approval.required:
                self.sessions.set_pending_approval(
                    session_id=session_id,
                    approval=approval.to_pending_payload(),
                )
                self.events.write(
                    turn_id=turn.turn_id,
                    session_id=session_id,
                    type="approval_requested",
                    payload=approval.to_event_payload(),
                )
                return turn

            self.executor.enqueue_ready_steps(turn, plan)

            return turn

И важная часть дял воркера:

class JobWorker:
    def run_once(self) -> None:
        job = self.jobs.claim_next(
            lease_owner=self.worker_id,
            lease_seconds=60,
        )

        if job is None:
            return

        try:
            self.events.write(
                turn_id=job.turn_id,
                job_id=job.job_id,
                project_id=job.project_id,
                type="job_started",
                payload={"job_id": str(job.job_id), "type": job.type},
            )

            output = self.execute(job)

            self.jobs.complete(job.job_id, output=output)

            self.events.write(
                turn_id=job.turn_id,
                job_id=job.job_id,
                project_id=job.project_id,
                type="job_completed",
                payload=output,
            )

        except RetryableJobError as exc:
            self.jobs.schedule_retry(job.job_id, error=exc.to_payload())

        except TerminalJobError as exc:
            self.jobs.fail(job.job_id, error=exc.to_payload())

Телеграм канал автора, где он что‑то пишет про ML, NLP и разработку

Комментарии (0)