Практически в каждом высоконагруженном .NET-проекте рано или поздно появляется один и тот же паттерн:

Есть коллекция данных.

Для каждого элемента нужно выполнить дорогую операцию.

Например:

  • вычислить хэш;

  • получить эмбеддинг;

  • обратиться к ИИ-модели;

  • построить отчёт;

  • обработать изображение;

  • выполнить финансовый расчёт;

  • провести нормализацию данных.

На первый взгляд решение очевидно:

Parallel.ForEach(items, item =>
{    Process(item);
});

Но проблема возникает тогда, когда внутри коллекции появляются дубликаты.

Например:

  • один и тот же пользователь встречается десятки раз;

  • один и тот же документ приходит из разных источников;

  • одинаковые сообщения попадают в очередь;

  • одинаковые строки требуют одинакового преобразования.

В такой ситуации Parallel.ForEach честно пересчитывает всё заново.

Даже если вычисление уже выполнялось секунду назад.

Даже если результат уже лежит в памяти.

Даже если в коллекции 99% одинаковых данных.

Именно эту проблему решает Principium.Parallel.

Когда это действительно нужно

Библиотека не пытается заменить TPL.

Если у вас уникальные данные и дешёвая обработка — используйте обычный Parallel.ForEach.

Но есть несколько сценариев, где выигрыш может достигать десятков и сотен раз.

Сценарий 1. Генерация эмбеддингов

Допустим, вы строите RAG.

В коллекции 100 000 документов.

Из них 80 000 уже индексировались раньше.

Без кэша:

100 000 вызовов embedding model

С Principium:

20 000 вызовов embedding model

20 000 вызовов embedding model

Остальное берётся из памяти.

Сценарий 2. Массовая обработка сообщений

В очереди постоянно встречаются повторяющиеся события:

User 1 updated
User 1 updated
User 1 updated
User 1 updated
User 1 updated

В большинстве случаев интересует только последнее состояние.

Повторные вычисления просто сжигают CPU.

Сценарий 3. ETL и Data Processing

При импорте миллионов записей часто встречаются повторяющиеся ключи.

Типичный код выглядит так:

Parallel.ForEach(rows, row =>
{    Normalize(row);
});

На практике половина процессорного времени уходит на обработку одинаковых данных.

Сценарий 4. Работа с LLM

Если вычисление стоит дорого:

payload => llm.Generate(payload)

то даже небольшое количество дубликатов превращается в огромные потери времени и денег.

Сценарий 5. Когда GroupBy(...).Last() не работает

Есть важный класс задач, где наивная агрегация через GroupBy(...).Select(g => g.Last()) не даёт эквивалентного результата.

Это происходит, когда итоговая обработка зависит не только от «последнего элемента по ключу», но и от промежуточного состояния или уже выполненных вычислений.

Например: ключи участвуют в каскадной обработке, где результат одного элемента влияет на обработку других, или когда вычисление строится как граф зависимостей, а не как независимые группы.

В таких сценариях предварительная группировка ломает саму модель вычислений: вы теряете контекст, порядок и возможность переиспользовать частично рассчитанные результаты.

Именно поэтому подход Principium.Parallel не пытается «схлопнуть коллекцию заранее», а делает коалесинг во время выполнения — сохраняя семантику LWW и параллелизм одновременно.

Вот аккуратный абстрактный пример кода, который хорошо иллюстрирует именно этот сценарий (каскадная зависимость + состояние + невозможность предварительного GroupBy(...).Last()):

// Пример: обработка графа зависимостей задач
// Каждый узел влияет на другие, и результат зависит от промежуточного состояния
var nodes = GetNodes(); 
// (Id, Payload)
// НЕЛЬЗЯ так:
// теряется порядок распространения влияния и промежуточное состояние
// var result = nodes.GroupBy(x => x.Id).Select(g => g.Last());
// Parallel.ForEach(result, ProcessNode);
// ✔ Правильная модель: состояние накапливается во время вычислений
// и может влиять на другие ключи
var globalState = new ConcurrentDictionary<int, NodeState>();
var result = Paralleling.ForEach(    
  nodes,    
  node => node.Id,    
  node => node.Payload,    
  payload =>    
  {        
    var state = ComputeState(payload);        
    // важно: результат может влиять на другие узлы        
    foreach (var dep in state.Dependencies)        
    {            
      globalState.AddOrUpdate(                
        dep,                
        _ => state,                
        (_, old) => Merge(old, state)            
      );        
    }        
    return state;    
  },    
  new PrincipiumOptions(        
    ttl: TimeSpan.FromMinutes(10),        
    cacheCapacity: 100_000,        
    requireLww: true),    
  adaptiveKey: "dependency-graph");

