Привет, Хабр! В предыдущих статьях — «Rust без прикрас: где мы ошибаемся» и «Rust без прикрас: где мы продолжаем ошибаться» мы обсудили всякие неприятные грабли: бездумное использование unwrap(), игнорирование ошибок через let _ =, чрезмерное клонирование, проблемы с хвостовой рекурсией и прочие оплошности. Теперь пришло время взглянуть на другой пласт проблем. На этот раз поговорим о том, как уничтожить производительность вашего Rust-приложения, особенно если оно щеголяет асинхронностью.

Блокирующие операции в асинхронном коде

Первый способ угробить скорость — это делать блокирующие вызовы там, где ждёшь асинхронности. Знаю, звучит противоречиво. Но порой рука тянется вызвать что-нибудь из дефолтной библиотеки прямо внутри async fn. Например, прочитать файл через std::fs, подождать через std::thread::sleep или дернуть блокирующий HTTP-клиент. Компилятор-то пропустит, а вот исполнение потом расплачивается.

Что происходит, когда внутри асинхронной функции вы вызываете обычную, синхронную операцию? По сути, вы блокируете поток-исполнитель (именуемый executor thread), на котором запущена таска. Если runtime однопоточный, всё встает колом, никакие другие задачи не выполняются, пока этот блокирующий вызов не завершится. Если же runtime многопоточный, становится ненамного лучше. Блокируя один поток, вы лишаете планировщик части ресурсов. Задач много, потоков, скажем, восемь, а вы один из них усыпили на веки вечные, в итоге остальные задачи толкаются в оставшихся семи потоках.

Часто допускаемая ошибка в том, чтобы использовать std::thread::sleep вместо асинхронного таймера. Покажу на простом примере, имитирующем обработку запросов:

use tokio::task;
use tokio::time::{sleep, Duration};
use std::time::Instant;

async fn handle_request(id: usize) {
    println!("Задача {id} начала работу");
    // блокирующая пауза
    std::thread::sleep(Duration::from_secs(1));
    println!("Задача {id} завершилась");
}

#[tokio::main(flavor = "current_thread")]  // однопоточный runtime для наглядности
async fn main() {
    let start = Instant::now();
    // запустим несколько задач одновременно
    let mut handles = Vec::new();
    for i in 0..5 {
        handles.push(task::spawn(handle_request(i)));
    }
    // дождемся выполнения всех задач
    for handle in handles {
        let _ = handle.await;
    }
    println!("Общее время: {:?}", start.elapsed());
}

Если выполнить этот код, он будет выполняться примерно 5 секунд, хотя мы запускали пять задач одновременно. Все из-за того, что каждая таска заснула на секунду, удерживая поток. В однопоточном Tokio задачи вынуждены выполняться строго по очереди, пока первая спит, остальные просто ждут. Даже на многопоточном runtime результат вас не обрадует, да, задачи распределятся по нескольким потокам, и, скажем, на системе с 4 ядрами общее время сократится, но всё равно мы потратим ~1 секунду на поток впустую.

Решение: не блокируйте.

Вместо std::thread::sleep используем просто tokio::time::sleep().await это приостанавливает таску, освобождая поток для других работ. В нашем примере, стоило заменить один метод, и все пять задач реально выполнялись бы параллельно ~1 секунду общего времени, а не 5. Аналогично, вместо std::fs::File::read в асинхронном контексте берём tokio::fs::File::read. Практически на каждую блокирующую операцию в стандартной библиотеке есть асинхронный аналог либо у Tokio, либо в async-std, либо в виде отдельного крейта.

