Привет! С вами снова писатель-программист из компании Simpl Group (да, без e).
Совсем недавно я выступала на нашем внутреннем Meet Up — уже 6-м, между прочим, — и рассказала своим коллегам занимательную историю, которую поведаю сегодня и вам. Не про ведьм и демонов, конечно, как в моей книге. А про цирк — цифровой цирк, в котором задачи прыгают через обручи, катаются на велосипедах и не падают.
Или, по крайней мере, мы стараемся, чтобы не падали.
(К слову, книгу тоже можете почитать: «Пороки», Ingini)
Представим ситуацию
У вас есть очередь, которая умеет выполнять только один трюк — например, отправлять задачи в расчёт. Всё, как в старом цирке: один артист, один номер.
Но однажды появляется новая задача. И за ней ещё одна. И вот уже зрители хотят видеть не только прыжки через обруч, но и медведя на велосипеде, жонглёров, акробатов и даже кибердракона с машинным обучением.
Значит, мы должны перестроить всё шоу. Так, чтобы:
Добавлять новых артистов минимальными усилиями;
Не перестраивать манеж каждый раз;
Всё работало: надёжно, масштабируемо и эффектно.
Кто в нашем цирке — участники шоу
Представим, что у нас есть две задачи:
Одна — это лошадки, прыгающие через обруч;
Вторая — это мишка на велосипеде, проезжающий то в одну, то в другую сторону.
Что есть общего в этих двух выступлениях?
Контроллеры — дрессировщики, подающиеся голосом: «Вперёд!».
Репозитории — реквизиторы, вытаскивающие снаряжение из склада (БД).
Очереди — манеж, где артисты ждут, пока их объявят.
Экзекьютор — тёмный коридор, ведущий от кулис к свету рампы.
Менеджеры — двери между закулисьем и сценой.
Другие сервисы — собственно сцена, где и происходит номер.
Но есть нюанс:
Лошадки прыгают по одной.
Медведи катаются по трое.
Кто-то выходит через Kafka, кто-то — через HTTP.
По сути мы имеем парочку небольших различий, для которых будет неверно создавать второй сервис или писать почти аналогичный код. Поэтому представим, что кулисы и дрессировщики у мишек и лошадок одни и те же, а двери, актеры и сцены разные.
Как устроен наш манеж — Архитектура
Теперь, когда мы поняли, кто у нас выступает, давайте посмотрим, как работает наш цирк изнутри. Что там в кулисах, и почему никто не спотыкается?
Идея проста:
Сделать единый цирк, куда можно легко впустить любого нового артиста: хоть медведя, хоть жонглёра, ну и да, кибердракона тоже.
В базе
У каждой задачи есть два слоя костюма:
Общий: ID, тип, статус, время постановки, ошибки.
Специфичный: параметры конкретного артиста.
Получается:
task -- общий склад задач
task_{taskType}_parameters -- гардероб для костюмов
В коде
Самое главное наше оружие — абстрактный дженерик класс на всё, что скорее всего будет использовано не только для одного типа задач.
Покажу вам, как может выглядеть примерный код.
1. Модельки
public interface ITaskParameters { }
public interface ITaskDto { }
public record TaskOneParameters(int Value) : ITaskParameters;
public record TaskTwoParameters(string Data) : ITaskParameters;
public record TaskOneDto(int Value) : ITaskDto;
public record TaskTwoDto(string Data) : ITaskDto;
public class QueueTask where TParam : ITaskParameters
{
public QueueTask(TParam parameters)
{
Parameters = parameters;
TaskInfo = new QueueTaskInfo();
}
public TParam Parameters { get; }
public QueueTaskInfo TaskInfo { get; }
}
public class QueueTaskInfo
{
public Guid Id { get; set; }
public DateTime QueueTime { get; set; }
public QueueTaskStatus Status { get; set; }
public QueueTaskType Type { get; set; }
}
public enum QueueTaskStatus
{
ReadyForExecution,
InProgress,
Completed,
Failed
}
public enum QueueTaskType
{
TaskOne,
TaskTwo
}
2. Контроллеры
/// <summary>
/// Базовый контроллер для постановки задач в очередь
/// </summary>
[Route("api/[controller]")]
[ApiController]
public abstract class AbstractQueueTasksController : ControllerBase
where TParam : ITaskParameters
{
protected AbstractQueueTasksController(IMediator mediator)
{
_mediator = mediator;
}
protected IMediator _mediator { get; }
/// <summary>
/// Общий метод для всех типов задач
/// </summary>
[HttpGet("GetTasks")]
public Task<...> GetAsync(CancellationToken cancellationToken = default)
{
return _mediator.Send(new AbstractGetQueueTasksRequest(), cancellationToken);
}
}
/// <summary>
/// Контроллер для задач типа "TaskOne"
/// </summary>
public class TaskOneController : AbstractQueueTasksController
{
public TaskOneController(IMediator mediator) : base(mediator) { }
/// <summary>
/// Постановка задачи, которая пришла из другого сервиса, а значит дажнные уже обработаны
/// </summary>
[HttpPost("Enqueue")]
public Task EnqueueAsync(TaskOneDto dto, CancellationToken cancellationToken = default)
{
return _mediator.Send(new EnqueueTaskCommand(dto), cancellationToken);
}
}
/// <summary>
/// Контроллер для задач типа "TaskTwo"
/// </summary>
public class TaskTwoController : AbstractQueueTasksController
{
public TaskTwoController(IMediator mediator) : base(mediator) { }
/// <summary>
/// Постановка задачи, которая пришла с фронта
/// </summary>
[HttpPost("Enqueue")]
public Task EnqueueAsync(TaskTwoDto dto, CancellationToken cancellationToken = default)
{
... // тут какая-то обратка и валидация данных
return _mediator.Send(new EnqueueTaskCommand(dto), cancellationToken);
}
}
3. Команда и обработчик
/// <summary>
/// Команда постановки задачи в очередь
/// </summary>
public class EnqueueTaskCommand : IRequest
where TDto : ITaskDto
{
public EnqueueTaskCommand(TDto dto) => TaskDto = dto;
public TDto TaskDto { get; }
}
/// <summary>
/// Базовый обработчик постановки задач
/// </summary>
public abstract class EnqueueTaskCommandHandler : IRequestHandler>
where TParam : ITaskParameters
where TDto : ITaskDto
{
private readonly AbstractDataflowQueue _queue;
protected EnqueueTaskCommandHandler(AbstractDataflowQueue queue)
{
_queue = queue;
}
public async Task Handle(EnqueueTaskCommand request, CancellationToken cancellationToken)
{
if (request is null)
throw new ArgumentNullException(nameof(request));
var param = Map(request.TaskDto);
await _queue.EnqueueAsync(param, cancellationToken);
}
/// <summary>
/// Просто какая-то работа с данными
/// </summary>
protected abstract TParam Map(TDto dto);
}
4. Очередь
public abstract class AbstractDataflowQueue
where TParam : ITaskParameters
{
private readonly SemaphoreSlim _locker = new(1, 1); // Нужен для защиты от одновременной постановки нескольких задач
protected AbstractQueueTaskRepository _repository { get; }
protected AbstractBackgroundExecutingTask _executor { get; }
protected AbstractDataflowQueue(
AbstractBackgroundExecutingTask executor,
AbstractQueueTaskRepository repository)
{
_executor = executor;
_repository = repository;
}
public async Task EnqueueAsync(QueueTask item, CancellationToken cancellationToken = default)
{
if (item is null)
throw new ArgumentNullException(nameof(item));
await _locker.WaitAsync(cancellationToken);
try
{
item.TaskInfo.QueueTime = DateTime.Now;
item.TaskInfo.Status = QueueTaskStatus.ReadyForExecution;
await _repository.SaveAsync(item, cancellationToken);
await _executor.TrySendQueueTask(item.Id);
}
finally
{
_locker.Release();
}
}
}
5. Экзекьютер
/// <summary>
/// Базовый экзекьютер: достаёт задачу из очереди и отправляет её в менеджер
/// </summary>
public abstract class AbstractBackgroundExecutingTask
where TParam : ITaskParameters
{
protected AbstractBackgroundExecutingTask(
IManager manager,
AbstractQueueTaskRepository repository,
int defaultMaxParallelism = 1)
{
_manager = manager;
_repository = repository;
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = defaultMaxParallelism,
BoundedCapacity = DataflowBlockOptions.Unbounded
};
_block = new ActionBlock(HandleAsync, options);
}
protected IManager _manager { get; }
protected AbstractQueueTaskRepository _repository { get; }
protected ActionBlock _block { get; }
public bool TrySendQueueTask(Guid taskId)
{
return _block.Post(taskId);
}
private async Task HandleAsync(Guid taskId)
{
var task = await _repository.GetTask(taskId);
if (task == null) return;
task.TaskInfo.Status = QueueTaskStatus.InProgress;
await _manager.TransferTask(task);
task.TaskInfo.Status = QueueTaskStatus.Completed;
await _repository.UpdateAsync(task);
}
}
/// <summary>
/// Лошадки прыгают по одной
/// </summary>
public class TaskOneExecutor : AbstractBackgroundExecutingTask
{
public TaskOneExecutor(
IManager manager,
AbstractQueueTaskRepository repository)
: base(manager, repository, defaultMaxParallelism: 1) { }
}
/// <summary>
/// Медведи катаются втроём
/// </summary>
public class TaskTwoExecutor : AbstractBackgroundExecutingTask
{
public TaskTwoExecutor(
IManager manager,
AbstractQueueTaskRepository repository)
: base(manager, repository, defaultMaxParallelism: 3) { }
}
6. Репозиторий
public abstract class AbstractQueueTaskRepository
where TParam : ITaskParameters
{
// Простое хранилище в памяти
protected readonly Dictionary<Guid, QueueTask<TParam>> _storage = new();
public virtual Task SaveAsync(QueueTask task, CancellationToken cancellationToken = default)
{
_storage[task.TaskInfo.Id] = task;
return Task.CompletedTask;
}
public virtual Task UpdateAsync(QueueTask task, CancellationToken cancellationToken = default)
{
if (_storage.ContainsKey(task.TaskInfo.Id))
{
_storage[task.TaskInfo.Id] = task;
}
return Task.CompletedTask;
}
public virtual QueueTask? GetTask(Guid taskId)
{
_storage.TryGetValue(taskId, out var task);
return task;
}
...
}
+реализации, сохранение в бд и другая логика
7. Менеджеры
public interface IManager
where TParam : ITaskParameters
{
Task TransferTask(QueueTask task);
}
/// <summary>
/// Тут у нас кафка
/// </summary>
public class TaskOneManager : IManager
{
private readonly ITaskOneProducer _producer;
private readonly ITaskOneConsumer _consumer;
public TaskOneManager(
ITaskOneProducer producer,
ITaskOneConsumer consumer)
{
_producer = producer;
_consumer = consumer;
}
public async Task TransferTask(QueueTask queueTask)
{
// Отправка задачи через продюсера
await _producer.PublishAsync(queueTask);
// Ожидаем результат через консюмера
await _consumer.GetResult(queueTask.TaskInfo.Id);
}
}
/// <summary>
/// Тут у нас Refit клиент
/// </summary>
public class TaskTwoManager : IManager
{
private readonly ITaskTwoClient _client;
public TaskTwoManager(ITaskTwoClient client)
{
_client = client;
}
public async Task TransferTask(QueueTask task)
{
await _client.SendTaskTwoAsync(task);
}
}
Разумеется, код самый примитивный, который просто показывает, как можно сделать.
И не забудьте зарегистрировать реализации как синглтон объекты (иначе вся ваша очередь потеряется). Только Менеджеры можно сделать Transient.
Итоговая архитектура:

Как бы мы приручили разношёрстных артистов — расширяемость
Теперь представим, что завтра к нам заходят:
Слоны, которые будут делать запросы по SOAP.
Пингвины, которые будут танцевать параллельно в 10 потоков. Наша архитектура говорит: «Да не вопрос». Вот как мы добавляем нового зверя в наш цирк:
В базу:
Новая таблица параметров.
В код:
Реализация абстрактного контроллера, необходимых команд и запросов, модельки
Реализация репозитория, очереди, экзекьютора и менеджера.
DI-регистрация в
Program.cs
. (то есть подписываем, что наши животные могут пользоваться любыми нашими рельсами)
И всё. Весь путь — по накатанной. Никто не мешает мишкам, лошадям и слонам выступать одновременно.
Слова автора
Спасибо большое, что прочитали статью мини-мидла. Надеюсь, что вам понравились метафоры) Моим коллегам на Meet Up очень понравились! А там, между прочим, не только разработчики были, но и аналитики, тестеры и даже медийщики!
Если вам интересны такие мероприятия, то заглядывайте к нам. Возможно мы даже скоро выйдем на более глобальный уровень с нашим митапом)
Ну и, конечно же, если вам есть что сказать, то милости прошу в комментарии. Я всегда рада конструктивной критике!