
В моем текущем проекте много задач, которые выполняются в фоне. Из внешнего сервиса прилетают данные и проходят несколько стадий обработки. Обработка реализована через механизм очередей. Это удобно, можно варьировать количество воркеров на каждый тип процессов. Да и в случае, если что-то упадет, очередь будет копиться, и данные не потеряются — обработаются, как только проблема будет устранена.
Чтобы из одного процесса создать задачу для следующей стадии обработки, мы просто вызывали в конце обработки
dispatch(), примерно так:class MyFirstJob extends Job
{
use DispatchesJobs;
protected $data;
public function __construct($data)
{
$this->data = $data;
}
public function handle()
{
$this->doSomething($this->data);
$this->dispatch(new MySecondJob($this->data)); // Second task
}
}
А следующая стадия обработки инициировалась совершенно аналогично:
class MySecondJob extends Job
{
use DispatchesJobs;
protected $data;
public function __construct($data)
{
$this->data = $data;
}
public function handle()
{
$this->doSomething($this->data);
if ($this->someCondition($this->data)) {
$this->dispatch(new MyThirdJob($this->data)); // Third task
}
}
}
Поначалу было хорошо, но добавлялись новые стадии обработки и цепочка росла. В очередной раз, когда надо было добавить еще одну стадию обработки (новая очередь), я поймал себя на мысли, что уже не помню точно, что и в какой последовательности обрабатывается. И по коду понять это уже не так-то и просто. Там появились элементы бизнес логики: в таком-то случае запускается такая-то обработка, в другом случае создается сразу набор задач. В общем, все то, что мы так “любим” видеть в больших системах.
Ох-ох, подумал я, пора что-то предпринять. И решил, что будет очень удобно вынести управление порядком обработки (порядок вызовов
dispatch()) в отдельный код. Тогда все будет логично и наглядно — вот у нас бизнес процесс (управляющий код, менеджер очередей), вот у нас отдельные его кусочки (очереди).Я так и сделал и до сих пор доволен. Сейчас расскажу, что именно сделал. Буду рад, если и вам пригодится этот подход.
Управление очередями
У нас несколько независимых процессов обработки данных. Чтобы описать каждый алгоритм отдельно, делаем абстрактный класс для менеджера очередей.
<?php
namespace App\Jobs\Pipeline;
use App\Jobs\Job;
use Illuminate\Foundation\Bus\DispatchesJobs;
abstract class PipelineAbstract
{
use DispatchesJobs;
/**
* @param array $params
* @return PipelineAbstract
*/
public function start(array $params)
{
$this->next(null, $params);
return $this;
}
/**
* @param Job $currentJob
* @param array $params Set of parameters for starting new jobs
*/
abstract public function next(Job $currentJob = null, array $params);
/**
* @param Job $job
*/
protected function startJob(Job $job)
{
$this->dispatch($job);
}
}
В методе
next() у нас как раз и будет реализован бизнес процесс. startJob() — просто обертка над dispatch() на всякий случай. А start() будем использовать в том месте, где надо инициировать весь процесс обработки данных (там, где прилетают данные из внешнего сервиса).Пример реализации бизнес логики:
<?php
namespace App\Jobs\Pipeline;
use App\Jobs\Job;
use App\Jobs\MyFirstJob;
use App\Jobs\MySecondJob;
use App\Jobs\MyThirdJob;
class ProcessDataPipeline extends PipelineAbstract
{
/**
* @inheritdoc
*/
public function next(Job $currentJob = null, array $params)
{
// Start first job
if ($currentJob === null)
{
$this->startJob(new MyFirstJob($params, $this));
}
if ($currentJob instanceof MyFirstJob)
{
$this->startJob(new MySecondJob($params, $this));
}
if ($currentJob instanceof MySecondJob)
{
if ($this->someCondition($params))
{
$this->startJob(new MyThirdJob($params, $this));
}
}
}
}
Вот и все. Остается только заменить запуск
MyFirstJob.Было
$this->dispatch(new MyFirstJob($data));
Стало
(new ProcessDataPipeline())->start($data);
А вместо добавления заданий в остальные очереди вызовем метод
next().Было
$this->dispatch(new MySecondJob($data));
Стало
$this->next($data);
Чуть не забыл. Нам еще придется доработать для этого базовый класс очереди. В коде выше видно, что мы при инстанцировании объекта очереди теперь еще передаем туда помимо данных объект пайплайна.
<?php
namespace App\Jobs;
use App\Jobs\Pipeline\PipelineAbstract;
abstract class Job
{
/**
* @param array $params
*/
public function next(array $params)
{
if ($this->pipeline)
{
$this->pipeline->next($this, $params);
}
}
}
И в конструкторах конкретных джобов принимаем экземпляр пайплайна, чтобы шаги бизнес логики (вызов метода
next()) обрабатывались нужной реализацией пайплайна.class MyFirstJob extends Job
{
/**
* @param mixed data
* @param PipelineAbstract|null $pipeline
*/
public function __construct($data, PipelineAbstract $pipeline = null)
{
$this->data = $data;
$this->pipeline = $pipeline;
}
}
Вот теперь всё. Получилось похоже на цепочку ответственности. Я постарался объяснить идею простым языком. Если вам вдруг тоже захотелось так сделать, то тут я опубликовал рабочий пример реализации, возможно так кому то будет удобнее, чем на словах:
Что хорошего
- Описание процесса обработки данных теперь не размазано по коду, а сосредоточено в одном методе.
- Появилась возможность аккуратно добавить новое поведение в механизм управления очередью. Например, логирование, хранение в базе состояний обработки каждого шага.
- Стало легче добавлять новые стадии обработки и менять порядок выполнения задач.
Кстати, в свежей версии Laravel появился похожий инструмент
withChain(), он гарантирует выполнение задач в строгой последовательности. В простых случаях этого будет достаточно. Но в случаях, когда есть условия запуска того или иного процесса, когда данные для следующего процесса рождаются в предыдущем, все же нужен более универсальный механизм. Например, такой, о котором я и рассказал в этой статье. Комментарии (9)

