Эндрю Келли, создатель и ведущий разработчик языка программирования Zig, недавно рассказал о будущем асинхронного I/O в Zig, его ключевых примитивах, механизмах отмены и тонкости разграничения асинхронности и параллелизма. Он пригласил заинтересованных разработчиков к активному тестированию и формированию будущего интерфейса ввода-вывода Zig.
Это предварительный обзор новых примитивов асинхронного ввода-вывода, которые будут доступны в грядущем Zig 0.16.0, релиз которого ожидается примерно через три-четыре месяца. Есть еще много чего обсудить, но пока это вводная часть к основному API синхронизации, который будет доступен для использования во всем коде Zig.
Для начала, давайте попробуем сохранить простоту и понять основы, а затем постепенно будем добавлять в код все больше асинхронных элементов.
Пример 0
В нашем первом примере нет ничего асинхронного. Это, по сути, "Hello, World!" на Zig.
const std = @import("std");
pub fn main() !void {
    doWork();
}
fn doWork() void {
    std.debug.print("working\n", .{});
    var timespec: std.posix.timespec = .{ .sec = 1, .nsec = 0 };
    _ = std.posix.system.nanosleep(×pec, ×pec);
}Вывод:
0s $ zig run example0.zig
0s working
1s $Пример 1
Далее мы немного подготовимся. Пока еще не используем async/await, но мне нужны кое-какие инструменты, прежде чем мы добавим сложности.
const std = @import("std");
const Io = std.Io;
const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
fn juicyMain(gpa: Allocator, io: Io) !void {
    _ = gpa;
    doWork(io);
}
fn doWork(io: Io) void {
    std.debug.print("working\n", .{});
    io.sleep(.fromSeconds(1), .awake) catch {};
}
pub fn main() !void {
    // Настройка аллокатора.
    var debug_allocator: std.heap.DebugAllocator(.{}) = .init;
    defer assert(debug_allocator.deinit() == .ok);
    const gpa = debug_allocator.allocator();
    // Настройка нашей реализации ввода-вывода.
    var threaded: std.Io.Threaded = .init(gpa);
    defer threaded.deinit();
    const io = threaded.io();
    return juicyMain(gpa, io);
}Вывод (такой же, как и раньше):
0s $ zig run example0.zig
0s working
1s $Настройка реализации std.Io очень похожа на настройку аллокатора. Обычно вы делаете это один раз, в main(), а затем передаете экземпляр по всему приложению. Переиспользуемый код должен принимать параметр Allocator, если ему нужно выделять память, и должен принимать параметр Io, если ему нужно выполнять операции ввода-вывода.
В данном случае, это реализация Io, основанная на потоках. Она не использует KQueue, не использует IO_Uring, не использует цикл событий. Это многопоточная реализация нового интерфейса std.Io.
Эта настройка будет одинаковой во всех примерах, поэтому теперь мы можем сосредоточиться на нашем коде примера, который такой же, как и в прошлый раз. Все еще ничего интересного - мы просто вызываем doWork, который, конечно же, просто вызывает sleep().
Пример 2
Избыточный (повторяющийся) код настройки здесь и далее опускается.
fn juicyMain(gpa: Allocator, io: Io) !void {
    _ = gpa;
    var future = io.async(doWork, .{io});
    future.await(io); // идемпотентно
}
fn doWork(io: Io) void {
    std.debug.print("working\n", .{});
    io.sleep(.fromSeconds(1), .awake) catch {};
}Вывод (такой же, как и раньше):
0s $ zig run example0.zig
0s working
1s $Теперь мы используем async/await для вызова doWork. Для Zig async/await означает разделение вызова функции и возврата из функции.
Этот код такой же, как и раньше. Он абсолютно такой же, потому что мы не поместили никакого кода между async и await. Мы делаем вызов, а затем немедленно ждем возврата.
Пример 3
В следующем примере у нас две вещи одновременно:
fn juicyMain(gpa: Allocator, io: Io) !void {
    _ = gpa;
    var a = io.async(doWork, .{ io, "hard" });
    var b = io.async(doWork, .{ io, "on an excuse not to drink Spezi" });
    a.await(io);
    b.await(io);
}
fn doWork(io: Io, flavor_text: []const u8) void {
    std.debug.print("working {s}\n", .{flavor_text});
    io.sleep(.fromSeconds(1), .awake) catch {};
}Вывод:
0s $ zig run example3.zig
0s working on an excuse not to drink Spezi
0s working hard
1s $Если вы присмотритесь, то увидите, что он ждал не две секунды; он ждал одну секунду, потому что эти операции происходят одновременно. Это демонстрирует, почему использование async/await полезно - вы можете выразить асинхронность. В зависимости от выбранной вами реализации ввода-вывода, она может использовать выраженную вами асинхронность и ускорить ваш код. Например, в этом случае std.Io.Threaded смог выполнить двухсекундную работу за одну секунду реального времени.
Пример 4
Давайте приблизим пример к реальному сценарию, введя возможность сбоя.
fn juicyMain(gpa: Allocator, io: Io) !void {
    var a = io.async(doWork, .{ gpa, io, "hard" });
    var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });
    try a.await(io);
    try b.await(io);
}
fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) !void {
    // Симуляция возникновения ошибки:
    if (flavor_text[0] == 'h') return error.OutOfMemory;
    const copied_string = try gpa.dupe(u8, flavor_text);
    defer gpa.free(copied_string);
    std.debug.print("working {s}\n", .{copied_string});
    io.sleep(.fromSeconds(1), .awake) catch {};
}Это тот же код, что и раньше, за исключением того, что первая задача вернет ошибку.
Угадайте, что произойдет при запуске этого кода?
Вывод:
0s $ zig run example4.zig
0s working on an excuse not to drink Spezi
1s error(gpa): memory address 0x7f99ce6c0080 leaked:
1s /home/andy/src/zig/lib/std/Io/Threaded.zig:466:67: 0x1053aae in async (std.zig)
1s     const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch {
1s                                                                   ^
1s /home/andy/src/zig/lib/std/Io.zig:1548:40: 0x1164f94 in async__anon_27344 (std.zig)
1s     future.any_future = io.vtable.async(
1s                                        ^
1s /home/andy/misc/talks/zigtoberfest/async-io-examples/example4.zig:8:21: 0x116338a in juicyMain (example4.zig)
1s     var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });
1s                     ^
1s /home/andy/misc/talks/zigtoberfest/async-io-examples/example4.zig:35:21: 0x1163663 in main (example4.zig)
1s     return juicyMain(gpa, io);
1s                     ^
1s /home/andy/src/zig/lib/std/start.zig:696:37: 0x1163c83 in callMain (std.zig)
1s             const result = root.main() catch |err| {
1s                                     ^
1s /home/andy/src/zig/lib/std/start.zig:237:5: 0x1162f61 in _start (std.zig)
1s     asm volatile (switch (native_arch) {
1s     ^
1s 
1s thread 1327233 panic: reached unreachable code
1s error return context:
1s /home/andy/src/zig/lib/std/Io.zig:1003:13: 0x11651a8 in await (std.zig)
1s             return f.result;
1s             ^
1s /home/andy/misc/talks/zigtoberfest/async-io-examples/example4.zig:10:5: 0x11633e8 in juicyMain (example4.zig)
1s     try a.await(io);
1s     ^
1s 
1s stack trace:
1s /home/andy/src/zig/lib/std/debug.zig:409:14: 0x103e5a9 in assert (std.zig)
1s     if (!ok) unreachable; // assertion failure
1s              ^
1s /home/andy/misc/talks/zigtoberfest/async-io-examples/example4.zig:27:17: 0x1163698 in main (example4.zig)
1s     defer assert(debug_allocator.deinit() == .ok);
1s                 ^
1s /home/andy/src/zig/lib/std/start.zig:696:37: 0x1163c83 in callMain (std.zig)
1s             const result = root.main() catch |err| {
1s                                     ^
1s /home/andy/src/zig/lib/std/start.zig:237:5: 0x1162f61 in _start (std.zig)
1s     asm volatile (switch (native_arch) {
1s     ^
1s fish: Job 1, 'zig run example4.zig' terminated by signal SIGABRT (Abort)
1s $Проблема в том, что когда срабатывает первый try, он пропускает второй await, что затем ловится проверщиком утечек.
Это ошибка. Но это досадно, не так ли? Ведь мы бы хотели писать код именно так.
Пример 5
Вот исправление:
fn juicyMain(gpa: Allocator, io: Io) !void {
    var a = io.async(doWork, .{ gpa, io, "hard" });
    var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });
    const a_result = a.await(io);
    const b_result = b.await(io);
    try a_result;
    try b_result;
}
fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) !void {
    // Симуляция возникновения ошибки:
    if (flavor_text[0] == 'h') return error.OutOfMemory;
    const copied_string = try gpa.dupe(u8, flavor_text);
    defer gpa.free(copied_string);
    std.debug.print("working {s}\n", .{copied_string});
    io.sleep(.fromSeconds(1), .awake) catch {};
}Мы выполняем await'ы, затем выполняем try'ы. Это решит проблему.
Вывод:
0s $ zig run example5.zig
0s working on an excuse not to drink Spezi
1s error: OutOfMemory
1s /home/andy/src/zig/lib/std/Io.zig:1003:13: 0x11651d8 in await (std.zig)
1s             return f.result;
1s             ^
1s /home/andy/misc/talks/zigtoberfest/async-io-examples/example5.zig:13:5: 0x1163416 in juicyMain (example5.zig)
1s     try a_result;
1s     ^
1s /home/andy/misc/talks/zigtoberfest/async-io-examples/example5.zig:38:5: 0x11636e9 in main (example5.zig)
1s     return juicyMain(gpa, io);
1s     ^
1s $Это завершилось успешно. Ошибка была обработана, и ресурсы не протекли. Но это опасная ловушка. Давайте найдем лучший способ выразить это...
Пример 6
Здесь на сцену выходит отмена (cancellation). Отмена - чрезвычайно удобный примитив, потому что теперь мы можем использовать defer, try и await как обычно, и не только исправить ошибку, но и получить более оптимальный код.
fn juicyMain(gpa: Allocator, io: Io) !void {
    var a = io.async(doWork, .{ gpa, io, "hard" });
    defer a.cancel(io) catch {};
    var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });
    defer b.cancel(io) catch {};
    try a.await(io);
    try b.await(io);
}
fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) !void {
    // Симуляция возникновения ошибки:
    if (flavor_text[0] == 'h') return error.OutOfMemory;
    const copied_string = try gpa.dupe(u8, flavor_text);
    defer gpa.free(copied_string);
    std.debug.print("working {s}\n", .{copied_string});
    io.sleep(.fromSeconds(1), .awake) catch {};
}Благодаря отмене, мы теперь получаем мгновенные результаты, потому что в тот момент, когда первая задача возвращает ошибку, срабатывают отмены.
Вывод:
0s $ zig run example6.zig
0s working on an excuse not to drink Spezi
0s error: OutOfMemory
0s /home/andy/misc/talks/zigtoberfest/async-io-examples/example6.zig:13:5: 0x116348c in juicyMain (example6.zig)
0s     try a.await(io);
0s     ^
0s /home/andy/misc/talks/zigtoberfest/async-io-examples/example6.zig:38:5: 0x1163909 in main (example6.zig)
0s     return juicyMain(gpa, io);
0s     ^
0s $cancel -  ваш лучший друг, потому что он предотвратит утечку ресурсов и сделает ваш код более оптимальным.
cancel тривиален для понимания: он имеет идентичную семантику с await, за исключением того, что он также запрашивает отмену. Условия, при которых запросы на отмену выполняются, определяются каждой реализацией ввода-вывода.
И cancel, и await идемпотентны по отношению друг к другу и к самим себе.
Пример 7
Далее давайте рассмотрим еще один реальный сценарий: выделение ресурсов. В этом случае мы выделяем строку при успешном выполнении, которой вызывающая сторона должна управлять.
fn juicyMain(gpa: Allocator, io: Io) !void {
    var a = io.async(doWork, .{ gpa, io, "hard" });
    defer if (a.cancel(io)) |s| gpa.free(s) else |_| {};
    var b = io.async(doWork, .{ gpa, io, "on an excuse not to drink Spezi" });
    defer if (b.cancel(io)) |s| gpa.free(s) else |_| {};
    const a_string = try a.await(io);
    const b_string = try b.await(io);
    std.debug.print("finished {s}\n", .{a_string});
    std.debug.print("finished {s}\n", .{b_string});
}
fn doWork(gpa: Allocator, io: Io, flavor_text: []const u8) ![]u8 {
    const copied_string = try gpa.dupe(u8, flavor_text);
    std.debug.print("working {s}\n", .{copied_string});
    io.sleep(.fromSeconds(1), .awake) catch {};
    return copied_string;
}Теперь мы видим, почему cancel и await имеют одинаковый API. Отложенные вызовы cancel выше освобождают выделенный ресурс, обрабатывая как успешные вызовы (ресурс выделен), так и неудачные вызовы (ресурс не выделен).
Вывод:
0s $ zig run example7.zig
0s working on an excuse not to drink Spezi
0s working hard
1s finished hard
1s finished on an excuse not to drink Spezi
1s $Важно здесь то, что, управляя ресурсами таким образом, мы можем писать стандартный, идиоматический код Zig ниже, используя try и return как обычно, не беспокоясь о специальных случаях управления ресурсами.
Пример 8
Теперь мы немного меняем направление. Пришло время узнать, почему асинхронность - это не параллелизм.
В этом примере у нас есть производитель, отправляющий один элемент через небуферизованную очередь потребителю.
fn juicyMain(io: Io) !void {
    var queue: Io.Queue([]const u8) = .init(&.{});
    var producer_task = io.async(producer, .{
        io, &queue, "never gonna give you up",
    });
    defer producer_task.cancel(io) catch {};
    var consumer_task = io.async(consumer, .{ io, &queue });
    defer _ = consumer_task.cancel(io) catch {};
    const result = try consumer_task.await(io);
    std.debug.print("message received: {s}\n", .{result});
}
fn producer(
    io: Io,
    queue: *Io.Queue([]const u8),
    flavor_text: []const u8,
) !void {
    try queue.putOne(io, flavor_text);
}
fn consumer(
    io: Io,
    queue: *Io.Queue([]const u8),
) ![]const u8 {
    return queue.getOne(io);
}Мы используем async для запуска производителя и async для запуска потребителя.
Вывод:
0s $ zig run example8.zig
0s message received: never gonna give you up
0s $Это некорректно завершается успехом. В зависимости от вашей точки зрения, нам либо "повезло", либо "не повезло" из-за того, что пул потоков имел свободные ресурсы параллелизма, которые оказались доступными.
Чтобы увидеть проблему, мы можем искусственно ограничить экземпляр std.Io.Threaded использовать пул потоков размером в один:
Пример 9
// Настройка нашей реализации ввода-вывода.
    var threaded: std.Io.Threaded = .init(gpa);
    threaded.cpu_count = 1;
    defer threaded.deinit();
    const io = threaded.io();
    return juicyMain(io);
}Вывод: (тупик)
Теперь, когда используется только один поток, возникает тупик, потому что потребитель ждет получения чего-то из очереди, а производитель запланирован к выполнению, но еще не запущен.
Проблема в том, что нам нужна была параллельность, а мы просили асинхронность.
Пример 10
Чтобы это исправить, мы используем io.concurrent вместо io.async. Эта операция может завершиться ошибкой error.ConcurrencyUnavailable.
fn juicyMain(io: Io) !void {
    var queue: Io.Queue([]const u8) = .init(&.{});
    var producer_task = try io.concurrent(producer, .{
        io, &queue, "never gonna give you up",
    });
    defer producer_task.cancel(io) catch {};
    var consumer_task = try io.concurrent(consumer, .{ io, &queue });
    defer _ = consumer_task.cancel(io) catch {};
    const result = try consumer_task.await(io);
    std.debug.print("message received: {s}\n", .{result});
}
fn producer(
    io: Io,
    queue: *Io.Queue([]const u8),
    flavor_text: []const u8,
) !void {
    try queue.putOne(io, flavor_text);
}
fn consumer(
    io: Io,
    queue: *Io.Queue([]const u8),
) ![]const u8 {
    return queue.getOne(io);
}Вывод:
0s $ zig run example10.zig
0s message received: never gonna give you up
0s $Теперь код исправлен, потому что мы корректно выразили, что нам нужна параллельность, что std.Io.Threaded выполнил, создав избыточное количество задач (oversubscribing).
Если я добавлю -fsingle-threaded, что действительно ограничивает исполняемый файл одним потоком, переподписка будет недоступна, что приводит к такому выводу:
error: ConcurrencyUnavailable
/home/andy/src/zig/lib/std/Io/Threaded.zig:529:34: 0x1051863 in concurrent (std.zig)
    if (builtin.single_threaded) return error.ConcurrencyUnavailable;
                                 ^
