«E pur si muove!»
Галилео Галилей.
1. Предисловие
Настоящая статья является важным шагом на пути к созданию демонстратора системы движков, взаимодействующих в потоке данных на базе встроенного интерпретатора Forth в Elixir. Интерпретатор Forth движков описан в предшествующей статье [1]. В серия статей [2, 3, 4] рассказывалось о рабочих моментах разработки демонстратора.
Данная статья описывает ручную сборку распределенной системы узлов, выполняющих единую задачу вычисления корней квадратного уравнения по предписанию графа обработки данных в потоке. Далее для краткости будем называть такой граф графом обслуживания.
Такую же постановку демонстрационной задачи вычисления корней квадратного уравнения на графе обслуживания я сделал 5 лет тому назад в статье «Анти–Тьюринг», https://habr.com/ru/articles/593379/.
Тогда это был прообраз системы движков. Настоящей реализации демонстратора принципа обработки данных в потоке на Elixir предшествовала работа по написанию встроенного интерпретатора Forth и широкая систематизация движков, шлюзов, кнопок и тактовых генераторов в составе системы.
Перейдём сразу к делу.
2. Задача демонстратора системы.
Для испытания системы движков взята всем известная задача решения квадратного уравнения, решение которой описывается следующим псевдокодом:
a ← <значение>
b ← <значение>
c ← <значение>
det ← b*b – 4*a*c
sqrt ← sqrt(det) wen det >= 0
x1 ← (-b – sqrt) / 2 a wen а <> 0
x2 ← (-b + sqrt) / 2 a wen а <> 0
Задача нахождения корней квадратного уравнения очень подходит для демонстратора. В ней есть
· свойство естественного распараллеливания выполнения алгоритма,
· точки ветвления алгоритмов и точки сбора данных,
· четкая формализация проверок в пунктах пропуска (checkpoints) потока управления.
Нам необходимо отобразить предложенный алгоритм в граф обслуживания. В предыдущей статье [2] была предложена классификация шлюзов на графе обслуживания:
· Setpoint — входная уставка,
· R_gateway — шлюз поступающих регулярных данных,
· Ir_gateway — шлюз поступающих нерегулярных данных и
· Stock — шлюз выходных данных.
Для построения графа обслуживания на плоскости trunk добавили новый узел GenEngine. Граф обслуживания алгоритма нахождения корней квадратного представлен на Рис. 1.