А как быть, если нужно выполнить тяжелённое вычисление (парсинг большого файла, хеширование, математика) внутри async-функции? Ведь это тоже блокирующая штука, просто на CPU. Вариантов два:

  • Если вычисление не критично по времени, можно разбить его на кусочки и между ними вызывать tokio::task::yield_now().await, позволяя планировщику переключиться на другие задачи.

  • Если же нужно развернуть полный CPU на всю катушку, а вокруг крутится ещё ворох тасков, лучше вынести работу в отдельный поток. Для этого в Tokio есть прикольный метод tokio::task::spawn_blocking. Он выполнит переданную замыкалку в отдельном пуле потоков, специально выделенных под такие блокирующие фокусы. Основный async-цикл при этом не тормозит.

Запустите у себя в проекте tokio-console. Он в рантайме показывает, какие таски долго не выпускают поток или блокируются. Вещь полезная, сразу видно, если вы где-то сунули блокирующий вызов.

Последовательные await вместо параллелизма

Rust — язык системный, и дух последовательного выполнения в нём заложен с рождения. Поэтому неудивительно, что мы часто пишем асинхронный код так, будто он синхронный. Второй способ упустить производительность — это вызывать await подряд один за другим, даже если операции могли бы выполняться параллельно.

Допустим, нужно сходить за данными по двум независимым адресам и затем их обработать. Новичок в Rust напишет что-то вроде:

async fn fetch_data(id: u32) -> String {
    // Имитация сетевого запроса задержкой
    tokio::time::sleep(Duration::from_millis(100)).await;
    format!("data-{id}")
}

async fn process_sequential() {
    let a = fetch_data(1).await;
    let b = fetch_data(2).await;
    println!("Получили данные: {}, {}", a, b);
}

Сначала полностью выполняется fetch_data(1), потом fetch_data(2). Общее время ~200 мс (плюс накладные расходы), потому что мы дождались первой операции, прежде чем начать вторую. А нужно ли ждать? В данном случае нет, запросы независимы, их можно делать одновременно.

Исправляем с помощью параллельного выполнения:

async fn process_parallel() {
    let fut_a = fetch_data(1);
    let fut_b = fetch_data(2);
    // запускаем оба запроса "вместе" и ждём обоих
    let (a, b) = tokio::join!(fut_a, fut_b);
    println!("Получили данные: {}, {}", a, b);
}

С помощью макроса tokio::join! конкурентно выполняем две async-функции. Итог — ~100 мс на оба запроса вместо 200 мс.

В более общем плане, если у вас есть несколько независимых задач, которые можно делать параллельно, делайте их параллельно. Воспользуйтесь join!, либо распустите Future вручную и потом соберите их методом .join() из FuturesExt, или используйте FuturesUnordered для динамического набора задач.

Кстати, распространённая ошибка думать, что ты запустил параллельно, а на самом деле нет. Например, создать несколько Future и тут же поочередно их .await сам по себе это всё та же последовательность. Чтобы точно запустить параллельно, либо явно используйте конструкции вроде join!/join_all, либо spawn задачи.

Но конечно же бывает, что задачи требуют строгого порядка или зависят друг от друга. В таком случае параллелизм может только усложнить код. Тут надо анализировать по ситуации. Главное быть осознанным, если в коде пошел каскад из .await друг за другом, остановитесь и подумайте, нельзя ли их распараллелить. Очень вероятно, что можно.

Чрезмерное создание задач

Асинхронные задачи в Rust лёгкие, как пушинки. Создать тысячу тасков не проблема. Но помните, что контекстные переключения и планирование задач тоже не бесплатны. Третий способ уронить производительность — это создавать непомерно много тасков, особенно для крохотных работ, которые быстрее выполнить последовательно.

Представьте, вы обрабатываете массив из 10000 чисел, и на каждое число запускаете отдельную асинхронную задачу, даже если вся работа умножить на два. Звучит абсурдно, но я видел подобный код.

«Ну async же, сейчас как параллельно всё посчитается!».

А по факту получается гигантский оверхед на управление задачами.

Возьмём пример, нам нужно обработать коллекцию чисел, каждый элемент умножив на 2 и сложив в результат. Покажу два варианта, неудачный и нормальный:

