Привет! С вами снова писатель-программист из компании Simpl Group (да, без e).
Совсем недавно я выступала на нашем внутреннем Meet Up — уже 6-м, между прочим, — и рассказала своим коллегам занимательную историю, которую поведаю сегодня и вам. Не про ведьм и демонов, конечно, как в моей книге. А про цирк — цифровой цирк, в котором задачи прыгают через обручи, катаются на велосипедах и не падают.
Или, по крайней мере, мы стараемся, чтобы не падали.

(К слову, книгу тоже можете почитать: «Пороки», Ingini)

Представим ситуацию

У вас есть очередь, которая умеет выполнять только один трюк — например, отправлять задачи в расчёт. Всё, как в старом цирке: один артист, один номер.

Но однажды появляется новая задача. И за ней ещё одна. И вот уже зрители хотят видеть не только прыжки через обруч, но и медведя на велосипеде, жонглёров, акробатов и даже кибердракона с машинным обучением.

Значит, мы должны перестроить всё шоу. Так, чтобы:

  • Добавлять новых артистов минимальными усилиями;

  • Не перестраивать манеж каждый раз;

  • Всё работало: надёжно, масштабируемо и эффектно.

Кто в нашем цирке — участники шоу

Представим, что у нас есть две задачи:
Одна — это лошадки, прыгающие через обруч;
Вторая — это мишка на велосипеде, проезжающий то в одну, то в другую сторону.

Что есть общего в этих двух выступлениях?

  • Контроллеры — дрессировщики, подающиеся голосом: «Вперёд!».

  • Репозитории — реквизиторы, вытаскивающие снаряжение из склада (БД).

  • Очереди — манеж, где артисты ждут, пока их объявят.

  • Экзекьютор — тёмный коридор, ведущий от кулис к свету рампы.

  • Менеджеры — двери между закулисьем и сценой.

  • Другие сервисы — собственно сцена, где и происходит номер.

Но есть нюанс:

  • Лошадки прыгают по одной.

  • Медведи катаются по трое.

  • Кто-то выходит через Kafka, кто-то — через HTTP.

По сути мы имеем парочку небольших различий, для которых будет неверно создавать второй сервис или писать почти аналогичный код. Поэтому представим, что кулисы и дрессировщики у мишек и лошадок одни и те же, а двери, актеры и сцены разные.

Как устроен наш манеж — Архитектура

Теперь, когда мы поняли, кто у нас выступает, давайте посмотрим, как работает наш цирк изнутри. Что там в кулисах, и почему никто не спотыкается?

Идея проста:

Сделать единый цирк, куда можно легко впустить любого нового артиста: хоть медведя, хоть жонглёра, ну и да, кибердракона тоже.

В базе

У каждой задачи есть два слоя костюма:

  • Общий: ID, тип, статус, время постановки, ошибки.

  • Специфичный: параметры конкретного артиста.

Получается:

task                       -- общий склад задач 
task_{taskType}_parameters -- гардероб для костюмов

В коде

Самое главное наше оружие — абстрактный дженерик класс на всё, что скорее всего будет использовано не только для одного типа задач.

Покажу вам, как может выглядеть примерный код.

1. Модельки

public interface ITaskParameters { }
public interface ITaskDto { }

public record TaskOneParameters(int Value) : ITaskParameters;
public record TaskTwoParameters(string Data) : ITaskParameters;

public record TaskOneDto(int Value) : ITaskDto;
public record TaskTwoDto(string Data) : ITaskDto;

public class QueueTask where TParam : ITaskParameters
{
    public QueueTask(TParam parameters)
    {
        Parameters = parameters;
        TaskInfo = new QueueTaskInfo();
    }

    public TParam Parameters { get; }
    public QueueTaskInfo TaskInfo { get; }
}

public class QueueTaskInfo
{
	public Guid Id { get; set; }
    public DateTime QueueTime { get; set; }
    public QueueTaskStatus Status { get; set; }
    public QueueTaskType Type { get; set; }
}

public enum QueueTaskStatus
{
    ReadyForExecution,
    InProgress,
    Completed,
    Failed
}

public enum QueueTaskType
{
    TaskOne,
    TaskTwo
}

2. Контроллеры


/// <summary>
/// Базовый контроллер для постановки задач в очередь
/// </summary>
[Route("api/[controller]")]
[ApiController]
public abstract class AbstractQueueTasksController : ControllerBase
    where TParam : ITaskParameters
{
    protected AbstractQueueTasksController(IMediator mediator)
    {
        _mediator = mediator;
    }

    protected IMediator _mediator { get; }
    
    /// <summary>
	/// Общий метод для всех типов задач
	/// </summary>
    [HttpGet("GetTasks")]
	public Task<...> GetAsync(CancellationToken cancellationToken = default)
	{
	    return _mediator.Send(new AbstractGetQueueTasksRequest(), cancellationToken);
	}
}