Рис. 1. Структура алгоритма задачи квадратного уравнения в виде графа обслуживания.
Подробное писание шлюзов :a, :b и :c дано в статье [1]. Предполагается, что шлюз выходных данных :memo служит для складывания данных в архив. У шлюза :memo типа Stock отсутствует список следующих узлов, поэтому поток данных завершается.
Движки GenEngine и шлюзы обслуживают прохождение данных по скелетным ребрам графа.
При создании движков и шлюзов вызывается «конструктор», которому передаются среди прочих параметров:
· атом названия движка/шлюза;
· список элементов, следующих по потоку за движком/шлюзом.
Для законченности описания графа на Рис. 1 дадим его свойства: конечный, ориентированный, ациклический без параллельных ребер и петель.
Примечание. Если бы на графе были бы циклы, это означало бы, что для реализации движков GenEngine нельзя использовать GenServer OTP, которые запрещают посылать сообщения самому себе. А серверные процессы с сохранением состояния могут!? И это выглядит перспективно!
3. Движки управления графом обслуживания.
Элемент run_stop «класса» Button представляет новую разновидность движков типа кнопки, запускающих/останавливающих обслуживание графа. Элемент run_stop фактически принадлежит параллельной плоскости управления, или диспетчеризации, switch показанной на Рис. 1. Эта отдельная тема обсуждения, в которой будут задействованы монады [4].
По реализации кнопка похож на шлюз типа Setpoint, т.к. элемент предназначен для взаимодействия с внешним миром и пропускает/генерирует два значений: :off или :on. Гипотетически кнопки бывают двух видов: сохраняющие состояние и без сохранения состояния. Наша кнопка run_stop сохраняет состояние между нажатиями.
Кнопка на схеме имеет два потомка b и c по потоку графа обслуживания. Но сама кнопка run_stop никак не принадлежит графу обслуживания.
4. Концепция GenEngine. Движение данных по графу.
В отличие от шлюзов в обобщенном движке GenEngine включается в игру встроенный интерпретатор. Для этого «конструктору» движков GenEngine среди параметров передается определение слова, или скрипт, который будет запускаться на встроенной машине Forth при прохождении данных.
На Рис. 1 промежуточные вершины графа обслуживания :det, :sqrt, :x1 и :x2, имеющие тип GenEngine, выполняют следующие типовые операции:
1) приёма данных из входных каналов,
2) проверка актуальности данных,
3) обработка данных встроенным интерпретатором Forth,
4) передачи, данных в выходные каналы.
Эти операции разделены движка GenEngine между двумя блоками:
· внешний блок маршрутизации сообщений через Elixir,
· внутренний блок обработки данных во встроенном Forth.
Таким образом, Elixir является инфраструктурой построения графа обслуживания и маршрутизации данных, а стеки Forth содержит в себе прикладные алгоритмы задачи.
Основные моменты реализации обобщенного движка GenEngine рассмотрены в следующем разделе.
5. Формат сообщения между обобщенными движками GenEngine.
Пакет типового сообщения имеет формат:
{tag, sender_name, value, timestamp, birthmark }
, где tag принимает в основном значения :flow, :run, :stop;
birthmark является метрикой о родительских шлюзах, на которых были порождены данные.
6. Реализация обобщенного движка GenEngine.
В статье [4] описан API движка и приёмы загрузки движка монады исполнительным кодом и переменными Forth вызовом функции execute, реализующим асинхронный обратный вызов типа cast в технологии OTP GenServer. В этом плане движок GenEngine не отличается от движка Monad.
Для движка Monad применяются паттерны модели акторов и для их функционирования достаточно было реализовать три функции API обобщенного движка:
• execute(engine, words) — запуск на eng_name выполнения набора words,
• add_var(engine, name, value) — установить переменную name со значением value,
• forward(engine, name, value) — упаковать переменную name со значением value в кортеж, результат переслать следующим движкам.
Потом добавились отладочные/надзорные функции:
• get_word(engine, word_name) — проинспектировать на движке engine слово word_name,
• get_var(engine, var_name) — проинспектировать на движке engine переменную var_name,
• tos(engine) — проинспектировать на движке engine значение вершины стека с удалением его из стека.
Можно сказать, что функция API tos введена на замену стандартного слова «dot» вывода на консоль значения вершины стека данных. Функция tos выдает результат вычислений в канал, а использование слова "dot" в сетевой среде теряет всякий смысл.
Рассмотрим поля «структуры» состояния обобщенного движка:
1) forth_state — состояние интерпретатора Forth
· virt_code — колода обрабатываемого кода,
· data_stack— стек данных,
· return_stack— обратный данных,
· dictionary — словарь;
2) self_name — собственное имя движка,
3) sources — список имен входных каналов данных,
4) stocks — список имен выходных каналов данных.
Состояние обобщенного движка организована в словарь ключевых слов Elixir, что дает свободу по передаче параметров и распаковки «структуры».
По–видимому, в дальнейшем список полей будет расширяться. Например, будет добавлено поле максимально допустимой временной разбежки прихода данных через входные каналы индивидуальной для движка.
7. Синхронизация потоков данных на входе GenEngine. Решение проблемы «гонки данных».
Здесь проблема «гонки данных» понимается как «устаревание» данных на входе многоканального обобщенного движка, т.к. данные поступают не синхронно.
Теоретически вариантов решений проблемы с «гонкой данных» может быть множество. В демонстраторе применяется наиболее простой, который пришёл на ум. По крайней мере, он эффективен в моих планируемых к реализации прикладных задачах.
1) В конфигурации системы прошита временная разбежка dispersion данных на входе обобщенного движка;
2) Поступающие данные содержат timestamp момента порождения;
3) С приходом очередного пакета сообщения он записывается в словарь и определяется максимальный и минимальный timestamp в словаре.
4) Если словарь собрал полный набор данных и разница между максимальным и минимальным timestamp меньше dispersion, то данные передаются на во встроенный интерпретатор Forth и запускается исполнение определения целевого слова.
Если в конфигурации общей временной разбежке присвоить значение :infinity, то данные на входе обобщенного движка вообще не будут устаревать.
8. Задание сборки системы движков.
Здесь я просто приведу код модуля Assembly, который формирует словарь описания сборки движков.
def start(_type, _args) do
units = %{rassokha@vak2:
[{Stock, [:repo, [:x1, :x2], []]}],
trunk@vak2:
[{GenEngine, [:det, [:a, :b, :c], [:sqrt], "det.fs"]}, # скрипт в файле
{GenEngine, [:sqrt, [:det], [:x1, :x2], ": sqrt det @ DUP 0 >= IF SQRT ELSE error TO-ATOM TO-TUPLE THEN ;"]},
{GenEngine, [:x1, [:a, :b, :sqrt], [:repo], "x1.fs"]},
{GenEngine, [:x2, [:a, :b, :sqrt], [:repo], "x2.fs"]}
] ,
butt@vak2:
[{Setpoint, [:a, [], [:det, :x1, :x2], 1]}, # значение по умолчанию – 1
{Ir_gateway, [:b, [:run_stop], [:det, :x1, :x2]]},
{R_gateway, [:c, [:run_stop], [:det]]}
],
switch@vak2:
[{Button, [:run_stop, [], [:b, :c], :off]}] # начальное состояние – выключено
}
Dispatcher.start_link(units)
end
Управление передает серверу Dispatcher. Диспетчер инициирует создание шлюзов и обобщенных движков согласно переданной «структуры» units, а потом подгружает движкам скрипты.
Структура модуля Assembly очень похожа на модуль Application в Elixir, который запускает процессы и передает управление супервизору. При сравнении поведений Assembly с Application можно фундаментальную точка разрыва с традиционным паттерном управления централизованной системой:
Supervisor -> подвизоры -> … -> рабочие процессы
Если супервизор выполняет роль верхнего наблюдателя централизованной системы, поддерживающего стратегию «Let it crash!», то у диспетчера роль более деликатная и точная по дирижированию распределенной системой, например, проведение остановки системы по сценарию аварийной ситуации.
Для демонстрации особенностей Dispatcher рассмотрим скрипты движков. Движки GenEngine загружаются скриптами Forth из соответствующих файлов: det.fs, x1.fs и x2.fs. А у :sqrt скрипт определяется непосредственно в коде Assembly. Это удобно для коротких скриптов.
Скрипт определения слова sqrt начинается с защитной конструкции: в случае отрицательно значения переменной det маршрутизатору движка возвращается не факт аварии вычисления, ведущий к перезапуску рабочего процесса, а кортеж {:error, значение det} и движок отсылает сигнал ошибки в Dispatcher, что сигнализирует о неисправности на предыдущем этапе обработки потока. Таким образом, Dispatcher может обработать сигнал разумным образом.
В настоящей реализации демонстратора Dispatcher просто фиксирует приход сообщения о «неисправности». О поведении Dispatcher предстоит ещё рассказать более детально после исследований в следующих публикациях.
9. Отрицательный опыт настройки структуры кластера распределенных узлов
Как я уже упоминал, кластер распределенных узлов мной запускался вручную. Запускались пять распределенных узлов:
· main — узел, на котором запускается модуль Assembly и выполняется модуль Dispatcher,
· rassokha — верхний узел стоковых шлюзов,
· trunk — стволовой узел основных движков,
· butt — нижний узел входных шлюзов и модуля ввода уставки Setpoint; модуль Setpoint перехватывает процесс обработки консоли узла,
· switch — узел управляющей кнопки: модуль Button перехватывает процесс обработки консоли узла.
На данном этапе распределение узлов и назначении им модулей производилось по субъективной оценке об оптимизации производительности. Ещё учитывалось требование перехвата консоли ввода данных с клавиатуры.
После запуска оболочки Elixir отдельного узла командой iex –sname имя_узла … выполнялись команды создания инфраструктуры кластера. В качестве примера приведу эти команды для узла butt (комель), в котором удаленно запускается модуль Setpoint:
iex(switch@loclhost)1> Node.connect(main@loclhost)
true
iex(switch@loclhost)2> :global.register_name(:setpoint_ldr, :erlang.group_leader)
Как видно из приведенной команды в распределенной системе узлов Erlang/Elixir вопрос перехвата процесса обработки ввода/вывода консоль решается… в полу–ручном режиме. По крайней мере, попытка переключаться непосредственно в коде на лидера группы мне не удалась, хотя функция такая имеется.
В модуле Setpoint перехват консоли производится следующим образом:
leader = :global.whereis_name :setpoint_ldr
IO.input(leader, “Уставка? ”)
Я привел здесь эти подробности, чтобы собрать здесь хоть какие–то практические советы по созданию распределенного приложения. А применение соответствующего оператора Node.connect(butt@loclhost) прямо в коде диспетчера порождало неработоспособную систему (!?). Поэтому собираюсь проштудировать фундаментальную работу [6].
Общий запуск демонстратора распределенной системы движков производится в окне узла main@loclhost командой:
Assembly.start(nil, nil)
Приведение демонстратора распределенной системы движков в работу производится ответом «Yes» на приглашение:
Кнока run_stop выключена! Включить? (Yes)
в окне узла switch@loclhost.
10. Результаты испытания демонстратора.
Тестирование системы выполнялось на кластере 5–ти распределенных узлов. Данные генерировались на узле butt случайным образом с периодом 4000-5000 мс. Допустимое время временной разбежки данных на узлах 2000 мс.
Вообще, я понял, что поведение системы напрямую зависит от выбора временных параметров. Это в свою очередь зависит от понимания специалистом обслуживаемых процессов.
Шлюз :memo записывал получаемые сообщения в текстовый файл log.txt. Пример содержимого файла log.txt приведен ниже:
Вторник, 12 мая 2026 г.
06:12:34.744 Сток memo получил 1.0 от x1. Исходные данные: %{c: 7, b: 3, a: -10.0}
06:12:34.744 Сток memo получил -0.7 от x2. Исходные данные: %{c: 7, b: 3, a: -10.0}
06:12:38.903 Сток memo получил 0.5114748010928823 от x1. Исходные данные: %{c: 19, b: 14, a: -100.0}
06:12:38.903 Сток memo получил -0.37147480109288233 от x2. Исходные данные: %{c: 19, b: 14, a: -100.0}
06:13:09.352 Сток memo получил 0.8347442444041993 от x1. Исходные данные: %{c: 18, b: 16, a: -45.0}
06:13:09.352 Сток memo получил -0.47918868884864363 от x2. Исходные данные: %{c: 18, b: 16, a: -45.0}
…
06:18:58.927 Сток memo получил 2.8519221295943136 от x1. Исходные данные: %{c: 15, b: 9, a: -5.0}
06:18:58.928 Сток memo получил -1.0519221295943137 от x2. Исходные данные: %{c: 15, b: 9, a: -5.0}
06:20:26.234 Сток memo получил 1.8685170918213299 от x1. Исходные данные: %{c: 3, b: 4, a: -3.0}
06:20:26.234 Сток memo получил -0.5351837584879964 от x2. Исходные данные: %{c: 3, b: 4, a: -3.0}
…
11. Итоги
В настоящей статье доведена до уровня реализации концепция формулы:
«Структура алгоритмов + данные = программа»
В отличие от известных фреймворков систем обработки данных в потоке результат представляет собой систему программирования статической структуры потоков.
По–моему мнению статического структуры потоков данных полезны в задачах небольшого масштаба, например, в программировании микроконтроллеров. Если Богу будет угодно, то напишу и об этом.
Сборка распределенной программы производится вручную. В дальнейшем будет предложен способ автоматической сборки программы обработки данных в потоке. Поэтому на GitHub проект пока выложен не будет.
Литература
1. «Опыт реализации интерпретатора Forth на языке Elixir», https://habr.com/ru/articles/985894/.
2. «Hello, World! Hello, World! Hello, в парадигме обработки данных в потоке», https://habr.com/ru/articles/1012632/,
3. «Попытка имитации расширения модуля Elixir как класса ООП», https://habr.com/ru/articles/1007648/,
4. «Реализация прототипов взаимодействующих движков Forth класса тактовых генераторов», https://habr.com/ru/articles/1002748/.
5. . «Анти–Тьюринг», https://habr.com/ru/articles/593379/
6. Ф.Чезарини, С.Виноски, «Проектирование масштабируемых системс помощью Erlang/OTP» — М: ДМК Пресс, 2017.