Почему здесь GroupBy(...).Last() не работает

Если бы мы сделали предварительное схлопывание:

nodes.GroupBy(x => x.Id).Select(g => g.Last())

мы бы:

  • потеряли промежуточные состояния ComputeState

  • не смогли бы обновить globalState корректно

  • разрушили бы зависимость между узлами графа

  • превратили потоковую модель в «плоскую», хотя она графовая

Здесь ключ не просто идентификатор для дедупликации.

Он участвует в динамическом взаимодействии между вычислениями, где:

  • порядок влияет косвенно через состояние

  • результаты одного элемента меняют обработку других

  • нельзя заранее решить, какой элемент "последний важный"

И именно такие сценарии и являются границей, где GroupBy(...).Last() перестаёт быть эквивалентной моделью вычисления, а не просто оптимизацией.

Что предлагает Principium

Principium анализирует входной набор данных и автоматически выбирает стратегию выполнения.

Под капотом существуют три режима.

ParallelOnly

Используется при низком количестве дубликатов.

Максимально похож на обычный Parallel.ForEach.

Дубликатов мало
→ кэш не нужен
→ просто параллельное выполнение

CacheOnly

Используется при среднем количестве дубликатов.

Сначала проверяем кэш
Потом считаем только отсутствующие значения

CoalesceAndCache

Используется при высокой дупликации.

Дубликаты схлопываются
+
используется кэш
+
вычисления выполняются параллельно

Именно этот режим даёт максимальный выигрыш.

Установка

Через .NET CLI:

dotnet add package Principium.Parallel

Через Package Manager:

Install-Package Principium.Parallel

Подключаем пространство имён:

using Principium;

Первый пример

Допустим, есть список пользователей.

var users = Enumerable.Range(1, 10000)    
  .Select(x => $"User_{x}")    
  .ToList();

Обработка:

var results = Paralleling.ForEach(    
  users,    
  keySelector: x => x,    
  payloadSelector: x => x,    
  work: value =>    {        
    Thread.Sleep(10);        
    return value.ToUpperInvariant();    
  });

Результат:

Console.WriteLine(results["User_100"]);

Пример с дубликатами

record UserEvent(int UserId, string Payload);
var events = new[]
{    
new UserEvent(1, "A"),    
new UserEvent(1, "B"),    
new UserEvent(1, "C"),    
new UserEvent(2, "D")
};

Запуск:

var result = Paralleling.ForEach(    
  events,    
  x => x.UserId,    
  x => x.Payload,    
  payload =>    {        
    return payload.ToUpperInvariant();    
  });

Для UserId = 1 останется только последнее значение.

То есть будет соблюдена семантика:

Last Write Wins (LWW)

Настройка

Поведение можно изменить через PrincipiumOptions.

var options = new PrincipiumOptions
{    
SampleSize = 4096,    
LowDupThreshold = 0.05,    
HighDupThreshold = 0.80,    
RequireLww = true,    
Ttl = TimeSpan.FromMinutes(10),    
CacheCapacity = 100000
};

Использование:

var result = Paralleling.ForEach(    
  source,    
  x => x.Id,    
  x => x.Payload,    
  Process,    
  options);

Переиспользование кэша между вызовами

Самый интересный режим.

Можно передать adaptiveKey:

var result = Paralleling.ForEach(    
  source,    
  x => x.Id,    
  x => x.Payload,    
  Process,    
  adaptiveKey: "orders");

Теперь внутренний движок и кэш будут использоваться повторно.

Если данные приходят пачками и содержат повторения, производительность может вырасти на порядок.

Что происходит внутри

Для каждого значения строится 128-битный отпечаток:

FNV-128 fingerprint

Затем происходит:

  1. Проверка кэша.

  2. Проверка срока жизни.

  3. Проверка совпадения отпечатка.

  4. Возврат результата без повторного вычисления.

По умолчанию сравнение строгое.

Также можно использовать Hamming Distance для нестрогого совпадения.

var options = new PrincipiumOptions
{    
HammingThreshold = 8
};

Бенчмарки

Тестовый стенд:

Windows 11
.NET 8
10 000 элементов
Тяжёлая CPU-bound нагрузка

Результаты:

Сценарий

Parallel.ForEach

Dict LWW

MemoryCache

Principium Cold

Principium Warm

99% дубликатов

682 ms

14 ms

15 ms

19 ms

1 ms

90% дубликатов

632 ms

80 ms

81 ms

619 ms

