Привет, Хабр!

В .NET живёт старый, понятный инструмент для конкурентной обработки — BlockingCollection<T>. Коллекция, которая упрощает модель producer–consumer, даёт строгую ограниченную вместимость и предсказуемую блокировку при пустоте или переполнении. Да, у нас есть Channel<T>, есть TPL Dataflow, есть миллион самодельных очередей на SemaphoreSlim. Но когда нужны простые правила и железный backpressure без BlockingCollection по-прежнему закрывает задачи хорошо.

Что именно даёт коллекция

  1. Ограниченная вместимость. Её задают в конструкторе. Значение — это бюджет по очереди. Производитель автоматически притормаживает, когда очередь заполнена.

  2. Блокирующие операции. Add и Take блокируют поток до освобождения места или появления элемента. Есть неблокирующие аналоги TryAdd и TryTake с таймаутами и CancellationToken.

  3. Контроль завершения жизненного цикла. Производители вызывают CompleteAdding, потребители ориентируются по IsCompleted и выходят без ворнингов и подвисаний.

  4. Перечисление с изъятием. GetConsumingEnumerable позволяет писать потребителя через foreach, без ручных циклов ожидания.

  5. Массивы очередей. Есть TakeFromAny и AddToAny, когда нужно читать из нескольких источников как из одного.

В качестве базовой коллекции под капотом по дефолту используется ConcurrentQueue<T> (FIFO), но можно подложить и ConcurrentStack<T> (LIFO), и ConcurrentBag<T> — зависит от семантики.

Каркас пула воркеров без подвисаний