/// <summary>
/// Контроллер для задач типа "TaskOne"
/// </summary>
public class TaskOneController : AbstractQueueTasksController
{
    public TaskOneController(IMediator mediator) : base(mediator) { }

	/// <summary>
	/// Постановка задачи, которая пришла из другого сервиса, а значит дажнные уже обработаны
	/// </summary>
    [HttpPost("Enqueue")]
    public Task EnqueueAsync(TaskOneDto dto, CancellationToken cancellationToken = default)
    {
        return _mediator.Send(new EnqueueTaskCommand(dto), cancellationToken);
    }
}

/// <summary>
/// Контроллер для задач типа "TaskTwo"
/// </summary>
public class TaskTwoController : AbstractQueueTasksController
{
    public TaskTwoController(IMediator mediator) : base(mediator) { }

	/// <summary>
	/// Постановка задачи, которая пришла с фронта
	/// </summary>
    [HttpPost("Enqueue")]
    public Task EnqueueAsync(TaskTwoDto dto, CancellationToken cancellationToken = default)
    {
	    ... // тут какая-то обратка и валидация данных
        return _mediator.Send(new EnqueueTaskCommand(dto), cancellationToken);
    }
}

3. Команда и обработчик


/// <summary>
/// Команда постановки задачи в очередь
/// </summary>
public class EnqueueTaskCommand : IRequest
    where TDto : ITaskDto
{
    public EnqueueTaskCommand(TDto dto) => TaskDto = dto;

    public TDto TaskDto { get; }
}

/// <summary>
/// Базовый обработчик постановки задач
/// </summary>
public abstract class EnqueueTaskCommandHandler : IRequestHandler&gt;
    where TParam : ITaskParameters
    where TDto : ITaskDto
{
    private readonly AbstractDataflowQueue _queue;

    protected EnqueueTaskCommandHandler(AbstractDataflowQueue queue)
    {
        _queue = queue;
    }

    public async Task Handle(EnqueueTaskCommand request, CancellationToken cancellationToken)
    {
        if (request is null)
            throw new ArgumentNullException(nameof(request));

        var param = Map(request.TaskDto);
        await _queue.EnqueueAsync(param, cancellationToken);
    }

	/// <summary>
	/// Просто какая-то работа с данными 
	/// </summary>
    protected abstract TParam Map(TDto dto);
}

4. Очередь

public abstract class AbstractDataflowQueue 
    where TParam : ITaskParameters
{
    private readonly SemaphoreSlim _locker = new(1, 1); // Нужен для защиты от одновременной постановки нескольких задач

    protected AbstractQueueTaskRepository _repository { get; }
    protected AbstractBackgroundExecutingTask _executor { get; }

    protected AbstractDataflowQueue(
        AbstractBackgroundExecutingTask executor,
        AbstractQueueTaskRepository repository)
    {
        _executor = executor;
        _repository = repository;
    }

    public async Task EnqueueAsync(QueueTask item, CancellationToken cancellationToken = default)
    {
        if (item is null) 
	        throw new ArgumentNullException(nameof(item));

        await _locker.WaitAsync(cancellationToken);
        try
        {
            item.TaskInfo.QueueTime = DateTime.Now;
            item.TaskInfo.Status = QueueTaskStatus.ReadyForExecution;

            await _repository.SaveAsync(item, cancellationToken);
            await _executor.TrySendQueueTask(item.Id);
        }
        finally
        {
            _locker.Release();
        }
    }
}

5. Экзекьютер

/// <summary>
/// Базовый экзекьютер: достаёт задачу из очереди и отправляет её в менеджер
/// </summary>
public abstract class AbstractBackgroundExecutingTask
    where TParam : ITaskParameters
{
    protected AbstractBackgroundExecutingTask(
        IManager manager,
        AbstractQueueTaskRepository repository,
        int defaultMaxParallelism = 1)
    {
        _manager = manager;
        _repository = repository;

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = defaultMaxParallelism,
            BoundedCapacity = DataflowBlockOptions.Unbounded
        };

        _block = new ActionBlock(HandleAsync, options);
    }

    protected IManager _manager { get; }
    protected AbstractQueueTaskRepository _repository { get; }
    protected ActionBlock _block { get; }

    public bool TrySendQueueTask(Guid taskId)
    {
        return _block.Post(taskId);
    }

    private async Task HandleAsync(Guid taskId)
    {
        var task = await _repository.GetTask(taskId);
        if (task == null) return;

        task.TaskInfo.Status = QueueTaskStatus.InProgress;
        await _manager.TransferTask(task);
        
        task.TaskInfo.Status = QueueTaskStatus.Completed;
        await _repository.UpdateAsync(task);
    }
}