4 ms

50% дубликатов

651 ms

376 ms

361 ms

2884 ms

74 ms

10% дубликатов

593 ms

384 ms

399 ms

4242 ms

318 ms

0% дубликатов

1013 ms

485 ms

397 ms

4589 ms

929 ms

Почему Principium Cold медленнее

На это стоит обратить внимание.

Многие смотрят только на колонку Cold и делают неправильный вывод.

Cold означает:

Каждый запуск создаётся новый Engine
+
Новый кэш
+
Новая структура анализа

То есть библиотека не может использовать результаты предыдущих вычислений.

Фактически это стресс-тест внутренних накладных расходов.

В реальных системах почти всегда используется Warm-сценарий.

Почему Principium Warm быстрее

Посмотрим на сценарий с 99% дубликатов.

Обычный Parallel.ForEach:

682 ms

Principium Warm:

1 ms

Ускорение:

682x

Причина проста.

Вместо 10 000 вычислений происходит около 100.

Остальное возвращается из памяти.

Где это даёт максимальный эффект

На первый взгляд может показаться, что дубликаты в данных — редкая ситуация.

На практике большинство высоконагруженных систем постоянно работают с повторяющейся информацией.

AI и LLM

Самый очевидный пример последних лет.

Допустим, вы генерируете эмбеддинги для документов:

embeddingModel.GenerateEmbedding(text);

Документ уже индексировался вчера.

Потом сегодня.

Потом после редактирования базы знаний.

Потом после очередного деплоя.

Обычный пайплайн будет генерировать эмбеддинг заново.

Principium просто вернёт результат из кэша.

Если один вызов OpenAI стоит несколько сотен миллисекунд и деньги за токены, экономия получается не только по времени, но и по бюджету.

RAG-системы

Практически любой корпоративный RAG регулярно переиндексирует документы.

Например:

  • инструкции;

  • договоры;

  • техническую документацию;

  • базу знаний.

В реальных компаниях изменение одного документа часто приводит к повторному прохождению через пайплайн тысяч уже обработанных записей.

Дедупликация вычислений здесь даёт один из самых больших эффектов.

ETL и Data Engineering

Классическая ситуация:

Есть CSV на несколько миллионов строк.

Внутри постоянно встречаются одинаковые записи клиентов.

CustomerId = 123
CustomerId = 123
CustomerId = 123
CustomerId = 123

Нормализация адресов.

Очистка данных.

Проверка справочников.

Расчёт показателей.

Один и тот же результат вычисляется снова и снова.

Финансовые системы

Риск-модели, скоринг, антифрод.

В течение дня один и тот же клиент может появляться в десятках операций.

Без дедупликации вычисления повторяются многократно.

При дорогих расчётах это превращается в прямые затраты на инфраструктуру.

Логистика

Расчёт маршрутов.

Проверка складских остатков.

Расчёт тарифов доставки.

Очень часто одинаковые грузы или заказы обрабатываются многократно разными сервисами.

Кэширование вычислений позволяет снять существенную нагрузку с CPU.

Обработка изображений

Например:

DetectFaces(image);

или

GenerateThumbnail(image);

Если одно изображение встречается несколько раз, повторные вычисления не имеют смысла.

Видеоаналитика

Видеокадры часто обрабатываются пакетами.

При дедупликации кадров или схожих сегментов можно избежать большого количества вычислений моделей компьютерного зрения.

IoT и телеметрия

Датчики нередко отправляют одинаковые данные тысячи раз подряд.

Например:

Температура = 21.4
Температура = 21.4
Температура = 21.4
Температура = 21.4

Повторная обработка таких сообщений не несёт дополнительной ценности.

Очереди сообщений

Kafka.

RabbitMQ.

Azure Service Bus.

Практически в любой распределённой системе периодически возникают повторные события.

Особенно после ретраев и восстановления после сбоев.

Если обработчик тяжёлый, стоимость повторного выполнения быстро становится заметной.

Batch-задачи

Ночные пересчёты.

Отчёты.

Агрегации.

Подготовка витрин данных.

Именно здесь часто встречаются сценарии с дупликацией 80–99%, где Principium показывает максимальный выигрыш.

Интеграция в ASP.NET Core

Регистрация:

builder.Services.AddSingleton<MyProcessor>();

Использование:

public class MyProcessor
{    
  public Dictionary<int,string> Process(        
  IEnumerable<MyItem> items)    {        
  return Paralleling.ForEach(            
    items,            
    x => x.Id,            
    x => x.Payload,            
    HeavyWork,            
    adaptiveKey: "main-pipeline");    
}
}