Задача: дать backpressure, аккуратно остановить по сигналу, не ловить повисшие Take, не сжечь CPU спином.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class WorkerPool<T> : IDisposable
{
    private readonly BlockingCollection<T> _queue;
    private readonly Task[] _workers;
    private readonly CancellationTokenSource _cts = new();

    public WorkerPool(
        int boundedCapacity,
        int degreeOfParallelism,
        Action<T, CancellationToken> handler)
    {
        if (boundedCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(boundedCapacity));
        if (degreeOfParallelism <= 0) throw new ArgumentOutOfRangeException(nameof(degreeOfParallelism));
        if (handler is null) throw new ArgumentNullException(nameof(handler));

        _queue = new BlockingCollection<T>(new ConcurrentQueue<T>(), boundedCapacity);

        _workers = new Task[degreeOfParallelism];
        for (int i = 0; i < degreeOfParallelism; i++)
        {
            _workers[i] = Task.Factory.StartNew(() =>
            {
                var ct = _cts.Token;
                while (!_queue.IsCompleted)
                {
                    try
                    {
                        var item = _queue.Take(ct);
                        handler(item, ct);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                    catch (InvalidOperationException)
                    {
                        break; // очередь завершена и пуста
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }
    }

    public bool TryPost(T item, TimeSpan timeout, CancellationToken ct = default)
        => _queue.TryAdd(item, timeout, ct);

    public void Complete() => _queue.CompleteAdding();

    public void Dispose()
    {
        try
        {
            _cts.Cancel();
            _queue.CompleteAdding();
            Task.WaitAll(_workers, TimeSpan.FromSeconds(5));
        }
        finally
        {
            _queue.Dispose();
            _cts.Dispose();
        }
    }
}

TaskCreationOptions.LongRunning просит планировщик выделить отдельный поток под воркер, это снижает конкуренцию за пул, когда обработка не микроскопическая. Не стоит ставить LongRunning везде по привычке. Если обработка очень короткая, обычные Task.Run без LongRunning дают достаточную утилизацию.

Когда производители вызвали CompleteAdding и очередь опустела, Take кидает InvalidOperationException.

Таймауты, отмена и отказ от пустых спинов

Спин с TryTake в вечном цикле, где задержка реализована Thread.Sleep(1) — плохая идея. Это нагружает CPU без пользы и портит энергопотребление. Вариант с таймаутами и внешней отменой выглядит адекватнее:

public static void DrainWithTimeout<T>(
    BlockingCollection<T> bc,
    Action<T> handle,
    TimeSpan wait,
    CancellationToken ct)
{
    while (!bc.IsCompleted)
    {
        try
        {
            if (bc.TryTake(out var item, (int)wait.TotalMilliseconds, ct))
            {
                handle(item);
            }
            // иначе таймаут — просто идём в новую итерацию
        }
        catch (OperationCanceledException)
        {
            break;
        }
        catch (InvalidOperationException)
        {
            break;
        }
    }
}

Таймаут выбирайте исходя из SLA на латентность. Если важна минимальная задержка, таймаут меньше, но выше накладные расходы на переключения. Если нужна экономия CPU, таймаут больше. Универсальной цифры нет.

Правильное завершение при нескольких производителях

Типичная ошибка вызывать CompleteAdding из потребителя, потому что кажется, всё. Если производителей несколько, это прямой путь к потерянным элементам. Завершение — ответственность уровня, который точно знает, что новые элементы больше не придут. Потребителям достаточно IsCompleted.

Безопаснее всего вызывать CompleteAdding в finally рядом с производством, даже если прод не добежал до конца цикла из-за отмены или исключения.

Ещё один момент, о котором часто забывают. Dispose на коллекции не обязан разбудить блокирующие Take. Правильный порядок: CompleteAdding, дождаться выхода воркеров, после этого Dispose. Пытаться убить ожидание диспозом так себе затея.

Выбор базовой коллекции под капотом

По умолчанию стоит ConcurrentQueue<T> — это FIFO и в большинстве систем это то, что нужно. Если задача специфичная:

  • Нужен LIFO, потому что последний пришедший важнее — используйте ConcurrentStack<T>.

  • Пул объектов с безразличием к порядку — ConcurrentBag<T>.

PriorityQueue<TElement,TPriority> не реализует IProducerConsumerCollection<T>, напрямую подложить нельзя. Если нужна приоритезация, делаем свою оболочку, либо переходим к Channel<T> и маршрутизации с несколькими каналами.

GetConsumingEnumerable: чистый потребитель без ручных циклов

Нравится многим из-за читаемости. Перечислитель блокируется в ожидании элемента и завершает перечисление, когда очередь закрыта и опустела.

public static Task StartConsumer<T>(
    BlockingCollection<T> bc,
    Action<T> handler,
    CancellationToken ct)
{
    return Task.Factory.StartNew(() =>
    {
        foreach (var item in bc.GetConsumingEnumerable(ct))
            handler(item);
    }, TaskCreationOptions.LongRunning);
}

У этого подхода один нюанс: не кладите тяжёлую логику прямо в перечислитель, тяните наружу. Тогда тесты и отладка проще.

Конвейер из нескольких стадий с чётким управлением завершением

Хорошая практика — стадия на вход, стадия на выход. Каждая стадия после завершения обработки входа вызывает CompleteAdding на своём выходе. Следующая стадия завершится автоматически.

public static class Pipeline
{
    public static void Run(CancellationToken ct)
    {
        using var stage1 = new BlockingCollection<int>(boundedCapacity: 256);
        using var stage2 = new BlockingCollection<string>(boundedCapacity: 256);

        var producer = Task.Run(() =>
        {
            try
            {
                for (var i = 0; i < 10_000; i++)
                {
                    ct.ThrowIfCancellationRequested();
                    stage1.Add(i, ct);
                }
            }
            finally
            {
                stage1.CompleteAdding();
            }
        }, ct);

        var middle = Task.Run(() =>
        {
            try
            {
                foreach (var n in stage1.GetConsumingEnumerable(ct))
                {
                    var line = $"value={n}";
                    stage2.Add(line, ct);
                }
            }
            finally
            {
                stage2.CompleteAdding();
            }
        }, ct);

        var consumer = Task.Run(() =>
        {
            foreach (var s in stage2.GetConsumingEnumerable(ct))
            {
                // финальная обработка
            }
        }, ct);

        Task.WaitAll(producer, middle, consumer);
    }
}

Остановка прозрачная. Нет дублирующих сигналов. Нет скрытых флагов. Всё видно в конструкторе и в finally.

Несколько входов: TakeFromAny и аккуратный выход

Когда источников несколько, можно читать из любой очереди, которая готова. Важно не забыть критерий выхода, иначе можно зависнуть в ожидании.

public static void DrainAny(
    BlockingCollection<byte[]>[] inputs,
    Action<byte[]> handle,
    CancellationToken ct)
{
    while (true)
    {
        if (AllCompleted(inputs)) break;

        try
        {
            int idx = BlockingCollection<byte[]>.TakeFromAny(inputs, out var item, ct);
            handle(item);
        }
        catch (OperationCanceledException)
        {
            break;
        }
        catch (InvalidOperationException)
        {
            // одна из очередей завершилась и пуста — проверим общую готовность и продолжим
        }
    }

    static bool AllCompleted(BlockingCollection<byte[]>[] arr)
    {
        foreach (var bc in arr)
            if (!bc.IsCompleted) return false;
        return true;
    }
}

Справедливости по источникам контракт не обещает. Никаких гарантий чередования нет. Если нужен приоритет, делайте его сами: например, сначала TryTake с нулевым таймаутом из приоритетной, потом TryTakeFromAny из остальных с таймаутом.

Ошибки обработчиков и вредные элементы

Исключение внутри обработчика это не событие самой очереди, это ошибка прикладной логики. Решение должно быть явным. Удобно сделать инъекцию обработчика ошибок и выделенный dead letter выход.

public sealed class SafeWorkerPool<T> : IDisposable
{
    // ... те же поля, что и раньше
    private readonly Action<Exception, T>? _onError;

    public SafeWorkerPool(
        int capacity, int dop,
        Action<T, CancellationToken> handler,
        Action<Exception, T>? onError = null)
    {
        // ... инициализация очереди
        _onError = onError;

        // в воркере:
        // try { handler(item, ct); }
        // catch (Exception ex) { _onError?.Invoke(ex, item); }
    }

    // остальное — как в WorkerPool
}

Если операция идемпотентная — можно ретраить с ограничением по числу попыток, инкрементировать счётчик в метриках, после N выбрасывать в мертвое письмо. Если не идемпотентная — ретраи без обёртки вокруг побочных эффектов опасны.

Метрики и наблюдаемость

Минимальные показатели, которые помогают видеть систему живой:

  • Длина очереди. Текущий снимок, не барьер синхронизации.

  • Число успешно добавленных и извлечённых элементов.

  • Доли таймаутов у производителей и потребителей.

  • Число отменённых операций.

  • Распределение времени обработки элемента.

Пример на System.Diagnostics.Metrics:

using System.Diagnostics.Metrics;

public sealed class BcMetrics : IDisposable
{
    private readonly Meter _meter = new("MyService.BlockingCollection");
    private int _queueLength;

    public Counter<long> Added { get; }
    public Counter<long> Taken { get; }
    public Counter<long> Timeouts { get; }

    public BcMetrics()
    {
        Added = _meter.CreateCounter<long>("added_total");
        Taken = _meter.CreateCounter<long>("taken_total");
        Timeouts = _meter.CreateCounter<long>("timeouts_total");
        _meter.CreateObservableGauge("queue_length", () => new Measurement<int>(_queueLength));
    }

    public void SetLength(int value) => _queueLength = value;

    public void Dispose() => _meter.Dispose();
}

Обновляйте SetLength(bc.Count) там, где и так есть контекст управления очередью. Не плодите отдельные таймеры для опроса длины на высоких частотах без смысла.

Размер очереди и параллелизм: как выбирать числа

Жёсткая формула конечно же редко работает. Разумный подход:

  • Верхняя граница — исходя из бюджета памяти на элемент и желаемого хвоста в пик. Если один элемент — 2 КБ, и готовы держать в хвосте до 50 тысяч, вместимость 50 тысяч — это плюс-минус 100 МБ данных, не считая накладных расходов.

  • Параллелизм — из профиля работы обработчика. Если он CPU-bound, стартуйте с числа логических ядер минус один. Если обработчик часто ждёт IO, можно увеличить, но следите за прерываниями и конкуренцией.

  • Если видите постоянную блокировку производителей — либо мало воркеров, либо обработка медленная. Увеличение вместимости без ускорения обработки — только наращивает задержку, а не пропускную способность.

Интеграция с Generic Host: корректная остановка по сигналу

В сервисах на IHost хочется, чтобы по ApplicationStopping всё завершалось предсказуемо. Поддержка проста: на сигнале останавливаем производителей, вызываем CompleteAdding, ждём выход воркеров в пределах таймаута, затем выходим.

public sealed class QueueService : IHostedService
{
    private readonly WorkerPool<MyItem> _pool;
    private readonly IHostApplicationLifetime _lifetime;

    public QueueService(IHostApplicationLifetime lifetime)
    {
        _lifetime = lifetime;
        _pool = new WorkerPool<MyItem>(
            boundedCapacity: 10_000,
            degreeOfParallelism: Environment.ProcessorCount,
            handler: ProcessItem);
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _lifetime.ApplicationStopping.Register(() =>
        {
            _pool.Complete();
            _pool.Dispose();
        });
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    private static void ProcessItem(MyItem item, CancellationToken ct)
    {
        // обработка
    }
}

Главная мысль не пытаться диспозить что-то, пока есть висящие Take. Сначала закрываем вход, затем ждём.

Когда пора переходить на Channel

Если обработчики асинхронные, вы не хотите занимать потоки ожиданием, и у вас есть смысл в аккуратной, естественной интеграции с async/await — каналы ощутимо удобнее. Там есть bounded-режимы, backpressure, отложенные записи, ReadAllAsync, возможность тонко описать режим полного канала. Для CPU-bound коротких задач выигрыш на каналах часто заметен за счёт отсутствия блокировки потока. Для длительных синхронных — наоборот, особого выигрыша не будет.

Правильный критерий — профиль нагрузки и архитектурная модель. Не переносите систему ради моды.

Итого

BlockingCollection по-прежнему удобный и надёжный способ реализовать производителя и потребителя с ограничением вместимости и предсказуемым завершением. Делитесь кейсами в комментариях.


Даже самая элегантная система рано или поздно сталкивается с грубой реальностью: API превращается в зоопарк интеграций, микросервисы начинают сбоить под нагрузкой, а надежность упирается в неочевидные детали реализации. Если эти проблемы вам знакомы — есть возможность разобрать их на конкретных примерах и найти рабочие решения на бесплатных уроках:

  • 11 сентября в 20:00 — API Gateway и не только: шаги к идеальной архитектуре внешних API. Записаться

  • 17 сентября в 20:00 — Паттерны отказоустойчивости и масштабируемости микросервисной архитектуры. Записаться

Пройдите вступительное тестирование, чтобы понять, насколько вы готовы к системному обучению и подойдёт ли курс "Software Architect" именно вам.

А чтобы оставаться в курсе актуальных технологий и трендов, подписывайтесь на Telegram-канал OTUS.

Комментарии (8)


  1. dyadyaSerezha
    04.09.2025 03:23

    Полезная статья, но

    1. Производитель автоматически притормаживает, когда очередь заполнена.

    2. Блокирующие операции. Add и Take блокируют поток до ...

    Что значит "притормаживает"? Понимаю "Add блокирует поток до". Не понимаю "притормаживает" и чем это отличается от "блокирует поток". Если ничем, зачем вообще пункт 1?

    Ну и моё занудно-любимое:

    Таймаут выбирайте исходя из SLA на латентность.

    Латентность (от лат. latentis — скрытый, невидимый) — свойство объектов или процессов находиться в скрытом состоянии, не проявляя себя явным образом. А вовсе не задержка.


    1. sontarru
      04.09.2025 03:23

      Занудствовать, так занудствовать. Кембриджский словарь даёт всё-таки еще одно значение:

      latency


      1. dyadyaSerezha
        04.09.2025 03:23

        При чем здесь английский словарь и статья на русском? Или мы тут уже сосидж писами слайсим?


      1. iamkisly
        04.09.2025 03:23

        Очевидно, что если бы оно было написано латиницей, то вопросов бы не было.. а так увы


    1. EasyGame
      04.09.2025 03:23

      Полезная статья, но

      О какой полезности речь? Лет этак шесть назад в дотнет выкатили Channel<T> как раз на замену достаточно древней BlockingCollection. Все что о ней нужно знать - это то, что если она встретилась в коде - ее нужно переписать на более эффективный механизм, ну или просто не трогать ;)

      Да, можно подушнить, что каналы нормально не работают в полностью синхронном коде, но такие проекты уже даже не редкость, а скорее реликвии прошлого к которым подпускать неофитов - суть кощунство.


      1. dyadyaSerezha
        04.09.2025 03:23

        Вы удивитесь, но многие очень преочень большие компании работают на версиях языков 10+ летней давности и крайне неохотно переходят даже на версии 8-летней давности.

        Кроме того, статья полезна для общего развития и знания средств .NET. Замечу, все ещё существующих и не объявленных официально устаревшими (obsolete) средств.


      1. iamkisly
        04.09.2025 03:23

        достаточно древней BlockingCollection

        .. и очень медленной


  1. Keeper22
    04.09.2025 03:23

    Казалось бы, при чём тут Мисато Кацураги?