/// <summary>
/// Лошадки прыгают по одной
/// </summary>
public class TaskOneExecutor : AbstractBackgroundExecutingTask
{
    public TaskOneExecutor(
        IManager manager,
        AbstractQueueTaskRepository repository)
        : base(manager, repository, defaultMaxParallelism: 1) { }
}

/// <summary>
/// Медведи катаются втроём
/// </summary>
public class TaskTwoExecutor : AbstractBackgroundExecutingTask
{
    public TaskTwoExecutor(
        IManager manager,
        AbstractQueueTaskRepository repository)
        : base(manager, repository, defaultMaxParallelism: 3) { }
}

6. Репозиторий

public abstract class AbstractQueueTaskRepository
    where TParam : ITaskParameters
{
    // Простое хранилище в памяти
    protected readonly Dictionary<Guid, QueueTask<TParam>> _storage = new();

    public virtual Task SaveAsync(QueueTask task, CancellationToken cancellationToken = default)
    {
        _storage[task.TaskInfo.Id] = task;
        return Task.CompletedTask;
    }

    public virtual Task UpdateAsync(QueueTask task, CancellationToken cancellationToken = default)
    {
        if (_storage.ContainsKey(task.TaskInfo.Id))
        {
            _storage[task.TaskInfo.Id] = task;
        }
        return Task.CompletedTask;
    }

    public virtual QueueTask? GetTask(Guid taskId)
    {
        _storage.TryGetValue(taskId, out var task);
        return task;
    }
    
    ...
}

+реализации, сохранение в бд и другая логика

7. Менеджеры

public interface IManager
    where TParam : ITaskParameters
{
    Task TransferTask(QueueTask task);
}

/// <summary>
/// Тут у нас кафка
/// </summary>
public class TaskOneManager : IManager
{
    private readonly ITaskOneProducer _producer;
    private readonly ITaskOneConsumer _consumer;

    public TaskOneManager(
        ITaskOneProducer producer,
        ITaskOneConsumer consumer)
    {
        _producer = producer;
        _consumer = consumer;
    }

    public async Task TransferTask(QueueTask queueTask)
    {
		// Отправка задачи через продюсера
		await _producer.PublishAsync(queueTask);
	
		// Ожидаем результат через консюмера
		await _consumer.GetResult(queueTask.TaskInfo.Id);
    }
}

/// <summary>
/// Тут у нас Refit клиент
/// </summary>
public class TaskTwoManager : IManager
{
    private readonly ITaskTwoClient _client;

    public TaskTwoManager(ITaskTwoClient client)
    {
        _client = client;
    }

    public async Task TransferTask(QueueTask task)
    {
        await _client.SendTaskTwoAsync(task);
    }
}

Разумеется, код самый примитивный, который просто показывает, как можно сделать.

И не забудьте зарегистрировать реализации как синглтон объекты (иначе вся ваша очередь потеряется). Только Менеджеры можно сделать Transient.

Итоговая архитектура:

Как бы мы приручили разношёрстных артистов — расширяемость

Теперь представим, что завтра к нам заходят:

  • Слоны, которые будут делать запросы по SOAP.

  • Пингвины, которые будут танцевать параллельно в 10 потоков. Наша архитектура говорит: «Да не вопрос». Вот как мы добавляем нового зверя в наш цирк:

В базу:

  • Новая таблица параметров.

В код:

  • Реализация абстрактного контроллера, необходимых команд и запросов, модельки

  • Реализация репозитория, очереди, экзекьютора и менеджера.

  • DI-регистрация в Program.cs. (то есть подписываем, что наши животные могут пользоваться любыми нашими рельсами)

И всё. Весь путь — по накатанной. Никто не мешает мишкам, лошадям и слонам выступать одновременно.

Слова автора

Спасибо большое, что прочитали статью мини-мидла. Надеюсь, что вам понравились метафоры) Моим коллегам на Meet Up очень понравились! А там, между прочим, не только разработчики были, но и аналитики, тестеры и даже медийщики!

Если вам интересны такие мероприятия, то заглядывайте к нам. Возможно мы даже скоро выйдем на более глобальный уровень с нашим митапом)

Ну и, конечно же, если вам есть что сказать, то милости прошу в комментарии. Я всегда рада конструктивной критике!

Комментарии (0)