Кэш будет использоваться между HTTP-запросами.

Когда НЕ стоит использовать Principium

Есть ситуации, где обычный Parallel.ForEach лучше.

Например:

  • дубликатов нет;

  • вычисления очень дешёвые;

  • результат никогда не повторяется;

  • данные одноразовые.

В этом случае накладные расходы анализа будут выше выгоды.

И это нормально.

Библиотека рассчитана именно на сценарии с повторяемостью данных.

Сравнение подходов

Подход

Повторное использование результатов

LWW

Кэш

Автоадаптация

Parallel.ForEach

Нет

Нет

Нет

Нет

Dictionary + Parallel

Частично

Да

Нет

Нет

MemoryCache

Да

Нет

Да

Нет

Principium.Parallel

Да

Да

Да

Да

Где взять

NuGet:

https://www.nuget.org/packages/Principium.Parallel

GitHub (бенчмарки):

https://github.com/likeslines-maker/Principium.Parallel

Библиотека полностью бесплатна для:

  • тестирования;

  • исследований;

  • обучения;

  • прототипирования;

  • разработки;

  • нагрузочного тестирования;

  • проверки концепций (PoC).

Никаких ограничений по времени, объёму данных или функциональности при тестировании нет.

Резюме

Principium.Parallel — это адаптивный движок обработки коллекций для .NET, который автоматически выбирает между параллелизмом, кэшированием и дедупликацией.

Он особенно полезен когда:

  • данные содержат много повторов;

  • вычисления дорогие;

  • важна Last-Write-Wins семантика;

  • нужно переиспользовать результаты между запусками.

В сценариях с высокой дупликацией выигрыш может достигать сотен раз относительно обычного Parallel.ForEach.

Если ваши пайплайны регулярно пересчитывают одинаковые данные — возможно, проблема уже не в скорости процессора, а в том, что процессор заставляют делать одну и ту же работу снова и снова.