/home/andy/src/zig/lib/std/Io.zig:1587:25: 0x1158b5f in concurrent__anon_26591 (std.zig)
    future.any_future = try io.vtable.concurrent(
                        ^
/home/andy/misc/talks/zigtoberfest/async-io-examples/example10.zig:9:25: 0x1157198 in juicyMain (example10.zig)
    var producer_task = try io.concurrent(producer, .{
                        ^
/home/andy/misc/talks/zigtoberfest/async-io-examples/example10.zig:48:5: 0x115776a in main (example10.zig)
    return juicyMain(io);
    ^Заключение
Существуют прототипы реализаций std.Io, использующие IoUring и KQueue в сочетании с сопрограммами со стеком (stackful coroutines), которые выглядят очень многообещающе, однако эта работа зависит от некоторых улучшений языка, чтобы стать практичной. Также продолжается работа над дизайном безстековых сопрограмм (stackless coroutines). Вот несколько актуальных вопросов, за которыми стоит следить тем, кто заинтересован:
- Ограниченные типы функций (Restricted Function Types) 
- Встроенная функция для определения максимального размера стека для данной функции (Builtin function to tell you the maximum stack size of a given function) 
- Исключение переполнения стека (Eliminate Stack Overflow) 
- Бестековые сопрограммы (Stackless Coroutines) 
- Juicy Main 
Эти API еще не утверждены. Вероятно, потребуется несколько итераций, чтобы довести их до совершенства. Пожалуйста, попробуйте их в реальных приложениях и дайте нам знать, как они работают! Давайте сотрудничать, чтобы сделать интерфейс ввода-вывыода практичным и оптимальным.
 
          