async fn bad_spawn() {
    let mut results = Vec::with_capacity(10000);
    for i in 0..10000 {
        // плохо: создаём задачу на каждый элемент
        let handle = tokio::spawn(async move { i * 2 });
        // сразу же ожидаем завершения (по сути, последовательно)
        let doubled = handle.await.unwrap();
        results.push(doubled);
    }
    println!("Обработано элементов: {}", results.len());
}

async fn good_old_way() {
    let mut results = Vec::with_capacity(10000);
    for i in 0..10000 {
        // просто считаем напрямую, без лишних тасков
        results.push(i * 2);
    }
    println!("Обработано элементов: {}", results.len());
}

В плохом варианте мы 10 тысяч раз вызываем tokio::spawn. Каждая такая операция выделяет новую задачу, ставит её в очередь планировщика, потом мы тут же дожидаемся её завершения. То есть асинхронности никакой, а лишняя возня колоссальная. Второй вариант просто бежит по циклу и считается напрямую, что, естественно, будет быстрее и потребует минимум памяти.

Принцип ясен: не нужно создавать таски без необходимости. Async-задача хороша, когда действительно можно чем-то параллельно загрузить другие ядра или ждать несколько операций одновременно. Если же у вас последовательная логика или очень лёгкая операция, то таски лишь добавят оверхед.

Кстати, если задача не обращается к .await внутри (например, чисто вычисление), то от async-функции вообще смысла нет, она сразу же выполнится, как обычная функция, только вы потратите ресурсы на ее обёртку в Future. В таком случае лучше сделать обычную функцию. Помните: async ≠ параллельно. Async больше про удобное неблокирующее ожидание, а параллельность достигается либо явным spawn, либо использованием нескольких потоков.

Неправильная синхронизация

Конкурентность — это не только про таски, но и про разделяемые ресурсы. Четвёртый способ угробить быстродействие — использовать неэффективную синхронизацию в асинхронном коде. Сюда попадают две распространённые проблемы:

1. Блокировки из синхронного мира. Например, применение std::sync::Mutex внутри async-кода. Сам по себе Mutex полезный примитив, но обычный std::Mutex при захвате блокирует поток. Если вы сделаете что-то такое:

use std::sync::{Arc, Mutex};
let shared_counter = Arc::new(Mutex::new(0));
for _ in 0..100 {
    let counter = shared_counter.clone();
    tokio::spawn(async move {
        // потенциальная проблема: блокирующий мьютекс в таске
        let mut guard = counter.lock().unwrap();
        *guard += 1;
        // ... возможно, ещё какая-то работа ...
    });
}

то можете сами себе вырыть яму. Тут каждое увеличение счётчика захватывает Mutex с блокировкой потока. Tokio приостановить таску не сможет, пока lock() не отпустит. Представьте, две таски на одном потоке доходят до counter.lock(), первая захватывает мьютекс и, скажем, засыпает внутри (или долго что-то делает), вторая таска тоже вызывает lock() и блокирует поток целиком, хотя должна была бы подождать асинхронно. В итоге весь runtime может зависнуть, как минимум до тех пор, пока первая таска не отпустит Mutex. Это почти гарантированный дедлок или, как минимум, полное выпиливание конкурентности.

Даже если дедлок удастся избежать (например, таски бегут на разных потоках, и мьютекс просто гоняет туда-сюда), мы всё равно теряем в производительности. 100 тасков в примере выше будет выполняться фактически последовательно, по очереди захватывая глобальный замок. Зачем тогда вообще было городить async?

Никогда не держите std::Mutex в таске во время .await. Если очень хочется защитить данные, используйте асинхронные примитивы, например, tokio::sync::Mutex. Он работает по-другому. При попытке захвата он не блокирует поток, а при необходимости паркует таску, позволив другим выполняться. Перезапуск таски произойдёт, когда мьютекс освободится. Код перепишем:

