Введение
В этой статье я наглядно покажу, как организовать нагрузочное тестирование с использованием Python и фреймворка Locust, опираясь на инженерные практики и удобную архитектуру. Цель статьи — дать вам готовый шаблон, с которым вы сможете начать писать свои нагрузочные тесты уже завтра в рамках реального проекта.
Нагрузочное тестирование становится всё более востребованным. Если раньше оно ассоциировалось в основном с тяжеловесным интерфейсом Apache JMeter, то сегодня в арсенале инженеров появились гораздо более удобные и гибкие инструменты: Locust, k6, Gatling, Artillery и другие. Они позволяют за несколько часов поднять полноценные тесты и встроить их в CI/CD.
В этой статье мы:
посмотрим, как реализовать полноценный нагрузочный фреймворк на Python и Locust;
разберём структуру сценариев, а также архитектуру
TaskSet
иUser
-классов;научимся централизованно задавать конфигурацию через
.conf
-файлы;обсудим, что такое сидинг, зачем он нужен и как он влияет на метрики;
кратко пройдёмся по видам тестирования производительности: нагрузочное, стресс, пиковое, тестирование стабильности и др.;
и, наконец, запустим тесты в CI/CD через GitHub Actions, а HTML-отчёт опубликуем на GitHub Pages.
В качестве тестируемого сервиса мы будем использовать FakeBank API — учебный REST API, позволяющий работать с фейковыми банковскими операциями. Все запросы будут отправляться на: https://api.sampleapis.com/fakebank/accounts.
Важно! Это тестовый API с ограничением по количеству запросов, поэтому мы сознательно не будем создавать избыточную нагрузку. Задача статьи — не "завалить" сервис, а показать, как правильно организовать инфраструктуру нагрузочного тестирования.
Вы получите не просто рабочий пример, а цельный фреймворк, с которым можно работать, масштабировать, адаптировать под свои сценарии и запускать из CI/CD. По ходу мы разберём множество архитектурных и практических деталей — от построения сценариев до публикации результатов.
Тестирование производительности: что это и зачем?
Прежде чем перейти к практике, давайте разберёмся в базовых терминах. Очень часто тестирование производительности путают с «нагрузкой ради нагрузки»: запускают 1000 пользователей на один endpoint и с гордостью называют это "нагрузочным тестом". Такой подход редко даёт реальную пользу — чаще всего он ведёт к искаженному восприятию метрик и неправильным выводам.
Что такое тестирование производительности?
Тестирование производительности — это систематическая проверка, как приложение ведёт себя под разной нагрузкой, с целью выявить узкие места, понять поведение системы и спрогнозировать её устойчивость.
Оно включает в себя несколько подвидов:
Нагрузочное тестирование (Load Testing) — проверка, как система работает при ожидаемой (или слегка повышенной) нагрузке.
Стресс-тестирование (Stress Testing) — проверка пределов системы: при каком уровне нагрузки она начинает деградировать.
Тестирование стабильности (Stability / Endurance / Soak Testing) — запуск системы под умеренной нагрузкой в течение длительного времени (например, ночь или сутки), чтобы выявить утечки памяти и деградацию.
Тестирование масштабируемости (Scalability Testing) — анализ, как производительность меняется при увеличении ресурсов (CPU, количество реплик и т.д.).
В рамках этой статьи мы сфокусируемся на классическом нагрузочном тестировании, когда мы хотим понять:
Как быстро отвечает сервис под определённым количеством одновременных пользователей?
Насколько стабильно ведёт себя API при увеличении RPS?
Где потенциальные точки отказа?
Что именно мы тестируем?
Важно понимать: нагрузочное тестирование — это не гонка за цифрами. Оно не сводится к запуску 10 000 пользователей на один endpoint вроде /ping
и последующей гордости за полученные 150 RPS. Такой подход может красиво смотреться на графике, но не говорит ровным счётом ничего о реальной устойчивости системы.
Наша цель — смоделировать поведение живых пользователей. Не synthetic load, а то, как пользователи реально двигаются по системе, с какими данными работают, в каком порядке совершают действия. Иначе мы просто тестируем абстрактный HTTP-сервер, а не продукт.
Что это значит на практике? Вместо "битья" по одному endpoint, мы собираем конкретные пользовательские флоу — например:
пользователь заходит в систему, авторизуется, открывает список счетов и делает перевод;
другой пользователь получает уведомление, проверяет историю операций, загружает документ;
администратор просматривает отчёты и выгружает данные за квартал.
Такие цепочки — это сценарии использования (use cases). Именно они нагружают систему как единое целое: затрагивают авторизацию, базу данных, кэш, бизнес-логику, генерацию PDF, внешние сервисы.
Именно такие сценарии:
помогают выявить реальные узкие места: медленные JOIN'ы, перегретые очереди, ошибки сериализации;
дают ценную информацию бизнесу: где пользователь может столкнуться с фрустрацией;
позволяют заранее увидеть деградацию при росте аудитории — ещё до того, как это произойдёт в продакшене.
Даже если у вас мало времени — лучше один осмысленный сценарий, чем тысяча безликих запросов. Хорошо составленный сценарий — это основа эффективного нагрузочного теста.
Какие метрики важны?
Чтобы нагрузочное тестирование приносило пользу, важно понимать, что именно нужно измерять. В любом тесте под нагрузкой есть два слоя наблюдения: клиентский и инфраструктурный.
На клиентской стороне — то, что мы видим из Locust: как быстро API отвечает, сколько запросов проходит в секунду, сколько из них завершаются с ошибками. Один из ключевых показателей здесь — время отклика (latency). Если, к примеру, пользователь видит, что страница с операциями загружается не за 300 мс, а за 3 секунды, это становится проблемой — даже если сервер формально не падает.
Другой важный параметр — RPS (requests per second). Он показывает, сколько запросов система реально обрабатывает при заданной нагрузке. Если при 100 пользователях мы получаем только 20 RPS, это сигнал задуматься: не слишком ли много времени уходит на внутренние операции?
Кроме того, нужно следить за уровнем ошибок. Допустим, при 500 RPS появляются ответы 5xx или 429 — значит, сервис уже не справляется, и нужно искать бутылочное горлышко.
Не менее важно понимать дисперсию отклика. Бывает, что среднее время — 300 мс, но 90-й процентиль — 2 секунды. Это означает, что часть пользователей стабильно получает очень медленный отклик. Такие перекосы крайне опасны в пользовательских системах: один клиент счастлив, а другой негодует.
И наконец, полезно отслеживать успешность сценариев: как часто они доходят до конца, где рвутся, какие шаги «проваливаются» — всё это помогает понять устойчивость не отдельных endpoint'ов, а именно бизнес-флоу.
С другой стороны, есть инфраструктурные метрики — они дают понимание того, что происходит "внутри" системы. Это, прежде всего, нагрузка на CPU и потребление оперативной памяти. Если при росте RPS CPU сразу уходит в 100%, значит, дальше масштабироваться не получится без оптимизации кода или увеличения ресурсов.
Также важны количество открытых соединений, активность диска (IO), состояние баз данных и кэшей. Например, при пиковых значениях может «задохнуться» Redis, очередь сообщений или файловое хранилище. Бывает, что кэш забит, очередь растёт — а ошибки на клиенте появляются через 2 минуты, когда всё уже упало.
Такие метрики обычно собираются через внешние системы мониторинга: Grafana, Prometheus, Datadog, NewRelic. Они позволяют строить дашборды, отслеживать тренды и мгновенно реагировать на отклонения.
А как с этим быть у нас?
В рамках этой статьи мы работаем с учебным API, который для нас — по сути, black-box. Мы не можем влезть внутрь и посмотреть системные метрики, так как это не наш сервис.
Поэтому в данной статье мы ограничимся клиентскими метриками — теми, которые нам даст HTML-отчёт Locust.
Цель этой статьи — не полноценный анализ архитектуры, а техническая сторона написания хорошо организованных нагрузочных тестов: как структурировать код, как задавать сценарии, как подключать сидинг, как запускать в CI/CD и т.д.
Если у вас есть доступ к системным метрикам (через Grafana, Prometheus, Datadog, NewRelic и т.д.), обязательно подключайте их в связке с Locust. Только так можно получить полную картину производительности.
Технологии
Вот стек инструментов, которые мы будем использовать:
Python 3.12 — для написания нагрузочных тестов
Locust — инструмент симуляции нагрузки
HTTPX — для отправки запросов к REST API
Pydantic — для сериализации, десериализации и валидации данных
Pydantic Settings — для удобной работы с конфигурацией проекта
Faker — для генерации случайных данных
Почему именно Locust?
В экосистеме нагрузочного тестирования существует множество инструментов: JMeter, k6, Gatling, Artillery. У каждого — свои сильные стороны. Но если вы работаете с Python, Locust — безусловный лидер. Вот почему:
Нативный Python. Сценарии пишутся на чистом Python, без XML, YAML или UI-конфигов. Это позволяет использовать привычные конструкции языка, логировать, отлаживать, импортировать свои модули и писать тесты с максимальной гибкостью.
Хорошая документация и активное сообщество. Locust активно развивается, регулярно обновляется и имеет большое сообщество разработчиков. На GitHub и StackOverflow легко найти ответы на вопросы, примеры расширений и best practices.
Гибкость и кастомизация. Locust позволяет точно настраивать поведение пользователей: задавать веса задач, использовать SequentialTaskSet, комбинировать сценарии, задавать события на старте и завершении, переопределять поведение на любом уровне.
Поддержка хуков, событий и расширений. Можно использовать event hooks для логирования, метрик, интеграции с внешними системами. Это особенно полезно в связке с CI/CD или системами мониторинга.
Кастомные API-клиенты на базе HTTPX
Хотя в Locust есть встроенный HTTP-клиент, мы сознательно отказываемся от него в пользу собственного решения. Причина проста: нам нужен универсальный API-клиент, который будет использоваться не только внутри сценариев Locust, но и при генерации тестовых данных (сидинге), а также — при необходимости — в интеграционных автотестах. Это позволяет не дублировать логику и работать с API единообразно в разных контекстах.
Встроенный клиент Locust хорош для простых примеров, но он не даёт необходимой гибкости. В нём нет строгой типизации, он жёстко завязан на структуру Locust, его сложно конфигурировать централизованно и практически невозможно удобно переиспользовать вне контекста нагрузочного теста. Поэтому мы создадим собственный HTTP-клиент, основанный на связке HTTPX и Pydantic.
Наш кастомный клиент будет изолированным и модульным. Он будет валидировать входящие и исходящие данные с помощью моделей Pydantic, централизованно управляться через конфигурацию и при необходимости — логировать запросы и ответы. Такой подход позволяет отделить инфраструктурную часть (нагрузку, сценарии, конфигурацию) от логики общения с API.
В результате у нас получится чистая и масштабируемая архитектура, в которой тестовые сценарии остаются простыми и читаемыми, а вся логика взаимодействия с API сосредоточена в одном, легко поддерживаемом месте.
Почему HTTPX, а не Requests?
Библиотека Requests хоть и популярна, но уже давно перестала активно развиваться. У неё до сих пор нет встроенной аннотации типов, хотя эта возможность появилась в Python ещё в версии 3.5 (а на момент написания статьи скоро выйдет уже 3.14). Прошло более 10 лет, но в Requests так и не добавили полноценную поддержку аннотаций типов, что говорит о слабой эволюции библиотеки.
Requests в своё время завоевала популярность благодаря простоте и отсутствию достойных альтернатив. Но сегодня есть более современное, мощное и удобное решение — HTTPX.
Что даёт HTTPX по сравнению с Requests:
Встроенные аннотации типов
Удобный объект Client для повторного использования соединений
Event hooks (хуки для обработки событий)
Полноценная поддержка асинхронных запросов
Поддержка HTTP/2
Современная и понятная документация
Множество других полезных возможностей
Всё это делает HTTPX более гибким, производительным и удобным инструментом. Requests, скорее всего, так и останется на уровне синхронных запросов без поддержки современных возможностей.
Если вы не пишете legacy-код и создаёте новый проект нагрузочных тестов на Python, то HTTPX — очевидный выбор.
Почему Pydantic?
Тут всё просто: Pydantic — это стандарт де-факто для работы с данными в Python.
Он позволяет:
Валидировать данные на основе аннотаций типов
Удобно сериализовать/десериализовать JSON
Гарантировать строгую типизацию на уровне данных
Работать с конфигурацией проекта через Pydantic Settings
У Pydantic почти нет достойных альтернатив. Стандартные dataclasses в Python даже близко не дают такой же функциональности, особенно валидации и строгой типизации данных.
Если вам нужны чистые, предсказуемые и надёжные данные — Pydantic обязателен в любом современном проекте.
Модели для описания структур данных
Прежде чем работать с API https://api.sampleapis.com/fakebank/accounts, необходимо определить структуру данных, которые мы будем отправлять и получать.
Для этого используем Pydantic, так как он:
Позволяет автоматически валидировать данные
Поддерживает сериализацию и десериализацию JSON
Позволяет использовать встроенные валидаторы, такие как HttpUrl и EmailStr, для проверки корректности данных
Обеспечивает удобную типизацию
Мы определим:
CreateOperationSchema
– модель для создания новой операцииUpdateOperationSchema
– модель для обновления данных (используется для PATCH запросов)OperationSchema
– расширенная модель с id, представляющая конечную структуру операцииOperationsSchema
– контейнер для списка операций
{
"id": 25,
"debit": 6.99,
"credit": null,
"category": "Merchandise",
"description": "Benderbräu",
"transactionDate": "2016-02-25"
}
from datetime import date
from pydantic import BaseModel, Field, RootModel, ConfigDict
class CreateOperationSchema(BaseModel):
"""
Модель для создания новой банковской операции.
Поля:
- debit (float | None): Сумма списания со счёта
- credit (float | None): Сумма зачисления на счёт
- category (str): Категория операции
- description (str): Описание операции
- transaction_date (date): Дата транзакции (передаётся в формате "transactionDate")
"""
model_config = ConfigDict(populate_by_name=True) # Позволяет использовать alias при сериализации/десериализации
debit: float | None
credit: float | None
category: str
description: str
transaction_date: date = Field(alias="transactionDate") # Указываем alias для соответствия API
class UpdateOperationSchema(BaseModel):
"""
Модель для обновления банковской операции (используется в PATCH запросах).
Все поля являются необязательными, так как можно обновлять только часть данных.
Поля:
- debit (float | None): Новая сумма списания
- credit (float | None): Новая сумма зачисления
- category (str | None): Новая категория
- description (str | None): Новое описание
- transaction_date (date | None): Новая дата транзакции (alias "transactionDate")
"""
model_config = ConfigDict(populate_by_name=True)
debit: float | None
credit: float | None
category: str | None
description: str | None
transaction_date: date | None = Field(alias="transactionDate")
class OperationSchema(CreateOperationSchema):
"""
Модель банковской операции, содержащая ID.
Наследуется от CreateOperationSchema и добавляет поле:
- id (int): Уникальный идентификатор операции
"""
id: int
class OperationsSchema(RootModel):
"""
Контейнер для списка операций.
Поле:
- root (list[OperationSchema]): Список операций
"""
root: list[OperationSchema]
CreateOperationSchema
– используется при создании новой операции. Включает поля для суммы списания (debit
), суммы зачисления (credit
), категории (category
), описания (description
) и даты (transaction_date
).UpdateOperationSchema
– предназначена для PATCH-запросов, где можно передавать только изменяемые поля. Все параметры здесь опциональны, так как частичное обновление не требует передачи всех данных.OperationSchema
– расширенная версияCreateOperationSchema
, добавляет полеid
, которое присваивается сервером. Используется для представления операции в ответах API.OperationsSchema
– список операций. Наследуется от RootModel, что позволяет работать с массивами объектов в Pydantic.
Почему Pydantic, а не TypedDict или dataclasses?
Pydantic – это лучшее решение для работы с API-данными.
TypedDict и NamedTuple – подходят больше для аннотаций типов, но не для валидации данных и сериализации.
dataclasses – могут быть альтернативой, но у них нет встроенной валидации и сериализации JSON. Они работают хорошо в локальных моделях, но для API Pydantic – гораздо удобнее.
Pydantic позволяет автоматически проверять данные, использовать алиасы для полей и работать с JSON без дополнительных преобразований.
Генерация фейковых данных
При отправке запросов к API нам нужно создавать множество случайных данных для разных сценариев. Чтобы автоматизировать этот процесс и избавиться от ручного заполнения, используем библиотеку Faker и реализуем класс Fake
. Класс Fake
будет предоставлять удобный интерфейс для генерации нужных значений.
from datetime import date, timedelta
from faker import Faker
class Fake:
"""
Класс-обертка над Faker, предоставляющий удобные методы генерации фейковых данных
для банковских операций.
"""
def __init__(self, faker: Faker):
"""
Инициализирует объект Fake с экземпляром Faker.
:param faker: Экземпляр Faker для генерации случайных данных.
"""
self.faker = faker
def date(self, start: timedelta = timedelta(days=-30), end: timedelta = timedelta()) -> date:
"""
Генерирует случайную дату в заданном диапазоне.
:param start: Начальный диапазон (по умолчанию -30 дней от текущей даты).
:param end: Конечный диапазон (по умолчанию сегодняшняя дата).
:return: Случайная дата в заданном диапазоне.
"""
return self.faker.date_between(start_date=start, end_date=end)
def money(self, start: float = -100, end: float = 100) -> float:
"""
Генерирует случайную сумму денег.
:param start: Минимальное значение (по умолчанию -100).
:param end: Максимальное значение (по умолчанию 100).
:return: Случайное число с плавающей запятой в заданном диапазоне.
"""
return self.faker.pyfloat(min_value=start, max_value=end)
def category(self) -> str:
"""
Генерирует случайную категорию расходов.
:return: Одна из предопределенных категорий ('food', 'taxi', 'fuel' и т.д.).
"""
return self.faker.random_element(['food', 'taxi', 'fuel', 'beauty', 'restaurants'])
def sentence(self) -> str:
"""
Генерирует случайное описание операции.
:return: Строка с описанием.
"""
return self.faker.sentence()
# Создаем глобальный экземпляр `fake`, который будем использовать в других модулях.
fake = Fake(faker=Faker())
Класс Fake
инкапсулирует логику библиотеки Faker и предоставляет удобный API для работы. Теперь вместо множества вызовов Faker().some_method()
в коде можно просто использовать fake.some_method()
.
Теперь добавим фейковую генерацию прямо в модели Pydantic, используя параметр default_factory. Это позволит автоматически заполнять поля случайными значениями при создании модели.
from datetime import date
from pydantic import BaseModel, Field, RootModel, ConfigDict
from tools.fakers import fake
class CreateOperationSchema(BaseModel):
"""
Модель для создания новой банковской операции.
Поля:
- debit (float | None): Сумма списания со счёта
- credit (float | None): Сумма зачисления на счёт
- category (str): Категория операции
- description (str): Описание операции
- transaction_date (date): Дата транзакции (передаётся в формате "transactionDate")
"""
model_config = ConfigDict(populate_by_name=True)
debit: float | None = Field(default_factory=fake.money) # Генерация случайной суммы списания со счёта
credit: float | None = Field(default_factory=fake.money) # Генерация случайной суммы зачисления на счёт
category: str = Field(default_factory=fake.category) # Генерация случайной категории
description: str = Field(default_factory=fake.sentence) # Генерация случайного описания
transaction_date: date = Field(alias="transactionDate", default_factory=fake.date) # Генерация случайной даты
class UpdateOperationSchema(BaseModel):
"""
Модель для обновления банковской операции (используется в PATCH запросах).
Все поля являются необязательными, так как можно обновлять только часть данных.
Поля:
- debit (float | None): Новая сумма списания
- credit (float | None): Новая сумма зачисления
- category (str | None): Новая категория
- description (str | None): Новое описание
- transaction_date (date | None): Новая дата транзакции (alias "transactionDate")
"""
model_config = ConfigDict(populate_by_name=True)
debit: float | None = Field(default_factory=fake.money)
credit: float | None = Field(default_factory=fake.money)
category: str | None = Field(default_factory=fake.category)
description: str | None = Field(default_factory=fake.sentence)
transaction_date: date | None = Field(alias="transactionDate", default_factory=fake.date)
class OperationSchema(CreateOperationSchema):
"""
Модель банковской операции, содержащая ID.
Наследуется от CreateOperationSchema и добавляет поле:
- id (int): Уникальный идентификатор операции
"""
id: int
class OperationsSchema(RootModel):
"""
Контейнер для списка операций.
Поле:
- root (list[OperationSchema]): Список операций
"""
root: list[OperationSchema]
Применение default_factory. Каждое поле получает случайное значение при создании экземпляра модели. Пример работы:
operation = CreateOperationSchema()
print(operation)
Вывод (данные случайные):
{
"debit": -25.4,
"credit": 87.6,
"category": "fuel",
"description": "Paid for fuel at a gas station.",
"transactionDate": "2025-03-30"
}
Настройки нагрузочных тестов
Реализуем централизованный подход к управлению настройками для нагрузочных тестов. Это позволит легко изменять параметры без необходимости редактировать код.
В данном примере нам нужно хранить только URL API и таймаут запросов, но в будущем можно расширять этот механизм.
Для управления настройками будем использовать Pydantic Settings — удобную библиотеку, которая позволяет загружать переменные окружения в виде Pydantic-моделей.
from pydantic import BaseModel, HttpUrl
from pydantic_settings import BaseSettings, SettingsConfigDict
class HTTPClientConfig(BaseModel):
"""
Настройки HTTP-клиента.
Поля:
url (HttpUrl): Базовый URL для API.
timeout (float): Таймаут для запросов в секундах.
"""
url: HttpUrl
timeout: float
@property
def client_url(self) -> str:
"""
Возвращает URL API в строковом формате.
"""
return str(self.url)
class Settings(BaseSettings):
"""
Главная модель для хранения всех настроек проекта.
Загружает переменные из файла `.env` и поддерживает вложенные структуры.
Поля:
fake_bank_http_client (HTTPClientConfig): Настройки HTTP-клиента.
"""
model_config = SettingsConfigDict(
env_file='.env', # Загружаем переменные из файла .env
env_file_encoding='utf-8', # Указываем кодировку файла
env_nested_delimiter='.' # Позволяет использовать вложенные переменные (FAKE_BANK_HTTP_CLIENT.URL)
)
fake_bank_http_client: HTTPClientConfig # Настройки HTTP-клиента
# Глобальный доступ к настройкам
settings = Settings()
-
Класс
HTTPClientConfig
-
Класс
Settings
Наследуется от BaseSettings (Pydantic Settings).
-
model_config
определяет:Где искать переменные (из файла .env).
Кодировку файла (utf-8).
Поддержку вложенных переменных
(env_nested_delimiter='.')
.
fake_bank_http_client: HTTPClientConfig
— добавляет вложенные настройки для HTTP-клиента.
FAKE_BANK_HTTP_CLIENT.URL="https://api.sampleapis.com"
FAKE_BANK_HTTP_CLIENT.TIMEOUT=100
FAKE_BANK_HTTP_CLIENT.URL
— адрес API, который будет использовать HTTP-клиент.FAKE_BANK_HTTP_CLIENT.TIMEOUT
— таймаут запросов (в секундах).Благодаря
env_nested_delimiter='.'
, переменные в файле .env автоматически конвертируются в вложенные структуры внутриSettings
.
Теперь можно просто инициализировать Settings
и использовать его:
settings = Settings()
print(settings.fake_bank_http_client.client_url) # "https://api.sampleapis.com"
print(settings.fake_bank_http_client.timeout) # 100
API клиенты
Для работы с API https://api.sampleapis.com/fakebank/accounts мы используем библиотеку HTTPX. Она современная, производительная и отлично подходит для создания универсального API-клиента. Но одной отправки запросов недостаточно — в рамках нагрузочного тестирования важно, чтобы каждый вызов API автоматически попадал в систему метрик Locust. Нам нужно отслеживать время ответа, ошибки, размер тела и при этом поддерживать понятную, агрегируемую статистику по маршрутам.
Locust, по умолчанию, умеет собирать метрики только со встроенных клиентов. Если мы используем стороннюю библиотеку — такую как HTTPX — то должны вручную передавать информацию о каждом запросе в движок Locust. Именно для этого используются event hooks.
Как работают event hooks в HTTPX
HTTPX позволяет внедрить свою логику в жизненный цикл запроса с помощью так называемых хуков. Они бывают двух типов: request
— вызывается перед отправкой запроса, и response
— вызывается после получения ответа. Это удобный механизм, который позволяет реализовать всё: от логирования и трассировки до сбора метрик и интеграции с внешними инструментами.
def log_request(request):
print(f"Request event hook: {request.method} {request.url} - Waiting for response")
def log_response(response):
request = response.request
print(f"Response event hook: {request.method} {request.url} - Status {response.status_code}")
client = httpx.Client(event_hooks={'request': [log_request], 'response': [log_response]})
В нашем случае мы реализуем два хука. Первый срабатывает перед отправкой запроса и сохраняет метку времени начала — это позволит затем вычислить длительность запроса. Второй срабатывает после получения ответа. Он извлекает метку старта, считает время выполнения в миллисекундах, определяет длину тела ответа и отправляет всё это в Locust через environment.events.request.fire()
.
Важно, что в хуке на стороне ответа мы также ловим исключения (HTTPError
, HTTPStatusError
) — это позволяет нам регистрировать ошибки не как "просто 500", а как полноценные события в отчёте.
Реализация хуков
import time
from httpx import Request, Response, HTTPStatusError, HTTPError
from locust.env import Environment
def locust_request_event_hook(request: Request) -> None:
"""
Хук, вызываемый HTTPX перед отправкой запроса.
Сохраняет текущее время (timestamp) в поле request.extensions["start_time"],
чтобы затем можно было вычислить длительность запроса.
"""
request.extensions["start_time"] = time.time()
def locust_response_event_hook(environment: Environment):
"""
Возвращает функцию-хук для HTTPX, вызываемую после получения ответа.
Этот хук:
- извлекает информацию о запросе (метод, URL, маршрут),
- считает длительность ответа,
- определяет длину тела,
- фиксирует исключения (если они возникли),
- и отправляет всё это как событие в Locust.
Параметры:
----------
environment : locust.env.Environment
Объект окружения Locust, содержащий систему событий (events),
через которую отправляются данные для HTML-отчёта.
Возвращает:
-----------
Callable[[Response], None]
Замкнутая функция, вызываемая после каждого ответа HTTPX.
"""
# Вложенная функция (замыкание), которая будет реально использоваться как хук.
# Она "запоминает" переменную `environment` из внешней области видимости.
def inner(response: Response) -> None:
exception: HTTPError | HTTPStatusError | None = None
# Пытаемся проверить статус ответа. Если он не 2xx/3xx — сохраняем исключение.
try:
response.raise_for_status()
except (HTTPError, HTTPStatusError) as error:
exception = error
# Извлекаем оригинальный запрос, чтобы получить информацию о маршруте.
request = response.request
# Получаем route из extensions (если был передан явно),
# иначе используем "сырое" значение path из URL.
route = request.extensions.get("route", request.url.path)
# Вытаскиваем метку времени начала запроса.
# Если её не было (что маловероятно) — подставляем текущую.
start_time = request.extensions.get("start_time", time.time())
# Вычисляем время ответа в миллисекундах.
response_time = (time.time() - start_time) * 1000
# Считаем длину тела ответа в байтах.
response_length = len(response.read())
# Отправляем событие в систему метрик Locust.
# Это позволяет отобразить запрос в HTML-отчёте.
environment.events.request.fire(
request_type="HTTP", # Тип запроса (можно указать любое значение, например "gRPC")
name=f"{request.method} {route}", # Имя маршрута — с методом, агрегируется в отчёте
response_time=response_time, # Длительность запроса
response_length=response_length, # Длина тела
response=response, # Сам объект ответа (можно использовать в кастомных хендлерах)
exception=exception, # Ошибка, если была
context=None # Можно передавать контекст — например, пользователь или сессионные данные
)
return inner # Возвращаем замыкание
Почему используется замыкание (closure)
Мы могли бы просто передавать environment как аргумент в каждый вызов, но это неудобно: HTTPX ждёт, что event_hooks["response"]
будет списком функций с сигнатурой Callable[[Response], None]
. То есть она должна принимать только response
. Никакие дополнительные параметры, такие как environment, туда передать нельзя напрямую.
Чтобы обойти это ограничение, мы создаём вложенную функцию inner()
, которая «захватывает» переменную environment из внешней области видимости. Это и есть замыкание — стандартный приём в Python, когда вложенная функция запоминает значения переменных из своей внешней среды, даже если она вызывается позже и в другом контексте.
Таким образом, locust_response_event_hook(environment)
возвращает готовую для HTTPX функцию-хук, которая при каждом срабатывании знает, куда и как отправлять метрику.
Зачем нужен extensions["route"]
Когда мы тестируем реальные API, URL часто содержит динамические параметры: ID пользователя, номер операции, дату. Например, /accounts/123
, /accounts/456
и т.п. Если отправлять метрики в Locust с такими значениями как есть, то отчёт превратится в список из сотен уникальных строк, по сути — мусор. Чтобы этого избежать, мы передаём через extensions шаблон маршрута: /accounts/{id}
. Этот шаблон подставляется в метрике вместо реального пути и позволяет агрегировать статистику правильно.
Базовый API-клиент и подключение к Locust
На этом этапе у нас есть хуки для интеграции с Locust, и теперь пора построить над ними полноценный фундамент — абстракцию HTTP-клиента, через которую будут идти все запросы. Задача — отделить инфраструктурные детали вроде хуков, маршрутов, конфигурации и логирования от конкретных бизнес-клиентов, таких как OperationsClient
. Эта изоляция позволит переиспользовать код, упростит тестирование и повысит читаемость.
Вместо того чтобы работать напрямую с httpx.Client, мы создаём класс BaseClient
, который становится универсальным слоем между HTTPX и любым конкретным API-клиентом. Он инкапсулирует вызовы get
и post
, принимает дополнительные параметры, включая extensions, и работает с уже сконфигурированным клиентом — либо обычным, либо с подключёнными хуками Locust.
Ключевая особенность реализации — использование поля extensions
. Это специальный словарь, который HTTPX позволяет передавать в каждый запрос. Он остаётся незаметным для внешнего API, но может использоваться внутри хуков, чтобы передавать вспомогательные данные. И вот здесь появляется важный инженерный момент: extensions["route"]
.
Проблема в том, что API часто содержит динамические маршруты: /accounts/123
, /accounts/456
, и так далее. Если передавать их как есть в Locust
, то в метриках будет по 1000 разных URL, каждый со своей строкой. Это создаёт хаос в статистике, мешает анализу и делает HTML-отчёты практически бесполезными. Чтобы этого избежать, мы вручную передаём шаблон маршрута — например, /accounts/{id}
— через extensions["route"]
, и уже в хуке на стороне response
используем именно его в качестве имени метрики. Это простой, но мощный приём, позволяющий агрегировать данные и сохранить чистую статистику.
import logging
from typing import Any, TypedDict
from httpx import Client, URL, Response, QueryParams
from locust.env import Environment
from clients.event_hooks import locust_request_event_hook, locust_response_event_hook
from config import HTTPClientConfig
class ClientExtensions(TypedDict, total=False):
"""
Специальный словарь расширений запроса.
Используется для передачи метаданных, не относящихся к телу запроса.
В нашем случае — это `route`, который нужен, чтобы подменять
динамические пути (например: /accounts/123) на шаблонные
(например: /accounts/{id}) при передаче метрик в Locust.
"""
route: str
class BaseClient:
"""
Абстрактный HTTP-клиент, инкапсулирующий методы get/post
и работу с extensions. Основан на httpx.Client.
"""
def __init__(self, client: Client):
"""
Конструктор принимает уже настроенный httpx.Client,
в котором могут быть прописаны timeout, base_url, event_hooks и т.д.
"""
self.client = client
def get(
self,
url: URL | str,
params: QueryParams | None = None,
extensions: ClientExtensions | None = None
) -> Response:
"""
Отправляет GET-запрос и, при необходимости, передаёт route через extensions.
Это нужно для корректной агрегации статистики.
"""
return self.client.get(url, params=params, extensions=extensions)
def post(
self,
url: URL | str,
json: Any | None = None,
extensions: ClientExtensions | None = None
) -> Response:
"""
POST-запрос с поддержкой кастомных расширений.
"""
return self.client.post(url, json=json, extensions=extensions)
def get_http_client(config: HTTPClientConfig) -> Client:
"""
Конструктор базового клиента без хуков.
Используется в сидинге или вспомогательных модулях.
"""
logging.getLogger("httpx").setLevel(logging.WARNING)
return Client(timeout=config.timeout, base_url=config.client_url)
def get_locust_http_client(config: HTTPClientConfig, environment: Environment) -> Client:
"""
Конструктор клиента с поддержкой хуков Locust.
Используется во всех сценариях, где нужно собирать метрики.
"""
logging.getLogger("httpx").setLevel(logging.WARNING)
return Client(
timeout=config.timeout,
base_url=config.client_url,
event_hooks={
"request": [locust_request_event_hook],
"response": [locust_response_event_hook(environment)]
}
)
Но сам по себе BaseClient
не создаёт клиент. Этим занимаются два вспомогательных конструктора — get_http_client
и get_locust_http_client
. Первый используется, когда нам не нужно собирать метрики (например, в сидинге или обычных автотестах). Второй — когда мы запускаем тест в среде Locust и хотим, чтобы все метрики автоматически отправлялись в HTML-отчёт.
И в том, и в другом случае мы явно задаём base_url
и timeout
из конфигурации, а также устанавливаем уровень логгера для HTTPX, чтобы не засорять stdout при выполнении нагрузочных тестов или сидинга.
def get_http_client(config: HTTPClientConfig) -> Client:
"""
Конструктор базового клиента без хуков.
Используется в сидинге или вспомогательных модулях.
"""
logging.getLogger("httpx").setLevel(logging.WARNING)
return Client(timeout=config.timeout, base_url=config.client_url)
Во втором варианте — get_locust_http_client
— мы подключаем хуки, определённые ранее. Один срабатывает до запроса (сохраняет время старта), второй — после получения ответа (считает время, длину и отправляет событие в Locust).
def get_locust_http_client(config: HTTPClientConfig, environment: Environment) -> Client:
"""
Конструктор клиента с поддержкой хуков Locust.
Используется во всех сценариях, где нужно собирать метрики.
"""
logging.getLogger("httpx").setLevel(logging.WARNING)
return Client(
timeout=config.timeout,
base_url=config.client_url,
event_hooks={
"request": [locust_request_event_hook],
"response": [locust_response_event_hook(environment)]
}
)
Этот слой завершает низкоуровневую часть инфраструктуры. Он изолирует httpx
, предоставляет единый API, делает клиент совместимым с Locust и позволяет бизнес-клиентам фокусироваться на логике, а не на конфигурации и метриках.
Реализация клиента для операций: OperationsClient
После создания базового клиента пришло время реализовать клиентскую обёртку над конкретным REST API. В нашем случае — это OperationsClient
, который работает с endpoint’ом /accounts на сервисе fakebank.
Этот клиент строится по принципу двух уровней:
Низкоуровневые методы, которые просто отправляют HTTP-запросы и возвращают
Response
. Они удобны в отладке, автотестах, и при необходимости точечного контроля над телом ответа.Высокоуровневые методы, которые инкапсулируют всю логику — сериализацию запроса, десериализацию ответа и генерацию данных. Эти методы мы будем использовать в сидинге и нагрузочных тестах.
Важно! Чтобы избежать ошибок и дублирования адресов эндпоинтов в проекте, рекомендуется вынести все URI в отдельный Enum. Это позволит централизованно управлять URL-адресами и избежать опечаток.
from enum import Enum
class APIRoutes(str, Enum):
"""
Перечисление всех URI-адресов API для проекта.
Это перечисление позволяет централизованно управлять всеми маршрутами API,
что помогает избежать ошибок при их использовании и упрощает масштабирование.
"""
CARDS = "/fakebank/cards"
CLIENTS = "/fakebank/clients"
OPERATIONS = "/fakebank/accounts" # Основной URI для работы с операциями
STATEMENTS = "/fakebank/statements"
NOTIFICATIONS = "/fakebank/notifications"
def __str__(self):
return self.value
APIRoutes
— перечисление всех возможных эндпоинтов, с которыми будет работать приложение. Это позволяет централизовать и стандартизировать использование адресов.В реальных проектах вам возможно придется добавлять новые маршруты, и это будет намного удобнее, если они будут прописаны в одном месте.
from httpx import Response
from locust.env import Environment
from clients.base_client import BaseClient, ClientExtensions, get_http_client, get_locust_http_client
from config import settings
from schema.operations import CreateOperationSchema, OperationSchema, OperationsSchema
from tools.routes import APIRoutes
class OperationsClient(BaseClient):
"""
API-клиент для работы с endpoint'ом /accounts (операции).
Содержит низкоуровневые методы, возвращающие Response (get/post),
и высокоуровневые, возвращающие десериализованные Pydantic-модели.
"""
def get_operations_api(self) -> Response:
"""
Отправляет GET-запрос на /accounts.
Возвращает "сырой" httpx.Response без обработки.
"""
return self.get(APIRoutes.OPERATIONS)
def get_operation_api(self, operation_id: int) -> Response:
"""
Отправляет GET-запрос на /accounts/{id}.
Также передаёт шаблон маршрута через extensions["route"],
чтобы в Locust метриках не было мусора по ID.
"""
return self.get(
f"{APIRoutes.OPERATIONS}/{operation_id}",
extensions=ClientExtensions(route=f"{APIRoutes.OPERATIONS}/{{operation_id}}")
)
def create_operation_api(self, operation: CreateOperationSchema) -> Response:
"""
Отправляет POST-запрос на /accounts с JSON-телом.
Использует сериализацию через Pydantic с alias'ами.
"""
return self.post(
APIRoutes.OPERATIONS,
json=operation.model_dump(mode='json', by_alias=True)
)
def get_operations(self) -> OperationsSchema:
"""
Высокоуровневый метод: получает список операций и
десериализует ответ в Pydantic-модель OperationsSchema.
Используется в сценариях Locust и автотестах.
"""
response = self.get_operations_api()
return OperationsSchema.model_validate_json(response.text)
def get_operation(self, operation_id: int) -> OperationSchema:
"""
Высокоуровневый метод: получает одну операцию по ID
и преобразует ответ в Pydantic-модель OperationSchema.
"""
response = self.get_operation_api(operation_id)
return OperationSchema.model_validate_json(response.text)
def create_operation(self) -> OperationSchema:
"""
Высокоуровневый метод: создаёт операцию со случайными данными,
отправляет её на сервер, валидирует ответ как OperationSchema.
Используется для сидинга и в нагрузке.
"""
request = CreateOperationSchema()
response = self.create_operation_api(request)
return OperationSchema.model_validate_json(response.text)
def get_operations_client() -> OperationsClient:
"""
Создаёт клиент операций без хуков Locust.
Используется в сидинге или автономных автотестах.
"""
return OperationsClient(client=get_http_client(settings.fake_bank_http_client))
def get_operations_locust_client(environment: Environment) -> OperationsClient:
"""
Создаёт клиент операций с поддержкой event hooks Locust.
Используется внутри нагрузочных сценариев.
"""
return OperationsClient(
client=get_locust_http_client(settings.fake_bank_http_client, environment)
)
Структура клиента
class OperationsClient(BaseClient):
OperationsClient
наследуется от BaseClient
, получая в своё распоряжение стандартные методы get()
и post()
с поддержкой extensions
, конфигурации и хуков Locust.
Низкоуровневые методы API
def get_operations_api(self) -> Response:
return self.get(APIRoutes.OPERATIONS)
Метод отправляет GET-запрос на /accounts
и возвращает "сырое" тело ответа. Такой формат пригоден для API-тестов или отладочной работы с данными.
def get_operation_api(self, operation_id: int) -> Response:
return self.get(
f"{APIRoutes.OPERATIONS}/{operation_id}",
extensions=ClientExtensions(route=f"{APIRoutes.OPERATIONS}/{{operation_id}}")
)
Этот метод тоже работает на низком уровне, но с одной важной особенностью: он передаёт в extensions
шаблон маршрута. Это необходимо, чтобы в метриках Locust не появлялись сотни уникальных строк вроде /accounts/1
, /accounts/2
, а вместо этого всё агрегировалось под ключом /accounts/{operation_id}
.
def create_operation_api(self, operation: CreateOperationSchema) -> Response:
return self.post(
APIRoutes.OPERATIONS,
json=operation.model_dump(mode='json', by_alias=True)
)
Метод create_operation_api()
принимает модель Pydantic и сериализует её в JSON, используя алиасы (например, transaction_date
→ transactionDate
). Возвращается опять же Response
.
Высокоуровневые методы: использование в нагрузке и сидинге
def get_operations(self) -> OperationsSchema:
response = self.get_operations_api()
return OperationsSchema.model_validate_json(response.text)
Метод get_operations()
вызывает низкоуровневый API, затем валидирует и преобразует тело ответа в Pydantic-модель OperationsSchema
. Это упрощает работу с ответом — теперь мы получаем не JSON или строку, а полноценный Python-объект.
def get_operation(self, operation_id: int) -> OperationSchema:
response = self.get_operation_api(operation_id)
return OperationSchema.model_validate_json(response.text)
Аналогично: сначала вызываем API, затем парсим ответ. Важно, что OperationSchema
— это строго типизированная структура. Ошибки при несоответствии типов, форматов или отсутствующих полей будут видны сразу.
def create_operation(self) -> OperationSchema:
request = CreateOperationSchema()
response = self.create_operation_api(request)
return OperationSchema.model_validate_json(response.text)
Это наиболее «высокоуровневый» метод. Он полностью инкапсулирует всю цепочку: генерацию фейковых данных, сериализацию, отправку запроса и десериализацию ответа. Именно его мы будем вызывать в сценариях Locust и в сидинге, потому что он даёт результат «из коробки» без ручной работы.
Билдеры клиента
Как и в случае с BaseClient
, мы создаём два конструктора клиента — один для обычного использования, другой — для интеграции с Locust.
def get_operations_client() -> OperationsClient:
return OperationsClient(client=get_http_client(settings.fake_bank_http_client))
get_operations_client()
создаёт клиента без хуков. Это используется, например, в сидинге, когда нам не нужно собирать метрики — а просто сгенерировать или проверить данные.
def get_operations_locust_client(environment: Environment) -> OperationsClient:
return OperationsClient(
client=get_locust_http_client(settings.fake_bank_http_client, environment)
)
А get_operations_locust_client()
используется внутри сценариев Locust. Он создаёт httpx.Client
с подключёнными хуками, которые автоматически будут отправлять все метрики и ошибки в HTML-отчёт Locust.
Заключение
Такой клиент легко расширять, он изолирован от логики нагрузочного теста и может использоваться как в ручных тестах, так и в CI. При этом мы сохраняем строгую типизацию, поддержку роутинга, сериализацию, совместимость с хуками и централизованную конфигурацию. Именно так и должен выглядеть клиент в инженерной нагрузочной инфраструктуре.
Сидинг
Один из ключевых этапов подготовки к нагрузочному тестированию — это сидинг (от англ. seeding), то есть предварительное наполнение системы нужными данными. Без сидинга любые метрики производительности будут искажены: вы будете тестировать не реальную нагрузку, а начальные этапы использования системы, вроде регистрации и настройки.
Например, если вы хотите проверить, как система обрабатывает массовые операции по уже существующим пользователям — сначала этих пользователей нужно создать. Причём не просто «записать в базу», а подготовить полные, осмысленные данные: активные счета, историю операций, документы и всё, что положено по бизнес-логике.
По сути, сидинг — это создание реалистичного состояния системы до запуска тестов, чтобы эмулировать поведение настоящих пользователей, а не вновь зарегистрированных «болванок».
Почему сидинг нужно делать через API, а не напрямую в базу
Когда речь заходит о создании данных, возникает логичный вопрос: а как именно наполнять систему — через базу или через API?
Простой путь — напрямую вставить данные в базу SQL-скриптами или ORM. Но на практике такой подход опасен, хрупок и практически всегда ведёт к скрытым багам. Поэтому в рамках этого проекта мы используем сидинг через API — и вот почему.
1. API сохраняет бизнес-логику
Любой сервис содержит важные правила: валидации, зависимости, триггеры, расчётные поля. При вставке напрямую в базу вы обходите всю эту логику, и в результате получаете «битые» или неполные данные, которые не проходят через те же проверки, что и в реальной системе.
При работе через API все операции проходят через тот же код, что используется в продакшене — это гарантирует валидность и консистентность.
2. Минимум дублирования и упрощённая поддержка
Сидинг через базу требует от вас вручную воссоздавать все связи и зависимости: сначала пользователя, потом счёт, потом операции и документы. Любое изменение в бизнес-логике потребует переписывать сидер. Через API — это уже реализовано. Вы просто вызываете соответствующие методы.
3. Защита от ошибок и интеграция с логами
Сидинг через API автоматически попадает в логи, метрики и трассировку сервиса. Вы можете отследить ошибки, посмотреть, какие объекты были созданы и какова их структура. А значит — сидинг становится предсказуемым и безопасным.
4. Масштабируемость
API-сидинг проще масштабировать. Вы можете распараллелить генерацию данных, повторно использовать существующие клиенты и сценарии, не заботясь о внутреннем устройстве базы. Это особенно важно в микросервисной архитектуре, где за одним объектом могут стоять десятки сервисов и процессов.
Почему мы делаем сидинг именно так
Мы хотим, чтобы все данные, участвующие в нагрузочном тесте, были:
консистентны (например, у операции есть привязанный пользователь и счёт),
валидны (соответствуют всем правилам API),
понятны (их можно проследить в логах и при отладке),
реалистичны (как на боевом окружении).
Поэтому мы будем использовать специальные API-клиенты, которые вызывают реальные методы создания объектов (например, POST /operations), и строим поверх них сидинг-скрипты. Это даёт нам:
безопасность и контроль,
гибкость в создании сценариев,
и гарантию, что нагрузочное тестирование идёт на честных, настоящих данных.
Схема сидинга: входной план и результат
Для организации сидинга мы разделяем его на две части: входной план (что нужно создать) и результат (что было создано). Это позволяет явно контролировать, какие объекты мы хотим получить, и затем использовать их повторно в тестах или других сидерах.
План сидинга (SeedsPlan)
from pydantic import BaseModel, Field
class SeedOperationsPlan(BaseModel):
"""
План генерации операций.
Используется для задания количества операций, которые необходимо создать в процессе сидинга.
В будущем может быть расширен (например, по типу операций, статусу, сумме и т.п.).
"""
count: int = 0 # Количество операций, которое нужно создать
class SeedsPlan(BaseModel):
"""
Общий план сидинга для всех сущностей.
На текущий момент содержит только блок по операциям,
но в будущем может быть дополнен (например: пользователи, счета, чеки и пр.).
"""
operations: SeedOperationsPlan = Field(default_factory=SeedOperationsPlan)
Объяснение:
SeedOperationsPlan
— описывает, сколько операций нужно сгенерировать. В будущем сюда могут быть добавлены дополнительные параметры: тип операции, сумма, статус и т.д.SeedsPlan
— корневая схема, которая может включать в себя планы генерации разных сущностей (не только операций, но и пользователей, счетов и т.д.).
Этот план подаётся на вход сидеру и определяет его поведение: сколько данных и какого типа нужно создать.
Результат сидинга (SeedsResult)
import random
from pydantic import BaseModel, Field
class SeedOperationResult(BaseModel):
"""
Результат сидинга одной операции.
Содержит идентификатор (ID), который вернулся от API после создания операции.
Может использоваться в тестах или последующей логике — например, для получения подробностей операции.
"""
operation_id: int # Уникальный ID созданной операции
class SeedsResult(BaseModel):
"""
Общий результат сидинга. Хранит созданные сущности по типам.
Сейчас содержит только операции, но в дальнейшем может быть дополнен пользователями, счетами и т.п.
"""
operations: list[SeedOperationResult] = Field(default_factory=list)
def get_next_operation(self) -> SeedOperationResult:
"""
Возвращает следующую созданную операцию (в порядке добавления).
Используется для последовательных сценариев, например, когда операции используются строго одна за другой.
"""
return self.operations.pop(0)
def get_random_operation(self) -> SeedOperationResult:
"""
Возвращает случайную операцию из уже созданных.
Используется для имитации произвольного пользовательского поведения.
"""
return random.choice(self.operations)
Объяснение:
SeedOperationResult
— результат генерации одной операции. Хранит её ID, который можно использовать при дальнейшем взаимодействии (например, для запроса по ID).SeedsResult
— агрегирует все созданные данные, сгруппированные по типу. Здесь у нас толькоoperations
, но можно добавитьusers
,accounts
и т.п.-
Методы
get_next_operation()
иget_random_operation()
позволяют гибко выбирать данные:по порядку (для синхронных сценариев),
случайным образом (для имитации разнообразия в нагрузке).
Почему это удобно
Явное разделение логики — отдельно описываем, что хотим, и отдельно — что получили.
Гибкость и масштабируемость — при добавлении новых сущностей (пользователи, счета) план и результат просто расширяются.
Переиспользуемость — результат можно сериализовать в файл и переиспользовать в других тестах, сидерах или даже в автотестах.
Типизация и валидация — с помощью
pydantic
гарантируется корректность данных на входе и выходе.
Реализация сидинг-билдера
Теперь, когда мы определили, что сидинг нужно делать именно через API, давайте реализуем простой, но расширяемый сидинг-билдер. Его задача — получить на вход план сидинга и вернуть результат, пригодный для использования в нагрузочных сценариях.
В нашей системе мы хотим, чтобы сидер:
создавал заданное количество операций через API,
возвращал ссылки на созданные сущности (например, ID операций),
работал через универсальные Pydantic-схемы
SeedsPlan
иSeedsResult
,мог быть расширен в будущем — например, для создания пользователей, счетов, документов и так далее.
from clients.operations_client import OperationsClient, get_operations_client
from seeds.schema.plan import SeedsPlan
from seeds.schema.result import SeedsResult, SeedOperationResult
class SeedsBuilder:
"""
Основной билдер для генерации сидинговых данных (через API-клиентов).
Отвечает за выполнение плана сидинга — создание всех необходимых сущностей
(на текущем этапе — только операций), с соблюдением всей бизнес-логики приложения.
"""
def __init__(self, operations_client: OperationsClient):
"""
Инициализация билдера с API-клиентом для операций.
:param operations_client: Клиент, обеспечивающий вызовы к операциям.
Обычно это HTTPX-клиент с базовой или нагрузочной конфигурацией.
"""
self.operations_client = operations_client
def build_operation_result(self) -> SeedOperationResult:
"""
Создаёт одну операцию через API и оборачивает её результат
в модель `SeedOperationResult`, пригодную для последующего использования в тестах.
:return: SeedOperationResult с ID созданной операции.
"""
operation = self.operations_client.create_operation()
return SeedOperationResult(operation_id=operation.id)
def build(self, plan: SeedsPlan) -> SeedsResult:
"""
Основной метод генерации сидинговых данных согласно плану.
Выполняет создание операций по количеству, указанному в `plan.operations.count`.
:param plan: Объект `SeedsPlan`, описывающий, какие сущности и в каком количестве необходимо создать.
:return: SeedsResult — итоговый набор данных, пригодный для использования в нагрузочных сценариях.
"""
return SeedsResult(
operations=[self.build_operation_result() for _ in range(plan.operations.count)]
)
def get_seeds_builder() -> SeedsBuilder:
"""
Упрощённый фабричный метод для получения готового билдера.
Использует стандартный HTTP-клиент (не Locust) для подготовки данных.
Этот метод можно подменить в тестах или заменить при необходимости изолированной конфигурации.
"""
return SeedsBuilder(operations_client=get_operations_client())
Весь сидинг построен вокруг одной сущности — operation
. Мы используем уже готовый OperationsClient
, в котором реализован вызов create_operation()
через HTTP API. Каждая созданная операция добавляется в результат (SeedsResult
) в виде объекта SeedOperationResult
, содержащего ID операции.
На вход подаётся SeedsPlan
, в котором указано, сколько операций нужно создать. Это удобно для конфигурирования сидинга извне — например, из JSON-файла или командной строки.
Такая архитектура позволяет легко масштабировать сидинг. Если завтра потребуется сидировать не только операции, но и пользователей, счета, документы — достаточно будет:
добавить соответствующий блок в
SeedsPlan
иSeedsResult
,реализовать метод
build_user_result()
(или аналогичный),обновить метод
build()
для поддержки новой сущности.
Таким образом, сидинг остаётся прозрачным, расширяемым и строго типизированным.
Сохраняем результат сидинга
После выполнения сидинга у нас на руках оказывается структура SeedsResult
, в которой собраны все созданные объекты (например, operation_id
операций). Эти данные могут понадобиться позже:
для запуска нагрузочного сценария,
для анализа проблем,
для отладки,
для повторного использования,
или для CI/CD пайплайна.
Чтобы не терять эти данные, мы сохраняем результат в файл.
import os
from seeds.schema.result import SeedsResult
def save_seeds_result(result: SeedsResult, scenario: str):
"""
Сохраняет результат сидинга в JSON-файл в директорию ./dumps.
:param result: Объект SeedsResult с данными о созданных сущностях.
:param scenario: Название сценария (будет использовано в имени файла).
"""
if not os.path.exists("dumps"):
os.mkdir("dumps") # создаём директорию, если её ещё нет
seeds_file = f"./dumps/{scenario}_seeds.json"
with open(seeds_file, 'w+', encoding="utf-8") as file:
file.write(result.model_dump_json()) # сериализуем через Pydantic
Такой подход даёт сразу несколько преимуществ:
Повторное использование. Сидинг — это затратная операция. Если вы уже сгенерировали данные, нет смысла делать это повторно. Вместо этого можно загрузить файл и использовать готовые ID.
Отладка и воспроизводимость. При падении сценария вы можете проверить, какие именно данные использовались. Это повышает прозрачность и снижает вероятность скрытых багов.
Поддержка CI/CD. В автоматических пайплайнах удобно сначала запустить сидинг, сохранить результат, а затем использовать его в нагрузочном тесте. Особенно, если вы хотите добиться повторяемости тестов.
Масштабирование. Позже можно реализовать загрузку, кеширование, дифференциацию сценариев, фильтрацию сидированных данных и т.д.
Базовая архитектура нагрузочных сценариев: User, TaskSet, SequentialTaskSet
Теперь, когда у нас есть подготовленные тестовые данные (сидинг), мы можем перейти к следующему важному этапу — изучению архитектуры нагрузочных сценариев. В этом блоке мы не будем реализовывать полноценные сценарии, а сосредоточимся на базовых строительных блоках, из которых они в дальнейшем будут собираться:
виртуальные пользователи (
User
),наборы задач (
TaskSet
),последовательные сценарии (
SequentialTaskSet
).
Эти компоненты определяют, какие действия выполняются, с какой частотой и задержкой, какие API вызываются и в какой последовательности. Понимание их работы необходимо для построения более сложных и реалистичных сценариев, к которым мы перейдём позже.
Базовый виртуальный пользователь
Чтобы не повторять одни и те же настройки в каждом сценарии, мы создадим базовый класс пользователя, от которого будут наследоваться все остальные. Он задаёт:
базовый
host
(адрес сервиса по умолчанию),стратегию ожидания между задачами (
wait_time
),и помечается как абстрактный, чтобы не запускался напрямую.
from locust import User, between
class BaseLocustUser(User):
"""
Базовый класс виртуального пользователя в Locust.
Этот класс не выполняет никаких задач сам по себе, а предназначен для наследования.
Он задаёт общую конфигурацию, которая будет использоваться во всех сценариях:
- `host`: фиктивное значение, чтобы удовлетворить требования Locust
- `wait_time`: время ожидания между задачами (здесь — от 1 до 3 секунд)
- `abstract`: помечает класс как абстрактный, чтобы Locust не запускал его напрямую
ВАЖНО:
Мы не используем встроенный HTTP-клиент Locust. Вместо этого у нас есть
собственные API-клиенты на базе `httpx`, которые полностью управляют отправкой запросов.
Поэтому значение `host` в данном контексте не используется по назначению — это просто
заглушка, необходимая для корректной работы фреймворка Locust.
"""
host = "localhost" # Фиктивный host, Locust требует его, но мы не используем
wait_time = between(1, 3) # Пауза между задачами — от 1 до 3 секунд
abstract = True # Не использовать этот класс напрямую в запуске
Зачем нужен абстрактный пользователь?
Locust ищет все классы, унаследованные от User
, и считает их «реальными пользователями», которых нужно запускать. Но в нашем случае мы хотим использовать BaseLocustUser
только как базу для других классов, чтобы переиспользовать общее поведение и настройки.
Поэтому мы выставляем:
abstract = True
Это позволяет избежать случайного запуска базового класса и делает архитектуру чище.
Почему host = "localhost" — это заглушка?
Locust требует, чтобы у каждого User
было указано поле host
. Обычно это базовый URL, к которому будут отправляться запросы через встроенный HTTP-клиент (например, HttpUser). Но в нашем случае:
Мы не используем HttpUser, а базируемся на обычном
User
Все запросы отправляются через кастомные
httpx
-клиенты, у которыхbase_url
задаётся отдельноПоэтому значение
host
никак не влияет на выполнение сценария, но его необходимо задать, чтобы избежать ошибки запуска
Почему between(1, 3)?
wait_time определяет, сколько времени должен ждать виртуальный пользователь между двумя задачами. Это необходимо для моделирования реального поведения, где пользователь не делает запросы каждую миллисекунду.
between(1, 3)
означает, что между задачами будет случайная пауза от 1 до 3 секунд.Этот диапазон можно настроить в каждом конкретном сценарии — например, сделать паузы короче или длиннее.
Базовые TaskSet и SequentialTaskSet
После того как мы определили абстрактного пользователя (BaseLocustUser
), следующим шагом будет реализация базовых TaskSet-классов, в которых мы будем описывать сценарии поведения пользователей.
В Locust существуют два базовых типа TaskSet:
Тип |
Особенность |
---|---|
Задачи (tasks) выполняются в произвольном порядке, с учётом веса задач |
|
Задачи выполняются строго последовательно, сверху вниз |
Если вы хотите смоделировать пользователя, который выполняет конкретную бизнес-последовательность (например: логин → покупка → выход), используйте SequentialTaskSet.
Если поведение состоит из повторяющихся или случайных действий (например: пользователь запрашивает выписку, делает покупку, открывает новый счёт в случайном порядке) — подойдёт обычный TaskSet.
Чтобы не дублировать код в каждом сценарии, мы выносим базовую логику в два абстрактных класса:
from locust import TaskSet, SequentialTaskSet
from clients.operations_client import OperationsClient, get_operations_locust_client
class BaseLocustTaskSet(TaskSet):
"""
Базовый класс для сценариев, использующих TaskSet (случайный порядок задач).
В атрибуте `operations_client` хранится экземпляр клиента OperationsClient,
который мы инициализируем в методе `on_start`, передавая туда окружение Locust.
Этот клиент будет использоваться для отправки HTTP-запросов через httpx
с поддержкой всех нужных хуков и метрик.
"""
operations_client: OperationsClient # Кастомный API-клиент для работы с операциями
def on_start(self) -> None:
"""
Метод, вызываемый перед началом выполнения задач (tasks).
Здесь мы инициализируем клиента с привязкой к текущему окружению Locust.
"""
self.operations_client = get_operations_locust_client(self.user.environment)
class BaseLocustSequentialTaskSet(SequentialTaskSet):
"""
Базовый класс для сценариев, использующих строго последовательное выполнение задач.
Отличие от BaseLocustTaskSet — только в типе сценария: здесь задачи выполняются
по порядку, сверху вниз, как в обычной процедуре.
"""
operations_client: OperationsClient
def on_start(self) -> None:
"""
Также инициализируем API-клиент до запуска основного сценария.
"""
self.operations_client = get_operations_locust_client(self.user.environment)
Как это использовать в реальных сценариях?
Если вы хотите написать последовательный сценарий из нескольких шагов, создайте класс, унаследованный от
BaseLocustSequentialTaskSet
, и опишите задачи в нужном порядке с помощью@task
.Если сценарий более свободный и пользователь выполняет действия случайно, создайте класс от
BaseLocustTaskSet
, добавьте задачи с разнымweight
— и Locust сам будет выбирать порядок выполнения.
Нагрузочные сценарии
В этом разделе мы реализуем два типовых сценария нагрузки:
один — с созданием данных на лету,
второй — с использованием заранее подготовленных данных через сидинг.
Первый сценарий имитирует поведение нового пользователя, который только что зашёл в систему, создал операцию (например, покупку), а затем запросил список операций и открыл детальную информацию по одной из них. Такой сценарий хорош для тестирования "холодного старта", но он увеличивает время разгона нагрузки и может искажать метрики из-за первичной инициализации.
Второй сценарий работает с уже сидированными (заранее созданными) данными. Мы заранее создаём операции через API и сохраняем их. Затем, во время нагрузки, пользователь просто делает запросы: получает список, выбирает случайную операцию и запрашивает её детали. Это приближает тест к реальному поведению пользователей и даёт более стабильные и репрезентативные метрики.
Оба подхода имеют право на существование — мы реализуем оба, чтобы вы могли выбрать нужный под свою задачу.
Сценарий без сидинга: создание операции, просмотр списка и деталей
Начнём с реализации самого простого сценария — без предварительно подготовленных данных.
Представим типичную пользовательскую активность: клиент заходит в приложение, совершает какую-либо финансовую операцию (например, покупку), затем переходит в раздел "история операций", просматривает список всех операций и выбирает одну для просмотра деталей. Это последовательный и логически связанный пользовательский поток, который мы и хотим нагрузить.
/scenarios/get_operation_without_seeds/scenario.py
from locust import task
from schema.operations import OperationSchema
from tools.locust.task_set import BaseLocustSequentialTaskSet
from tools.locust.user import BaseLocustUser
class GetOperationSequentialTaskSet(BaseLocustSequentialTaskSet):
# Храним объект созданной операции, чтобы использовать в следующих шагах
operation: OperationSchema | None = None
@task
def create_operation(self):
# Создаём новую операцию через API — это имитирует действие пользователя,
# например, покупку или перевод в приложении
self.operation = self.operations_client.create_operation()
@task
def get_operations(self):
# Получаем список всех операций текущего пользователя
self.operations_client.get_operations()
@task
def get_operation(self):
# Пытаемся получить детали по конкретной операции
# Однако операция могла не создаться — например, сервис вернул 500 или 429
# Поэтому делаем проверку и корректно выходим
if not self.operation:
return
self.operations_client.get_operation(self.operation.id)
class GetOperationUser(BaseLocustUser):
# Указываем TaskSet, который будет выполняться этим пользователем
tasks = [GetOperationSequentialTaskSet]
Мы используем
SequentialTaskSet
, чтобы задачи выполнялись в строгом порядке: сначала создаётся операция, затем выполняются чтения.Объект
operation
сохраняется между задачами, чтобы можно было запросить детали по конкретному ID.Это логика одного виртуального пользователя, который каждый раз заново создаёт операцию.
Почему используется SequentialTaskSet, а не TaskSet
Мы намеренно используем SequentialTaskSet
, чтобы гарантировать порядок выполнения задач:
сначала создаётся операция,
затем вызывается метод получения списка операций,
затем — получение конкретной операции.
Если бы мы использовали обычный TaskSet
, Locust бы выбирал задачи в случайном порядке, основываясь на их "весах", и мы бы столкнулись с ситуацией, когда запрос get_operation
выполняется до создания операции — что привело бы к ошибкам.
Почему мы не задаём веса задач (@task(n))
Во многих сценариях веса задач указываются для моделирования вероятностей — например, операция "открыть документ" может встречаться чаще, чем "удалить аккаунт".
В данном случае мы используем SequentialTaskSet
, а значит порядок жёстко зафиксирован и вес не играет никакой роли. Все задачи выполняются ровно один раз в том порядке, в котором они описаны. Поэтому декораторы @task
без аргументов здесь абсолютно корректны.
Зачем проверка if not self.operation
Операция создаётся через внешний API. Это значит, что:
сервис может временно недоступен (5xx),
включена защита от нагрузки (429),
произошла ошибка сериализации, валидации или другая ошибка бизнес-логики.
Если операция не создалась, нет смысла продолжать сценарий. Поэтому мы делаем проверку на None
, и мягко выходим, не вызывая get_operation
.
if not self.operation:
return
Такой подход:
предотвращает шум в логах и отчётах;
исключает бессмысленные ошибки;
повышает устойчивость сценариев.
Конфигурация запуска
/scenarios/get_operation_without_seeds/v1.0.conf
locustfile = ./scenarios/get_operation_without_seeds/scenario.py
spawn-rate = 2
run-time = 30s
headless = true
users = 20
html = ./scenarios/get_operation_without_seeds/report.html
Для каждого сценария мы создаём отдельный конфигурационный файл с расширением .conf
, в котором зафиксированы все параметры нагрузки: количество пользователей, скорость разгона, длительность теста, путь к файлу сценария и путь к отчёту. Это не просто удобство, а основа инженерного подхода к воспроизводимым нагрузочным тестам.
В отличие от ручного запуска с флагами в командной строке, .conf
-файлы:
позволяют запускать сценарии с одинаковыми параметрами из CI/CD;
исключают ошибки при повторных прогонах;
сохраняются в Git и участвуют в истории изменений.
Кроме того, конфигурации мы версионируем — например, v1.0.conf
, v2.0.conf
. Это даёт нам возможность:
фиксировать конкретные профили нагрузки, под которые получены результаты;
отслеживать изменения нагрузки во времени;
точно сравнивать отчёты только внутри одной и той же версии профиля.
Таким образом, .conf
-файл — это не просто способ запускать тесты, а часть архитектуры нагрузочного тестирования, обеспечивающая прозрачность, стабильность и масштабируемость.
Почему нагрузка скромная?
Обратите внимание: в конфигурации указан умеренный профиль нагрузки — всего 20 пользователей, скорость запуска 2 пользователя в секунду, продолжительность 30 секунд. Это не случайность.
Мы работаем с тестовым API https://api.sampleapis.com/fakebank/accounts, у которого установлен рейт-лимит. При избыточной нагрузке сервис начинает возвращать ответы с кодом 429 Too Many Requests
. Такие ошибки искажают результат тестирования и мешают сосредоточиться на логике сценария.
Поэтому:
мы сознательно ограничиваем количество пользователей;
не стремимся "положить" сервис;
и соблюдаем культуру инженерного тестирования — тестируем не ради нагрузки, а ради понимания поведения системы под управляемым профилем.
Да и давайте честно — наглеть не будем, особенно в рамках стенда, который создан для обучения и отладки, а не для боевых боёв с миллионами запросов.
Сценарий с использованием сидинга
В отличие от предыдущего сценария, где пользователь сначала создавал данные сам, здесь мы заранее подготовим всё необходимое — операции будут созданы ещё до запуска теста, и виртуальные пользователи будут использовать уже готовые данные.
Зачем это нужно?
Это ускоряет разгон нагрузки, так как пользователям не нужно тратить время на создание данных.
Это исключает дополнительную нагрузку на систему от действий, которые не являются основной целью теста (например, регистрация или генерация операций).
Это позволяет протестировать сценарии "существующих пользователей", что ближе к продакшен-реальности.
/scenarios/get_operation_with_seeds/scenario.py
from locust import events, task
from locust.env import Environment
from seeds.builder import get_seeds_builder
from seeds.dumps import save_seeds_result
from seeds.schema.plan import SeedsPlan, SeedOperationsPlan
from seeds.schema.result import SeedOperationResult
from tools.locust.task_set import BaseLocustTaskSet
from tools.locust.user import BaseLocustUser
@events.init.add_listener
def init(environment: Environment, **kwargs):
# Выполняем сидинг перед запуском пользователей.
# Это важно: при старте теста система уже должна содержать готовые операции.
builder = get_seeds_builder()
# Формируем план сидинга: создадим 20 операций через API
result = builder.build(plan=SeedsPlan(operations=SeedOperationsPlan(count=20)))
# Сохраняем результат в файл — для дебага, воспроизводимости и реюза
save_seeds_result(result=result, scenario="get_operations_with_seeds")
# Кладём результат в environment, чтобы он был доступен всем пользователям
environment.seeds = result
class GetOperationTaskSet(BaseLocustTaskSet):
seed_operation: SeedOperationResult
def on_start(self) -> None:
super().on_start()
# При старте каждого пользователя выбираем случайную операцию из сидинга
self.seed_operation = self.user.environment.seeds.get_random_operation()
@task(1)
def get_operations(self):
# Пользователь запрашивает список операций
self.operations_client.get_operations()
@task(3)
def get_operation(self):
# Пользователь смотрит детали по заранее подготовленной операции
self.operations_client.get_operation(self.seed_operation.operation_id)
class GetOperationUser(BaseLocustUser):
# Привязываем TaskSet к пользователю
tasks = [GetOperationTaskSet]
Почему используется TaskSet, а не SequentialTaskSet
В этом сценарии порядок выполнения действий не важен:
пользователь может сначала открыть список операций,
а потом выбрать одну из них (или наоборот).
Мы намеренно используем TaskSet
, чтобы позволить Locust перемешивать задачи и запускать их в случайном порядке — согласно указанным весам. Это даёт более естественное, стохастическое поведение, ближе к реальному использованию.
Как работают веса задач: @task(n)
В TaskSet
веса задач управляют частотой вызовов:
@task(1)
означает: задача должна выполняться один раз из всех доступных вызовов.@task(3)
означает: задача должна выполняться в три раза чаще, чем задача с весом 1.
То есть в данном случае распределение будет:
get_operations
— 25% (1 из 4),get_operation
— 75% (3 из 4).
Это поведение задаёт соотношение реальных сценариев: пользователь чаще открывает конкретные детали операции, чем весь список.
Использование событий Locust (event hooks)
Ключевой особенностью этого сценария является использование событийной системы Locust, а именно хука @events.init.add_listener
:
@events.init.add_listener
def init(environment: Environment, **kwargs):
...
Этот хук срабатывает один раз перед запуском теста, сразу после инициализации окружения Locust. Именно в нём мы и выполняем сидинг:
Получаем сидинг-билдер через
get_seeds_builder()
.Создаём 20 операций с помощью
builder.build(...)
.Сохраняем результат сидинга в файл
get_operations_with_seeds_seeds.json
.Передаём результат в
environment.seeds
, чтобы он стал доступен всем пользователям.
Почему именно здесь?
Логика сидинга отделена от сценариев — сценарий остаётся чистым и сфокусированным на бизнес-действиях.
Сидинг выполняется ровно один раз, независимо от числа пользователей.
Подготовленные данные попадают в shared state (
environment
), и могут быть переиспользованы всеми пользователями без дублирования.
Сам сценарий
class GetOperationTaskSet(BaseLocustTaskSet):
seed_operation: SeedOperationResult
def on_start(self) -> None:
super().on_start()
self.seed_operation = self.user.environment.seeds.get_random_operation()
В методе on_start
каждый виртуальный пользователь получает одну случайную операцию из общего результата сидинга. Далее он будет с ней работать.
@task(1)
def get_operations(self):
self.operations_client.get_operations()
@task(3)
def get_operation(self):
self.operations_client.get_operation(self.seed_operation.operation_id)
Пользователь:
сначала получает список всех операций (не обязательно, но имитирует поведение UI),
затем делает детальный запрос по своей конкретной операции.
Задание get_operation
весит в 3 раза больше, чем get_operations
, что приближает поведение к реалистичному сценарию: пользователь может просмотреть список операций один раз, но конкретную операцию — несколько раз (или UI может отправлять такой запрос повторно).
Зачем мы сохраняем результат сидинга?
save_seeds_result(result=result, scenario="get_operations_with_seeds")
Мы сохраняем результат в файл, потому что:
это позволяет повторно использовать данные без повторного сидинга;
это даёт возможность проанализировать сидированные данные;
это удобно для отладки: можно посмотреть, какие ID операций были созданы, если что-то пошло не так.
Конфигурация запуска
/scenarios/get_operation_with_seeds/v1.0.conf
locustfile = ./scenarios/get_operation_with_seeds/scenario.py
spawn-rate = 2
run-time = 30s
headless = true
users = 20
html = ./scenarios/get_operation_with_seeds/report.html
Эта конфигурация повторяет структуру первого сценария. Используется умеренный профиль нагрузки, так как:
стенд учебный, и у него есть ограничение на количество запросов (rate limit),
мы хотим избежать
429 Too Many Requests
,нам важно сохранить чистоту метрик, не перегружая систему искусственно.
Запуск на CI/CD
Настроим workflow-файл для автоматического запуска нагрузочных тестов в GitHub Actions, генерации Locust HTML-отчета с сохранением истории и публикации его на GitHub Pages.
/.github/workflows/load-tests.yaml
# Название workflow-а — будет отображаться в интерфейсе GitHub Actions
name: Load Tests
# Указываем, что запуск возможен вручную (через кнопку "Run workflow")
on:
workflow_dispatch:
inputs:
SCENARIO:
type: 'choice' # Тип ввода — выпадающий список
default: './scenarios/get_operation_with_seeds/v1.0.conf' # Значение по умолчанию
options: # Возможные значения — пути до .conf-файлов сценариев
- ./scenarios/get_operation_with_seeds/v1.0.conf
- ./scenarios/get_operation_without_seeds/v1.0.conf
required: true # Значение обязательно
description: 'Locust config file' # Подпись к полю в интерфейсе GitHub
jobs:
run-tests:
# Используем образ Ubuntu для запуска тестов
runs-on: ubuntu-latest
timeout-minutes: 30 # На всякий случай ограничиваем длительность прогона
steps:
# Шаг 1: Клонируем репозиторий в раннер
- name: Check out repository
uses: actions/checkout@v4
# Шаг 2: Устанавливаем нужную версию Python
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
# Шаг 3: Устанавливаем зависимости из requirements.txt
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
# Шаг 4: Создаём директорию для будущего HTML-отчета Locust
# Используем run_id чтобы сохранить отчёт уникально для каждого запуска
- name: Create report directory
run: mkdir -p ./reports/${{ github.run_id }}
# Шаг 5: Запускаем нагрузочные тесты по выбранному сценарию
# HTML-отчёт будет сохранён в папке reports/<run_id>/index.html
- name: Run load tests and generate HTML report
run: locust --config=${{ github.event.inputs.SCENARIO }} --html=./reports/${{ github.run_id }}/index.html
# Шаг 6: Загружаем HTML-отчёт как артефакт, чтобы он был доступен после завершения CI
- name: Upload Locust reports
if: always() # Всегда выполнять, даже если тесты упали
uses: actions/upload-artifact@v4
with:
name: reports # Название артефакта
path: reports # Путь до директории с отчетами
publish-report:
if: always() # Публикация должна выполняться даже при падении тестов
needs: [ run-tests ] # Этот job зависит от выполнения run-tests
runs-on: ubuntu-latest
steps:
# Шаг 1: Скачиваем артефакт с отчетом из предыдущей job
- name: Download Locust reports
uses: actions/download-artifact@v4
with:
name: reports
path: reports
# Шаг 2: Публикуем HTML-отчёт на GitHub Pages (ветка gh-pages)
- name: Deploy HTML report to GitHub Pages
uses: peaceiris/actions-gh-pages@v4
with:
github_token: ${{ secrets.GITHUB_TOKEN }} # Авторизация через встроенный токен
keep_files: true # Не удаляем предыдущие отчёты (архивируем историю)
publish_dir: reports # Папка, откуда брать содержимое
publish_branch: gh-pages # Целевая ветка для GitHub Pages
Ссылки на документацию для всех использованных actions можно найти ниже:
Почему workflow_dispatch?
Даёт возможность запускать тесты вручную с выбором конфигурации. Удобно для запуска под конкретную версию нагрузки или перед релизом.
Зачем сохраняем в reports/${{ github.run_id }}?
Чтобы каждый запуск имел свою уникальную папку с отчётом. Это позволяет:
сохранять историю запусков,
публиковать и сравнивать отчёты между собой,
исключить перезапись.
Почему два job-а?
Мы разделяем логику:
один отвечает за запуск и генерацию отчёта,
другой — за публикацию. Это делает пайплайн более управляемым и удобным для отладки.
Почему gh-pages?
Это стандартная ветка GitHub Pages. Любой HTML-файл, добавленный туда, станет доступен по адресу https://<user>.github.io/<repo>/<путь>.
Почему keep_files: true?
Мы сохраняем отчёты за все предыдущие прогоны. Это важно для анализа динамики производительности во времени, особенно при регрессии.
Разрешения для Workflow
Если сейчас запустить нагрузочные тесты на GitHub Actions то, будет ошибка, говорящая о том, что у github token из workflow по умолчанию нет прав на записть в репзоиторий