iwex
26.04.2018 16:34А не проще было бы просто описать порядок выполнения джобов и не плодить if() {...} ?

mnv Автор
26.04.2018 16:42Вы про метод
next()? Не думал как тут можно обойтись без условий. Он вызывается из каждого джоба одинаковым способом. Внутри этого метода надо как-то понять, что делать дальше исходя из того, на какой стадии сейчас находимся. От этого и возникают условия. Тут получается этакая фабрика джобов. На мой взгляд наличие условий в этом методе — не страшно. Но если есть идеи как избавиться тут от if-ов, буду рад.
iwex
26.04.2018 16:53Для начала, какой смысл вызывать один джоб за другим? 1 раз отправили джоб чтобы запрос не висел, а внутри джоба сделать ту же цепочку.
Если все же всё пихать в джобы- вместо if просто сделать карту что за чем идет:
protected static $jobsSequence = [ MyFirstJob::class => MySecondJob::class, ... ]
или сделать просто массив с очередью и использовать поиск по массиву + брать след индекс.

mnv Автор
26.04.2018 17:19внутри джоба сделать ту же цепочку
Я в статье объяснил, почему это оказалось не удобно.
Насчет карты, можно. Это будет чуть ближе к варианту со штатнымwithChain(). Это уже дело техники и предпочтений, и зависит от ситуации. В моем случае, как я упоминал выше, есть элементы бизнес логики, иногда надо создавать 1 джоб, иногда сразу группу джобов, с разными входными данными, тут практичнее использовать условия.

L0NGMAN
27.04.2018 00:47Можете привести конкретный пример из жизни где вы используете чаининг джобов?

mnv Автор
27.04.2018 08:03Вот пример. Из внешнего сервиса прилетает новый документ на обработку. Нужно найти в нем ключевые слова, определить их тональность, сделать еще ряд манипуляций. Это делается через очереди. Каждая задача — в своей очереди. И эта цепочка как раз описана в отдельном потомке
PipelineAbstract.
Помимо этого иногда надо повторно обработать старые документы. В этом случае происходит почти то же самое, но немножко по другому. Чтобы не плодить условия внутри джобов или сами джобы, оказалось очень удобно сделать второй наследник
PipelineAbstractи там описать процесс повторной обработки материала.

Triazepin
29.04.2018 13:28Вы используете жесткое связывание когда пытаетесь в задачу добавить добавить логику управлением задач. Задача должна делать свою работу, в завершении нужно бросить событие если результат интересует еще кого-то. А в листенерах уже сопоставлять эти события с другими действиями, если надо.

mnv Автор
29.04.2018 13:42Да, можно и так. Но у меня жесткого связывания нет, логику управления задачами я как раз вынес за пределы задач. С листенерами получится примерно так же — надо передавать данные и информацию о том, в рамках какого процесса сейчас идет обработка данных, если нужно, чтобы одну и ту же задачу можно было использовать в разных процессах.
AMaxKaluga
Идут годы, а ничего не меняется.
Вспомнил Borland Pascal/C++, потоки сообщений Windows3.1 и самописную подсистему реализацию графической среды со своей системой потоков сообщений и «заимствованными» классами объктов из BP.