use tokio::sync::Mutex;
let shared_counter = Arc::new(Mutex::new(0));  // асинхронный Mutex
for _ in 0..100 {
    let counter = shared_counter.clone();
    tokio::spawn(async move {
        let mut guard = counter.lock().await;  // .await, а не .lock().unwrap()
        *guard += 1;
    });
}

Теперь при конкуренции за Mutex лишние таски просто подождут своей очереди без блокировки потоков. Правда, тут есть нюанс, они всё равно выполняются по очереди, мьютекс же один. Поэтому, если много тасков бьются за один замок, производительность резко падает, мы опять получаем последовательное выполнение, только уже не с дедлоком, а по-честному в очереди.

Минимизируйте область захвата мьютекса. Брать lock().await стоит непосредственно перед доступом к разделяемым данным и сразу отпускать. Не делайте больших вычислений под замком. Если ваша таска после lock().await ещё ходит в сеть или долго чего-то считает, это беда, вы зря держите ресурс.

В таких случаях лучше перестроить логику. Например, скопировать нужные данные, отпустить мьютекс и дальше работать локально. Или использовать более тонкие механизмы, Atomic переменные для счётчиков, RwLock для данных с преобладающим чтением, шардирование данных на несколько мьютексов, чтобы уменьшить конкуренцию, вариантов много. Главное, не превращать асинхронный код в толпу, строящуюся в одну шеренгу. И не забывайте: std::sync::Mutex внутри async, это почти всегда запах проблемы. Берите tokio::sync::Mutex или родственные ему async-аналоги, если очень надо блокировать доступ.

2. Чересчур усердная коммуникация через каналы. Асинхронные каналы (tokio::sync::mpsc и другие) — удобная штучка для организации очередей, конвейеров обработки, межзадачного взаимодействия. Но и тут можно наломать дров. Если вы строите сложный конвейер, где данные проходят через 5-6 очередей, перекочёвывая из одной таски в другую, знайте, что каждая пересылка через канал — это по сути тоже синхронизация и переключение контекста. Сообщение упаковали, отправили, получатель проснулся, распаковал — всё это затраты. Если каналов много и они забиты, runtime тратит кучу сил на распределение сообщений, на просыпание/усыпание тасков, на аллокации под буферы и т.д.

Проблема особенно проявляется, когда каналы не имеют ограничений, об этом дальше отдельно. Но даже с bounded-каналами обилие ступеней в обработке может замедлить pipeline. Старайтесь не городить бесконечных каскадов из тасков, где каждый только то и делает, что пересылает данные дальше. Иногда лучше собрать логику в одной таске с обычными функциями. Лишняя модульность в ущерб скорости не всегда оправдана. Профилируйте такие конвейеры, возможно, обнаружите, что узким местом стала именно промежуточная пересылка. Тогда стоит упростить схему.

В целом, синхронизация это ахиллесова пята асинхронных программ. От неё никуда не деться, но помните о цене. Rust даёт очень тонкий контроль, но не отвлекайте его по пустякам, не блокируйте потоки зря, не ставьте десять очередей, если можно обойтись двумя, и будет вам счастье.

Отсутствие backpressure

Пятый способ похоронить производительность — это игнорировать backpressure. Проще говоря, пустить процесс на самотёк, производить новые задачи или сообщения быстрее, чем система способна их обработать.

Приведу утрированный пример. Допустим, у нас есть бесконечный поток данных, и мы на каждое входящее сообщение создаём таску для обработки. Если входящих событий больше, чем мы можем реально обработать параллельно, задачи будут накапливаться. Планировщик начнёт разрываться, активных тасков тысячи, у каждой своя кучка, своя стековая область, свои структуры. Всё это занимает память и CPU. Новые таски плодятся быстрее, чем завершаются старые. Программа сначала будет тормозить из-за перегрева планировщика, а потом может и до Out-Of-Memory добраться. Производительность упадёт практически до нуля, зато потребление ресурсов до небес.