Для исправления этой ошибки необходимо вручную изменить настройки прав workflow:
-
Откройте вкладку Settings в репозитории GitHub.
-
Перейдите в раздел Actions → General.
Прокрутите страницу вниз до блока Workflow permissions.
Выберите опцию Read and write permissions.
-
Нажмите кнопку Save для сохранения изменений.
После выполнения этих шагов можно отправить код с нагрузочными тестами в удалённый репозиторий.
Запуск нагрузочного теста вручную через GitHub Actions
После того как вы настроили workflow-файл и закоммитили его в репозиторий, вы можете запускать нагрузочные тесты в один клик прямо из интерфейса GitHub.
Перейдите на вкладку Actions вашего репозитория — она находится в верхнем меню.
В списке workflow-ов слева выберите Load Tests.
Нажмите на кнопку Run workflow в правом верхнем углу.
В выпадающем списке Locust config file выберите нужный
.conf
-файл — например,./scenarios/get_operation_with_seeds/v1.0.conf
. Это определит, какой именно сценарий нагрузки будет выполнен.Нажмите зелёную кнопку Run workflow.

После этого GitHub Actions запустит пайплайн: прогонит тест, сгенерирует HTML-отчёт Locust и (если всё настроено) опубликует его на GitHub Pages.
Если нагрузочные тесты пройдут успешно, Locust HTML-отчёт будет сгенерирован и загружен в ветку gh-pages, после чего автоматически запустится workflow pages build and deployment. Этот процесс публикует Locust HTML-отчёт на GitHub Pages, делая его доступным для просмотра в браузере.
Важно! Перед запуском workflow необходимо убедиться, что в репозитории существует ветка gh-pages. Если ветка отсутствует, её необходимо создать в удалённом репозитории, иначе публикация Locust HTML-отчёта на GitHub Pages не будет работать.
Проверка настроек GitHub Pages
Если workflow pages build and deployment не запустился, необходимо проверить настройки GitHub Pages:
Откройте вкладку Settings в репозитории.
Перейдите в раздел Pages → Build and deployment.
Убедитесь, что параметры соответствуют настройкам на скриншоте ниже.
На этой же странице будет отображаться виджет со ссылкой на опубликованный Allure-отчёт.

Доступ к Locust HTML-отчётам
Каждый отчёт публикуется на GitHub Pages с уникальным идентификатором run id, в котором он был сгенерирован.
Все сгенерированные Locust HTML-отчёты также можно найти в ветке gh-pages.
Перейдя по ссылке на GitHub Pages, можно открыть сгенерированный Locust HTML-отчёт с результатами нагрузочного тестирования.

По итогу, после корректной настройки, при каждом новом запуске нагрузочных тестов Locust HTML-отчёт будет автоматически сохраняться в ветку gh-pages и публиковаться на GitHub Pages.

Заключение
Все ссылки на код, отчеты и запуски нагрузочных тестов в CI/CD можно найти на моем GitHub: