Самая распространённая ошибка в асинхронном Rust — убеждение, что .await означает «подожди, пока операция завершится». Слово в переводе буквально это и значит, поэтому многие расставляют .await после каждого асинхронного вызова, как точки в конце предложений.
А потом оказывается, что сервер обрабатывает один запрос вместо сотни одновременно, мьютекс намертво виснет с одним-единственным вызовом, отмена в select! теряет половину сообщений, и синхронная версия того же кода работает быстрее. Корень всех этих проблем один: .await не означает «жди». Он означает «дай исполнителю право приостановить меня здесь». И пока вы держите в голове первое значение, асинхронный Rust будет вас наказывать.
В статье рассмотрим что компилятор делает с async fn, зачем нужен Pin, как Tokio решает какую задачу опросить следующей, почему std::sync::Mutex в асинхронном коде иногда срабатывает как мина, и почему даже tokio::sync::Mutex может зависнуть.
.await — это контракт с исполнителем
Когда вы пишете async fn foo(), компилятор создаёт тип, реализующий Future — машину состояний, которая знает, как приостанавливаться и возобновляться. Каждый .await становится точкой, где машина может сказать рантайму: «У меня сейчас нет работы, возвращаю Poll::Pending, разбуди меня когда что-то изменится».
Между точками .await код выполняется синхронно и без вытеснения. В Rust не существует preempt-планирования для асинхронных задач. Если вы написали цикл на миллион итераций без единого .await, никакая другая задача в этом потоке не получит управление, пока цикл не закончится. Планировщик передаёт управление только в точках .await и нигде больше.
Поэтому .await — это контракт: «Вот здесь я разрешаю себя приостановить, а в других местах — нет». Запись let result = some_async_fn().await не означает «вызови и дождись». Она означает «создай Future, опрашивай её, возможно несколько раз приостанавливая задачу, пока не вернёт Ready». Между опросами может произойти что угодно, включая то, что задачу забудут и больше никогда не разбудят.
Как async fn превращается в машину состояний
Возьмём простую функцию:
async fn fetch_two(client: &Client) -> (Data, Data) { let a = client.get("/a").await; let b = client.get("/b").await; (a, b) }
Компилятор превращает её примерно в такое:
enum FetchTwoState<'a> { Start { client: &'a Client }, AwaitingA { client: &'a Client, future_a: GetFuture<'a> }, AwaitingB { a: Data, future_b: GetFuture<'a> }, Done, } impl<'a> Future for FetchTwoFuture<'a> { type Output = (Data, Data); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { loop { match self.state { Start { client } => { let future_a = client.get("/a"); self.state = AwaitingA { client, future_a }; } AwaitingA { client, ref mut future_a } => { match Pin::new(future_a).poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(a) => { let future_b = client.get("/b"); self.state = AwaitingB { a, future_b }; } } } AwaitingB { a, ref mut future_b } => { match Pin::new(future_b).poll(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(b) => { self.state = Done; return Poll::Ready((a, b)); } } } Done => panic!("polled after completion"), } } } }
Реальный компилятор делает хитрее, с MIR-оптимизациями, объединением похожих вариантов, — но суть та же.
Future хранит всё своё состояние внутри себя.
Все локальные переменные, которые «живут» через точку .await, становятся полями enum-варианта. Включая ссылки на другие части той же структуры — future_a может содержать ссылку на client, который тоже хранится в нашей же Future.
Наша Future самореференциальна, тут мы попадаем в проблему, которую Rust решает через Pin.
Pin и проблема перемещения
В обычном Rust значения можно перемещать как угодно: let y = x; копирует биты x в y, и если внутри были указатели в кучу — они продолжают работать. С самореференциальными структурами этот фокус не проходит.
Представьте Future, хранящую внутри ссылку на саму себя. Перемещаем её в другое место и внутренний указатель продолжает смотреть на старый адрес, где теперь мусор. Use-after-free, неопределённое поведение, segfault.
Поэтому существует Pin. Это обёртка, говорящая: «Пока эта Future внутри Pin, её нельзя переместить». Адрес зафиксирован, cигнатура Future::poll принимает Pin<&mut Self> ровно по этой причине.
В обычном асинхронном коде компилятор сам обрабатывает Pin, и вы ничего не замечаете. Но как только начинаете работать с Future вручную (реализовывать трейт, использовать tokio::pin!) Pin начинает вылезать в сигнатурах и ошибках компиляции.
Закреплять Future можно двумя способами:
на куче через
Box::pin(fut)на стеке через
std::pin::pin!(fut).
Стековая версия дешевле и стабилизировалась в Rust 1.68. Каждый раз, когда видите Pin<Box<...>> в своём коде, имеет смысл задуматься, не получится ли обойтись pin! со стека.
Wakers: как исполнитель узнаёт, что задачу пора будить
Наша Future вернула Pending и спит. Как исполнитель узнаёт, что её пора снова опросить?
Через Waker — объект, который Future получает на каждом вызове poll и который обязана зарегистрировать у источника событий, прежде чем вернуть Pending. Когда событие случается, кто-то вызывает waker.wake(), и исполнитель помечает задачу как готовую.
Контракт строгий: если Future вернула Pending, не зарегистрировав Waker — она никогда не проснётся. Исполнитель опрашивает только те задачи, чьи Waker были вызваны. Спящая задача, чей Waker никто не разбудит — это task leak: память занята, но работа не идёт.
Из этого следует одно свойство: Future в Rust ленивы. До первого poll они не делают ничего. Создание Future — это просто конструирование структуры, без побочных эффектов:
let fut = client.get("/api/data"); // запрос НЕ отправлен let result = fut.await; // вот теперь отправлен
Это отличается от того же JS, где Promise запускается сразу при создании.
Как Tokio решает, кого опросить дальше
По умолчанию Tokio многопоточный и использует алгоритм work-stealing. Worker-потоков столько, сколько ядер у процессора. У каждого своя локальная очередь (FIFO, ёмкость 256 задач) и специальный LIFO-слот для самой свежеразбуженной задачи. Плюс одна глобальная очередь на всех и отдельный пул до 512 потоков для spawn_blocking.
Когда worker ищет следующую задачу: сначала проверяет LIFO-слот, потом локальную очередь. Каждые 61 итерацию заглядывает в глобальную очередь, чтобы задачи оттуда не голодали.
Если локальная пуста — пытается украсть половину задач из локальной очереди случайного соседа. Если красть негде — паркуется.
LIFO-слот существует ради message-passing: задача A отправила сообщение задаче B, B должна обработать его немедленно, пока данные в кеше процессора. Но если две задачи постоянно перебрасывают сообщения друг другу, они могут монополизировать слот и заморозить остальные задачи на этом worker-е. Tokio защищается: после трёх последовательных использований LIFO-слот временно отключается.
В release-сборке Tokio автоматически боксирует Future, если её размер превышает 16 КБ (в debug 2 КБ). Гигантская Future с десятками точек .await и большими локальными переменными неявно аллоцирует на куче при каждом spawn. На высокой нагрузке это видно в профиле, и решение обычно в том, чтобы разбить крупную функцию на несколько мелких.
Cooperative budget
У каждой задачи в Tokio есть operational budget — счётчик, инициализируемый числом 128 при каждом опросе верхнеуровневой задачи. Каждая успешная операция с Tokio-ресурсом (канал, мьютекс, сокет) уменьшает budget на единицу. Когда доходит до нуля, ресурсы начинают возвращать Pending, даже если данные доступны. Это вынуждает задачу yield-ить.
Зачем это нужно:
async fn drain(rx: &mut mpsc::Receiver<Msg>) { while let Some(msg) = rx.recv().await { process(msg); } }
Если канал всегда полон, а process быстрая — recv().await всегда возвращает Ready сразу. Без cooperative budget эта задача никогда не отдала бы управление, заняв worker на неопределённый срок. С ним после 128 успешных операций recv возвращает Pending, задача yield-ит, budget сбрасывается.
Если вы используете не-Tokio примитивы (свои каналы, свои Future), они не уменьшают budget и могут реально заморозить worker. В таких случаях вставляйте tokio::task::yield_now().await каждые N итераций.
Мьютекс через .await
Самая распространённая ошибка:
use std::sync::Mutex; async fn handle(state: Arc<Mutex<State>>) { let mut guard = state.lock().unwrap(); guard.update(); do_io().await; // тут начинаются проблемы guard.finalize(); }
Между lock() и finalize() есть .await, а значит guard переживает точку приостановки и попадает в state machine как поле.
std::sync::Mutex использует platform-native примитив: на Linux это futex, на Windows — SRWLock. При contention он паркует поток на уровне ядра. Задача спит на .await, продолжая держать guard. Worker забирает другую задачу.
Если эта другая пытается взять тот же мьютекс — futex паркует поток. Поток теперь не может взять следующую задачу, не может проснуться, ничего не может. Он спит до тех пор, пока кто-то не отпустит лок — а отпустить может только заснувшая задача, до которой никто не доберётся.
На однопоточном рантайме это мгновенный deadlock. На многопоточном Tokio — вероятностный: под низкой нагрузкой работает годами, под пиковой все worker-ы одновременно подцепляют задачи, ждущие этот лок, все паркуются, сервис мёртв ровно тогда, когда нагрузка максимальная.
Если MutexGuard от std::sync оказывается не-Send, компилятор выдаст ошибку при tokio::spawn. Но MutexGuard от parking_lot всегда Send, и компилятор молча пропускает потенциальный deadlock.
Самое простое решение — не держать guard через .await:
async fn handle(state: Arc<Mutex<State>>) { { let mut guard = state.lock().unwrap(); guard.update(); } // ← guard дропается здесь do_io().await; { let mut guard = state.lock().unwrap(); guard.finalize(); } }
Ещё лучше обернуть работу с локом в синхронный метод. В синхронной функции нет точек .await, и guard физически не может пережить то, чего не существует.
Когда же нужен tokio::sync::Mutex?
Согласно официальной документации — когда лок действительно обязан удерживаться через .await. На практике редко. tokio::sync::Mutex устроен иначе: его lock() асинхронный, при contention возвращает Pending и регистрирует Waker. Цена — каждое взятие проходит через task-aware queue с FIFO-семантикой, что медленнее std::sync::Mutex без contention. Tokio docs прямо рекомендуют использовать std::sync::Mutex, если контендится редко и удерживается недолго.
Даже tokio::Mutex может зависнуть
Минимальный сценарий:
let mutex = Arc::new(std::sync::Mutex::new(())); let async_task = tokio::spawn({ let mutex = mutex.clone(); async move { loop { tokio::time::sleep(Duration::from_millis(100)).await; let guard = mutex.lock().unwrap(); drop(guard); } } }); let blocking_task = tokio::task::spawn_blocking({ let mutex = mutex.clone(); move || loop { let guard = mutex.lock().unwrap(); tokio::runtime::Handle::current().block_on(sleepy_task()); drop(guard); } });
Один мьютекс, две задачи, никаких циклов в графе блокировок — и через несколько итераций программа намертво замирает.
spawn_blocking берёт мьютекс и вызывает block_on(sleepy_task()). Эта задача регистрирует Waker в timer wheel. Через 100 мс таймер истекает, I/O-driver вызывает wake(). Tokio переиспользует слот для Waker: если задача уже разбужена, повторный wake() ничего не делает. Под определёнными гонками Waker «теряется», он был вызван, но задача не была перепланирована.
Это не баг Tokio, а нарушение контракта Waker в комбинации с block_on внутри spawn_blocking.
Мораль: block_on внутри spawn_blocking — конструкция, которой стоит избегать. Message passing через канал гораздо безопаснее любых мьютексов в асинхронном коде. Дайте состоянию одного владельца — отдельную задачу — и общайтесь с ней через mpsc.
Два .await подряд работают последовательно
async fn fetch_user_data(id: u64) -> UserData { let profile = fetch_profile(id).await; // 100 мс let orders = fetch_orders(id).await; // 100 мс let reviews = fetch_reviews(id).await; // 100 мс UserData { profile, orders, reviews } }
Триста миллисекунд, а не сто. Каждый .await последовательно приостанавливает задачу до завершения предыдущего вызова. Это самая частая причина нытья мол «асинхронный код такой же медленный, как синхронный».
Для конкурентного выполнения нужен join!:
async fn fetch_user_data(id: u64) -> UserData { let (profile, orders, reviews) = tokio::join!( fetch_profile(id), fetch_orders(id), fetch_reviews(id), ); UserData { profile, orders, reviews } }
join! запускает Future в той же задаче, на одном потоке, переключаясь в точках .await.
Для I/O — супер. Для CPU-bound работы — никакого смысла.
Для настоящей параллельности на нескольких ядрах нужен tokio::spawn для каждой ветки, но у spawn своя цена (аллокация, регистрация в планировщике), и для коротких операций join! может оказаться быстрее.
Правило простое: два .await подряд для независимых операций — это место для join!.
tokio::spawn(...).await: параллельность, которая не случилась
async fn process_batch(items: Vec<Item>) { for item in items { tokio::spawn(process_item(item)).await.unwrap(); } }
Логика автора: «spawn создаёт задачу, она работает параллельно». Технически да,но .await сразу же блокирует текущую задачу до завершения этой spawn-нутой. Снова последовательно, только с лишней церемонией.
Правильный вариант — сначала запустить всё, потом ждать:
let mut set = JoinSet::new(); for item in items { set.spawn(process_item(item)); } while let Some(res) = set.join_next().await { res.unwrap(); }
Самая злая разновидность — accept-loop TCP-сервера:
async fn server(listener: TcpListener) { loop { let (conn, _) = listener.accept().await.unwrap(); process_client(conn).await; // ← остальные клиенты ждут } }
Один клиент захватывает весь сервер. Новые соединения висят в очереди ОС и в какой-то момент отбрасываются. В логах — connection refused, latency растёт, cores idle. Правильно — заспавнить обработку каждого клиента без .await:
async fn server(listener: TcpListener) { loop { let (conn, _) = listener.accept().await.unwrap(); tokio::spawn(process_client(conn)); } }
Cancellation safety
Асинхронную Future в Rust можно дропнуть в любой момент — просто перестать опрашивать. Это происходит при использовании select!, timeout или при дропе JoinHandle. Задача остановится на точке .await, локальные переменные дропнутся, деструкторы вызовутся. Но работа может остаться наполовину сделанной.
Классический пример — read_line:
async fn read_lines(mut reader: BufReader<TcpStream>) { let mut buf = String::new(); loop { tokio::select! { n = reader.read_line(&mut buf) => { println!("got: {}", buf); buf.clear(); } _ = shutdown_signal() => break, } } }
read_line не cancel-safe. Внутри он накапливает байты во временном буфере и пишет в buf только когда наберётся целая строка. Если select! отменит Future в середине чтения — половина строки потеряется внутри дропаемой Future.
Tokio помечает методы как cancel-safe или нет. Часто используемые: read_line — нет, AsyncReadExt::read — да, mpsc::Receiver::recv — да, mpsc::Sender::send — нет, write_all — нет, tokio::time::sleep — да.
mpsc::Sender::send особенно плох, если отменить между «зарезервировал место» и «отправил значение» — место занято, а сообщения нет. Решение видится в паттерне через permit:
// Вместо send, который не cancel-safe: sender.send(value).await?; // Используем reserve + send (cancel-safe): let permit = sender.reserve().await?; // cancel-safe permit.send(value); // синхронно, отменить нельзя
reserve() либо зарезервировал место (и держит permit), либо нет. Если permit дропается — место освобождается. Сама отправка через permit.send() синхронная.
Читайте документацию на предмет cancel-safety. Если Future не cancel-safe — оборачивайте в tokio::spawn. Spawn задача не отменяется при дропе JoinHandle, нужно явно вызывать .abort(). Для критичных операций используйте CancellationToken из tokio_util — задача сама проверяет токен в безопасных точках и решает, как корректно завершиться.
И последнее: в Rust нет async drop. Деструкторы могут содержать только синхронный код. Если вам нужна асинхронная чистка при отмене — делайте её явно до дропа, а не надейтесь на Drop.
Блокирующие вызовы
Любой блокирующий вызов в асинхронной функции замораживает worker-поток на всё время блокировки. На многопоточном Tokio один заблокированный поток не катастрофа. Заблокировали все — катастрофа. На однопоточном Tokio достаточно одного.
Главные подозреваемые:
async fn bad() { std::thread::sleep(Duration::from_secs(1)); // tokio::time::sleep(...).await let data = std::fs::read("file.txt").unwrap(); // tokio::fs::read(...).await let resp = reqwest::blocking::get("...").unwrap(); // reqwest::get(...).await expensive_cpu_work(); // если >100мкс, проблема }
Первые три фиксятся прямой заменой на асинхронный аналог. CPU-bound работа — другое дело. Async не предназначен для тяжёлых вычислений. Для редких блокирующих операций есть tokio::task::spawn_blocking:
let result = tokio::task::spawn_blocking(|| { expensive_cpu_work() }).await.unwrap();
Он отправляет задачу в отдельный пул (до 512 потоков по умолчанию). Worker-потоки остаются свободными. Но spawn_blocking создаёт OS-thread, что дороже асинхронной задачи. Для постоянной CPU-нагрузки лучше rayon с work-stealing на фиксированном пуле, а результаты возвращать через tokio::sync::oneshot:
async fn process(data: Vec<u8>) -> Vec<u8> { let (tx, rx) = tokio::sync::oneshot::channel(); rayon::spawn(move || { let result = heavy_compute(data); let _ = tx.send(result); }); rx.await.unwrap() }
Обнаружить блокировки помогает tokio-console — показывает время, проведённое каждой задачей в poll. Секунды в poll = заблокированный worker.
Async-трейты
В Rust 1.75 async fn в трейтах стабилизировался нативно:
trait Storage { async fn get(&self, key: &str) -> Option<Vec<u8>>; }
Никакого Box, производительность как у мономорфизированной функции. Н
о возвращаемая Future — она Send или нет?
Зависит от содержимого, а компилятор не может это знать на уровне сигнатуры трейта. Если пытаетесь сделать dyn Storage и заспавнить вызов — получаете ошибку про Send-bound.
Если trait object не нужен — используйте дженерик fn handle<S: Storage>(s: S). Мономорфизация выведет Send-bound автоматически.
Если trait object нужен — используйте trait_variant от Tokio:
#[trait_variant::make(Send)] trait Storage { async fn get(&self, key: &str) -> Option<Vec<u8>>; }
Макрос генерирует две версии: Storage (без Send) и SendStorage (с Send). Старый async_trait в 2026 году в новом коде почти не нужен.
Когда async вообще не тот инструмент
Async окупается при множестве конкурентных I/O-операций: сетевой сервер с тысячами соединений, краулер, прокси, база данных. Когда конкурентность сама по себе требование: одновременно слушать ввод и таймер. Когда готовы платить сложностью за throughput.
Не окупается, когда процесс делает один поток работы (CLI-утилита), когда работа CPU-bound (лучше rayon), когда нужен простой скрипт с одним подключением к БД (rusqlite справится).
Один тест, который я применяю: если убрать async/.await из всего кода, программа будет вести себя так же? Если да — async не нужен.
Как смотреть на чужой асинхронный код
Видите std::sync::Mutex рядом с .await в одной функции — потенциальный deadlock, проверьте пересечение guard с точкой приостановки.
Несколько .await подряд для независимых операций — упущенная конкурентность, должен быть join!.
tokio::spawn(...).await сразу — упущенная параллельность.
accept().await с последующим .await в том же цикле — accept-loop без параллелизма.
select! с не-cancel-safe операциями — потенциальная потеря данных.
std::thread::sleep, std::fs, std::net в async — блокировка worker-а.
block_on внутри spawn_blocking — потенциальный deadlock.
Длинные циклы без .await — заморозка worker-а до cooperative budget.
Arc<Mutex<...>> везде — часто признак того, что нужен actor-паттерн через mpsc.
Что в итоге
Async Rust работает по правилам, которые отличаются от синхронного кода, и эти правила нужно знать, а не угадывать. .await — это не «жди», это «отдай управление». Mutex через .await — это не синхронизация, это потенциальный deadlock. select! — это не параллельный if, это место, где Future могут быть отменены посередине работы.
Самый продуктивный подход, который я наблюдаю у опытных ребят — писать как можно меньше .await. Каждый раз, когда добавляете точку приостановки, спросите себя: зачем я здесь даю исполнителю право меня приостановить? Если ответа нет, кроме «ну, потому что функция асинхронная», это писательство async ради async.
В большинстве проектов хороший асинхронный код выглядит почти как синхронный — просто с .await в правильных, продуманных местах, где это реально нужно. Async Rust наказывает за бездумное использование, но щедро вознаграждает за понимание.
Спасибо за прочтение.
Размещайте облачную инфраструктуру и масштабируйте сервисы с надежным облачным провайдером Beget.
Эксклюзивно для читателей Хабра мы даем бонус 10% при первом пополнении.

mayorovp
То, что вы описали, соверешенно точно является багом Tokio, только вот я не могу найти подобного бага на гитхабе. А ещё я не могу найти в приведённом вами коде заявленного использования tokio::Mutex.
Звучит как баг, и совершенно непонятно что мешало разработчикам tokio сделать его безопасным. Но на самом деле проблемы с занятым местом нет, она совсем в другом -
mpsc::Sender::sendпринимает владение сообщением, и при отмене это сообщение дропается. Если дроп сообщения устраивает - нет никаких причин использовать более сложный подход с permit.