Аналогичная история с неограниченными каналами. Тот же Tokio имеет mpsc::unbounded_channel(), который никогда не скажет «стоп»: можете слать сообщения сколько угодно, он всё скушает, расширяя буфер. Если получатель не успевает обрабатывать, буфер растёт беспредельно. В какой-то момент вы заметите, что приложение стало жрать гигабайты памяти, хотя данных вроде тьфу. Это наши невинные миллионы сообщений скопились в очереди, дожидаясь обработки. Толку от такой буферизации мало, задержка растёт, CPU тратится на копирование сообщений, кэш проседает. Одним словом, перегрузка.

Код, иллюстрирующий отсутствие backpressure при работе с каналом:

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // неограниченный канал
    let (tx, mut rx) = mpsc::unbounded_channel::<u32>();

    // получатель: медленно обрабатывает сообщения
    tokio::spawn(async move {
        let mut count = 0;
        while let Some(val) = rx.recv().await {
            count += val;
            // эмулируем задержку обработки
            sleep(Duration::from_millis(10)).await;
        }
        println!("Получено суммарно: {}", count);
    });

    // отправитель: быстро шлёт много сообщений
    for i in 0..100_000 {
        if tx.send(1).is_err() {
            break;
        }
    }
    println!("Отправлено 100000 сообщений");
}

Отправитель мгновенно отправляет 100к сообщений в канал, а получатель обрабатывает по одному раз в 10 мс. Естественно, получатель не поспевает. Канал unbounded всё стерпит, он просто накопит почти все 100k сообщений в памяти. Пиковое использование памяти вырастет, задержка между отправкой и обработкой последнего сообщения будет почти целую секунду. Если увеличить порядок нагрузки, можно спокойно положить программу на колени.

Нужно внедрять механизмы обратного давления. В случае каналов, использовать bounded-каналы. Например, mpsc::channel(100) ограничит очередь 100 сообщениями. Если попытаться отправить 101-е, операция .send().await приостановит таску-отправителя, пока в канале не появится место. Быстрый продюсер притормаживается под возможности консюмера, и система остаётся в более стабильном состоянии. Да, это может снижать пиковую производительность (отправитель будет ждать), зато защищает от неконтролируемого роста очередей.

С задачами похожая история, если к вам валится 10000 запросов в секунду, не пытайтесь одновременно запустить 10000 тасков обработки, если заведомо столько ядер нет и каждая таска немгновенная. Лучше лимитировать параллелизм. Для этого часто используют семафоры или ограниченные пулы. В том же Tokio можно завести, например, Semaphore на определённое количество слотов и перед запуском задачи пытаться захватить слот. Если слотов нет, ждём, пока какой-то таск не освободит. Или воспользоваться утилитами вроде tokio::spawn + JoinSet, где можно вручную контролировать, сколько задач добавить, прежде чем ждать завершения старых. В futures есть интересный метод buffer_unordered(n), который преобразует стрим задач в стрим результатов, исполняя не более n задач одновременно.

В целом, разумное ограничение — залог стабильной производительности. Без ограничений любой асинхронный сервис можно положить, просто закинув работы больше, чем он может выполнить.


Всех этих проблем можно и нужно избежать. Достаточно помнить об основных принципах:

  1. Не блокируй поток, а жди асинхронно;

  2. Используй параллелизм, когда это уместно;

  3. Не размножай таски понапрасну;

  4. Держи общие ресурсы под контролем, а нагрузку в разумных пределах.

Простые правила, но их соблюдение напрямую влияет на ваши TPS, latency и вообще на вашу нервную систему.

И, конечно, учитесь на чужих ошибках. Я своими тут поделился, а если у вас есть свои истории о том, как вы замедлили Rust-приложение и потом героически это чинили, велком в комментарии. Обсудим, поспорим, а может, в следующей статье разберём самые интересные случаи. Спасибо за внимание и быстрых вам программ!

Комментарии (0)