Комментарии (19)


  1. ksbes
    30.05.2026 14:18

    ???

    Parallel.ForEach(items.Distinct(), item =>{ Process(item);});


    1. arhip1986 Автор
      30.05.2026 14:18

      Это убирает только полностью одинаковые элементы внутри одного прохода и не решает задачи Last-Write-Wins по ключу, кэширования результатов между вызовами, адаптивного выбора стратегии выполнения и повторного использования уже вычисленных значений. Поэтому для простых коллекций Distinct() действительно может быть достаточен, а Principium.Parallel ориентирован на сценарии, где дубликаты определяются по ключу, данные приходят батчами, а стоимость вычислений достаточно высока, чтобы выигрыш от коалесинга и кэша был существенным.

      Distinct() решает другую задачу — удаление одинаковых элементов в рамках одного перечисления. Principium.Parallel работает с произвольными объектами через keySelector, поддерживает Last-Write-Wins по ключу, кэширование между вызовами и адаптивно переключается между параллельным выполнением, коалесингом и кэшем в зависимости от уровня дубликатов. Поэтому это не альтернатива Distinct(), а решение для более широкого класса задач.


      1. a-tk
        30.05.2026 14:18

        То есть Вам надо различать, являются ли данные последними? Тогда это не удаление дубликатов. То есть совсем не та проблема, которая заявлена.


      1. a-tk
        30.05.2026 14:18

        Parallel.ForEach(items.GroupBy(x => x.Criterion).Select(x => x.Last()), Process);


        1. arhip1986 Автор
          30.05.2026 14:18

          Principium.Parallel экономит вычисления между многократными проходами и повторяющимися задачами


        1. arhip1986 Автор
          30.05.2026 14:18

          и иногда проблема не в наличии дубликатов во входном списке, а в перекрывающихся зависимостях между вычислениями, которые требуют коалесинга и повторного использования результатов на уровне графа, а не просто фильтрации входной коллекции.

          GroupBy(...).Last() убирает дубликаты входных задач, но не устраняет повторные вычисления пересекающихся зависимостей внутри самих задач, поэтому при графовых или stateful-вычислениях эффект отсутствует.


          1. a-tk
            30.05.2026 14:18

            То есть Вы утащили бизнес-логику в утилитарную библиотеку и почему-то решили, что боль, подобная Вашей, у всех?


            1. arhip1986 Автор
              30.05.2026 14:18

              в наших проектах давно используется и значительно ускоряет, решили поделиться, удобно и эффективно, а главное просто


    1. derpymarine
      30.05.2026 14:18

      Это проприетарная ллм-либа, рождённая за пару дней, без адекватной архитектуры, надзора и ревью ллм-ного кода.

      Там даже в простейшей документации косяки
      netstandard - очевидно не существует
      netstandard - очевидно не существует
      <Project Sdk="Microsoft.NET.Sdk">
      
        <PropertyGroup>
          <OutputType>Exe</OutputType>
          <TargetFramework>net6.0</TargetFramework>
          <RootNamespace>Test_lib_neuroslop</RootNamespace>
          <ImplicitUsings>enable</ImplicitUsings>
          <Nullable>enable</Nullable>
        </PropertyGroup>
      
      </Project>
      
      error: NU1202: Package Principium.Parallel 1.0.1 is not compatible with net6.0 (.NETCoreApp,Version=v6.0). Package Principium.Parallel 1.0.1 supports: net8.0 (.NETCoreApp,Version=v8.0)

      В либе обвязка под лицензию и чуток простеньких абстракций, которые зачем-то оборачивают самый стандартный дотнет.

      Фактически там просто обёртка над ConcurrentDictionary и Dictionary с смешными(с точки зрения производительности) штуками под капотом.


  1. a-tk
    30.05.2026 14:18

    Но проблема возникает тогда, когда внутри коллекции появляются дубликаты.

    Garbage in - garbage out.


    1. arhip1986 Автор
      30.05.2026 14:18

      Не всегда. Во многих реальных системах дубликаты — это не ошибка данных, а нормальная часть бизнес-процесса. Например, события телеметрии, обновления состояния сущностей, сообщения из очередей, ETL-пайплайны, CDC-потоки из БД, пересчёт эмбеддингов или агрегация логов. Там один и тот же ключ может встречаться десятки или сотни раз в одном батче, и задача как раз состоит в том, чтобы эффективно обработать такие данные. Principium.Parallel не исправляет "плохие данные", а оптимизирует типичный сценарий, когда повторения ключей являются ожидаемым свойством входного потока.


    1. MountainGoat
      30.05.2026 14:18

      И что предлагаете? Делать фильтрованную копию многогиговой базы, чтобы по ней прошвырнуться?


      1. arhip1986 Автор
        30.05.2026 14:18

        Нет, речь не про создание копии или предварительную “чистку” многогиговой базы. В большинстве таких сценариев данные уже приходят батчами или потоками, и повторения неизбежны из-за природы доставки (ретраи, at-least-once семантика, повторные события, обновления состояния). Поэтому задача не в том, чтобы заранее построить идеальный набор данных, а в том, чтобы при обработке не выполнять одинаковую тяжёлую работу повторно для одного и того же ключа. Principium.Parallel работает именно на этом уровне — он коалесит вычисления, кэширует результат и переиспользует его внутри и между проходами, не требуя предварительного копирования или фильтрации исходного массива.

        П.С. вроде не на свой вопрос ответ написал, но всё-равно по сути библиотеки, пусть остаётся


  1. a-tk
    30.05.2026 14:18

    В очереди постоянно встречаются повторяющиеся события:

    ...

    В большинстве случаев интересует только последнее состояние.

    Ну так сделайте нормальную очередь.


    1. arhip1986 Автор
      30.05.2026 14:18

      Очередь отвечает за доставку событий, а не за их семантическое слияние. Повторные события возникают из-за ретраев, at-least-once доставки, ребалансировки партиций, повторной публикации продьюсером, сетевых таймаутов и восстановления после сбоев — и это считается нормальной моделью работы Kafka/RabbitMQ/Azure Service Bus. Поэтому “последнее состояние” — это уже задача потребителя, а не очереди. Именно там и появляется необходимость в LWW-логике, коалесинге и кэшировании, которые и закрывает Principium.Parallel: он не заменяет очередь, а обрабатывает неизбежную реальность дубликатов в потоках данных.


      1. a-tk
        30.05.2026 14:18

        Когда у Вас в руках молоток, всё вокруг кажется гвоздями.


        1. arhip1986 Автор
          30.05.2026 14:18

          но когда вокруг одни гвозди, то что в руках - не кажется молотком:)


      1. alex-khv
        30.05.2026 14:18

        В очереди могут быть события, команды и внезапно, данные!

        Элементарная структура данных. Не обязательно все сводится к message broker


  1. Politura
    30.05.2026 14:18

    Ох, блин, прочитал заголовок, думал что-то новое и интересное, а тут нейрослоп пишет про редкий сценарий где кэш поможет. Зачем в заголовке было писать «ваш Parallel.ForEach впустую сжигает CPU»? Мой ниче не сжигает, я не идиот и там где очевидно что кэш поможет - использую его.