
В статье разбираются возможности распределённых вычислений в Apache Ignite 3. Покажу, как развернуть кластер в Docker, задеплоить собственные джобы и сравнить Ignite 3 с предыдущей версией. Затронем новые возможности Ignite как полноценной распределённой платформы, а не просто in-memory кэша.
Вступление
Apache Ignite набирает всё большую популярность, многие компании начинают использовать его в своих проектах или строить свои решения на его базе (например GridGain или Platform V). Часто Ignite воспринимают как распределённый кэш или in-memory базу данных. Однако это не единственные области его применения. В этой статье я хочу показать, как можно использовать Apache Ignite 3 для распределенных вычислений и сравнить со 2 версией.
Поднятие кластера Apache Ignite 3
Для запуска Apache Ignite 3 будем использовать Docker. Нужно пробросить два порта: 10800 — для клиента и 10300 — для управления кластером по REST, дополнительно задам имя узлу. В Ignite 2 конфигурация задавалась статически, в Ignite 3 настройки можно править динамически через CLI или REST без перезапуска узлов.
После запуска контейнера нужно инициализировать кластер. Выполним REST запрос в котором укажем: clusterName
– название кластера, metaStorageNodes
– узлы на которых хранится мета информация и cmgNodes
– группа управления кластером:
curl -s -X POST http://localhost:10300/management/v1/cluster/init \
-H "Content-Type: application/json" \
-d '{
"clusterName": "demo-cluster",
"metaStorageNodes": ["node1"],
"cmgNodes": ["node1"]
}'
и проверим состояние:
curl -s http://localhost:10300/management/v1/cluster/state

Типы распределённых вычислений
Ignite 3 умеет работать с четырьмя типами распределённых вычислений:
Обычные вычисления
Задача выполняется на одном из узлов кластера. Узел можно выбрать явно или доверить выбор системе (например,JobTarget.anyNode()
).Colocated вычисления
Код выполняется на том узле, где хранятся данные для заданного ключа. Такой подход снижает сетевой трафик и ускоряет обработку — данные не перемещаются по сети, вычисления «подъезжают» к данным.Broadcast вычисления
Одна и та же задача рассылается и выполняется на всех узлах кластера, либо на всех узлах, где расположены партиции конкретной таблицы. Это удобно для параллельных операций, мониторинга и локальных расчётов.Map-Reduce задачи
Поддерживается модель MapReduceTask: входная задача делится на подзадачи (map-часть), которые параллельно выполняются на узлах, а затем их результаты объединяются в методеreduceAsync(...)
. Это позволяет реализовывать распределённые агрегации и аналитику.
В этой статье мы реализуем два подхода: обычные вычисления и colocated.
Реализация обычных вычислений
Сначала необходимо написать класс с джобой и собрать его в JAR. Так как Ignite 3 работает на Java 11, компиляцию тоже нужно выполнять под эту версию, иначе возникнет ошибка несовместимости байт-кода. Также в 3 версии добавилась поддержка асинхронного и реактивного программирования, поэтому все джобы могут выполняться асинхронно и возвращают CompletableFuture
.
Реализуем метод для проверки числа на простоту. Из зависимостей нам понадобится только ignite-client. Если в Ignite 2 различались thick- и thin-клиенты, то в Ignite 3, архитектура упрощена — используется только лёгкий thin-клиент, поэтому деплой и вызовы джоб стали проще.
Класс с нашей джобой должен имплементировать ComputeJob<T, R>
, где первый параметр это тип данных, который приходит на вход, а второй на выход, и переопределить метод executeAsync.
Полный код будет выглядеть так:
public class PrimeCheckJob implements ComputeJob<Integer, Boolean> {
@Override
public CompletableFuture<Boolean> executeAsync(JobExecutionContext jobExecutionContext, Integer integer) {
if (integer == null) {
return CompletableFuture.completedFuture(false);
}
final boolean result = isPrime(integer);
return CompletableFuture.completedFuture(result);
}
private static boolean isPrime(int n) {
if (n <= 1)
return false;
if (n <= 3)
return true;
if (n % 2 == 0 || n % 3 == 0)
return false;
for (long i = 5; i * i <= n; i += 6) {
if (n % i == 0 || n % (i + 2) == 0)
return false;
}
return true;
}
}
Далее нужно скомпилировать это в JAR файл, и загрузить в наш кластер. Загрузку выполним через Ignite CLI. Для удобства поднимем отдельный контейнер, в который примонтируем папку с JAR-файлом и запустим CLI (в реальном проекте лучше заранее определить место, в которое складываем джобы, и не поднимать лишний контейнер).
В моём случае команда будет выглядеть вот так:
docker run -it --rm \
--network ignite3-compute-demo_default \
-v /Users/apanaev/Downloads/ignite3-compute-demo/jobs/primeCheck/build/libs:/du \
apacheignite/ignite:3.0.0 cli
Затем нас встретит Ignite CLI и спросит к какому узлу подключиться. У меня команда выглядит так:
connect http://ignite3:10300

После подключения нужно задеплоить нашу джобу, для этого выполним команду:
cluster unit deploy prime-check-job --version 1.0.0 --path /du/prime-check-job-1.0.0.jar
в которой укажем имя юнита, версию и путь до jar. Проверим, выполнив: cluster unit list

Следующий шаг — написание клиента. Ему также подключим ignite-client и создадим бин:
@Value("${ignite.address}")
private String igniteAddress;
@Bean
public IgniteClient igniteClient() {
return IgniteClient.builder()
.addresses(igniteAddress)
.build();
}
Далее создадим сервис и метод который будет вызывать джобу:
public Boolean primeCheck(Integer number) {
JobDescriptor<Integer, Boolean> job = JobDescriptor.<Integer, Boolean>builder("ru.madela.PrimeCheckJob")
.units(new DeploymentUnit("prime-check-job", "1.0.0"))
.build();
JobTarget jobTarget = JobTarget.anyNode(igniteClient.clusterNodes());
return igniteClient.compute().execute(jobTarget, job, number);
}
В JobDescriptor передадим название пакета, где хранится код и DeploymentUnit
, в котором пропишем название и версию юнита, которые указывали при деплое, пробросим метод в контроллер и попробуем вызвать:


Джоба выполнилась успешно.
Подводя итог, обычные вычисления можно рассматривать как отправную точку работы с распределёнными задачами. Они удобны тогда, когда ключевой целью является равномерное распределение нагрузки по кластеру, а данные не привязаны к конкретным узлам. Это простой и универсальный механизм, который подойдёт для большинства сценариев общего назначения, где скорость доступа к данным не является критичным фактором.
Colocated вычисления и таблицы
Теперь реализуем коллокационные вычисления, например, будем выявлять транзакции для одного пользователя, между которыми прошло менее 30 секунд за последние 10 минут.
Для начала создадим таблицу, с данными которой и будут происходить вычисления. Создать таблицу можно похожим на JPA способом, через аннотации. В Ignite 3, термин “cache
” заменен на “table
”, модель унифицирована: данные представлены в виде таблиц, доступных как через SQL, так и через K/V API:
@Getter
@Setter
@NoArgsConstructor
@Table(
value = "tx",
zone = @Zone(
value = "zone_tx",
replicas = 1,
partitions = 16,
storageProfiles = "default"
),
colocateBy = { @ColumnRef("account_id") },
indexes = {
@Index(
value = "ix_tx_acc_dt",
columns = { @ColumnRef("account_id"), @ColumnRef("dt") }
)
}
)
@Accessors(chain = true)
public class TransactionModel {
@Id
private UUID id;
@Id
@Column("account_id")
private UUID accountId;
@Column(nullable = false)
private BigDecimal amount;
@Column(value = "dt", nullable = false)
private LocalDateTime dateTime;
}
Аннотации @Getter, @Setter, @NoArgsConstructor и @Accessors предоставляются библиотекой Lombok, а @Id и @Column достаточно очевидны, поэтому подробнее рассмотрим @Table:
Аннотация @Table
Указывает, что класс маппится на таблицу в хранилище Ignite.
value = "tx" — имя таблицы в базе данных. В данном случае таблица хранит транзакции.
Зона хранения (@Zone)
Описывает конфигурацию зоны хранения, где будет размещена таблица.
value = "zone_tx" — имя зоны.
replicas = 1 — количество копий данных для отказоустойчивости. При сбое одного узла данные доступны с другого. В Ignite 2 перебалансировка могла приводить к лишнему трафику и рискам split-brain. В Ignite 3 для решения этой проблемы используется Raft, который обеспечивает согласованность и устойчивость кластера. Т.к. у нас только один узел, ставим 1.
partitions = 16 — число партиций. Чем больше партиций, тем выше параллелизм, но и накладные расходы больше.
storageProfiles = "default" — профиль хранения, определяющий, где и как хранятся данные.
В Ignite 3 есть три типа хранения:
Хранить данные персистентно с кеш слоем Page Memory с B+Tree структурой (только теперь не H2, а Apache Calcite и MVCC, что позволяет выполнять транзакции и сложные аналитические запросы прямо в SQL).
Полностью в in-memory режиме.
Новый движок RocksDB основанный на LSM Tree (Дефолтный).
Коллокация (colocateBy)
Определяет признак коллокации — т.е. какие записи будут храниться на одном узле.
@ColumnRef("account_id") — все транзакции одного аккаунта будут храниться вместе. Это важно для Colocated computations: можно выполнять расчёты по аккаунту без лишних сетевых пересылок.
Индексы (@Index)
Создает индекс для ускоренного доступа к данным.
value = "ix_tx_acc_dt" — имя индекса.
columns = { account_id, dt } — составной индекс по аккаунту и дате.
Теперь создадим и наполним таблицу тестовыми данными. Есть несколько способов писать в Ignite:
Table API - Предоставляет базовые CRUD операции.
Criterion Queries - Более сложные запросы, когда нужно задать дополнительные критерии.
SQL API - полноценный sql.
JDBC - позволяет использовать JDBC Template или аналоги.
Мы будем использовать первый вариант. Для этого нам нужен бин RecordView<R>
. Будем заполнять таблицу данными при старте приложения и выводить их.
Код бина будет выглядеть вот так:
@Bean
public RecordView<TransactionModel> transactionRecordView(IgniteClient client) {
client.catalog().createTable(TransactionModel.class);
Table t = client.tables().table("tx");
RecordView<TransactionModel> transactionModelRecordView = t.recordView(TransactionModel.class);
fillTable(transactionModelRecordView);
return transactionModelRecordView;
}
private void fillTable(RecordView<TransactionModel> transactionModelRecordView) {
UUID uuidTrue = UUID.randomUUID();
UUID uuidFalse = UUID.randomUUID();
LocalDateTime now = LocalDateTime.now();
log.info("uuid for true: {}", uuidTrue);
log.info("uuid for false: {}", uuidFalse);
List<TransactionModel> batch = List.of(
new TransactionModel()
.setId(UUID.randomUUID())
.setAccountId(uuidTrue)
.setAmount(new BigDecimal("12"))
.setDateTime(now),
new TransactionModel()
.setId(UUID.randomUUID())
.setAccountId(uuidTrue)
.setAmount(new BigDecimal("13"))
.setDateTime(now.plusSeconds(2)),
new TransactionModel()
.setId(UUID.randomUUID())
.setAccountId(uuidTrue)
.setAmount(new BigDecimal("14"))
.setDateTime(now.plusSeconds(4)),
new TransactionModel()
.setId(UUID.randomUUID())
.setAccountId(uuidFalse)
.setAmount(new BigDecimal("12"))
.setDateTime(now),
new TransactionModel()
.setId(UUID.randomUUID())
.setAccountId(uuidFalse)
.setAmount(new BigDecimal("13"))
.setDateTime(now.plusSeconds(31)),
new TransactionModel()
.setId(UUID.randomUUID())
.setAccountId(uuidFalse)
.setAmount(new BigDecimal("14"))
.setDateTime(now.plusSeconds(62))
);
transactionModelRecordView.upsertAll(null, batch);
}
Теперь можно приступить к реализации job. Чтобы не добавлять лишних зависимостей и конфигураций, воспользуемся SQL API. Также посмотрим, как сделать свой маршаллер. Для этого на вход будем получать accountId
, а на выход, объект состоящий из accountId
и листа повторяющихся транзакций.
Выходной объект:
public class RapidRepeatJobResponse {
private UUID accountId;
private Set<UUID> transactionIdList;
public RapidRepeatJobResponse() {
}
public void setAccountId(UUID accountId) {
this.accountId = accountId;
}
public Set<UUID> getTransactionIdList() {
return transactionIdList;
}
public void setTransactionIdList(Set<UUID> transactionIdList) {
this.transactionIdList = transactionIdList;
}
public UUID getAccountId() {
return accountId;
}
}
Теперь нужно реализовать маршалинг, для этого подключаем jackson-databind
и напишем:
public class RapidRepeatResponseMarshaller implements ByteArrayMarshaller<RapidRepeatJobResponse> {
private static final ObjectMapper M = new ObjectMapper();
@Override
public byte[] marshal(RapidRepeatJobResponse obj) {
try {
return M.writeValueAsBytes(obj);
} catch (Exception e) {
throw new RuntimeException("Marshal error", e);
}
}
@Override
public RapidRepeatJobResponse unmarshal(byte[] bytes) {
try {
return M.readValue(bytes, RapidRepeatJobResponse.class);
} catch (Exception e) {
throw new RuntimeException("Unmarshal error", e);
}
}
}
Также не забудем переопределить resultMarshaller
в джобе:
@Override
public ByteArrayMarshaller<RapidRepeatJobResponse> resultMarshaller() {
return new RapidRepeatResponseMarshaller();
}
С помощью SQL API будем выбирать транзакции, которые подходят под наши условия:
private static final int WINDOW_MIN = 10;
private static final long REPEAT_SEC = 30;
@Override
public CompletableFuture<RapidRepeatJobResponse> executeAsync(JobExecutionContext context, UUID accountId) {
LocalDateTime now = LocalDateTime.now();
LocalDateTime from = now.minus(Duration.ofMinutes(WINDOW_MIN));
Set<UUID> suspiciousIds = new LinkedHashSet<>();
var ses = context.ignite().sql();
try (var rs = ses.execute(
null,
"SELECT id, dt " +
"FROM tx " +
"WHERE account_id = ? " +
"AND dt BETWEEN ? AND ? " +
"ORDER BY dt",
accountId,
from,
now
)) {
LocalDateTime prevDt = null;
UUID prevId = null;
while (rs.hasNext()) {
var row = rs.next();
LocalDateTime curDt = row.datetimeValue("dt");
UUID curId = row.uuidValue("id");
if (prevDt != null && Duration.between(prevDt, curDt).getSeconds() < REPEAT_SEC) {
suspiciousIds.add(prevId);
suspiciousIds.add(curId);
}
prevDt = curDt;
prevId = curId;
}
}
RapidRepeatJobResponse resp = new RapidRepeatJobResponse();
resp.setAccountId(accountId);
resp.setTransactionIdList(suspiciousIds);
return CompletableFuture.completedFuture(resp);
}
Задеплоим джобу в Ignite и вернёмся к нашей основной программе, напишем сервис для этой джобы. Подключим джобу как зависимость в gradle. Теперь нам доступен класс с джобой, результирующий класс и маршаллер, указываем это в параметрах дескриптора, также у JobTarget
меняем anyNode
на colocated, в параметрах указываем название таблицы и ключ:
public RapidRepeatJobResponse findRapidRepeats(UUID accountId) {
JobDescriptor<UUID, RapidRepeatJobResponse> rapidRepeatJob =
JobDescriptor.builder(RapidRepeatJob.class)
.units(List.of(new DeploymentUnit("rapid-repeat-job", "1.0.0")))
.resultClass(RapidRepeatJobResponse.class)
.resultMarshaller(new RapidRepeatResponseMarshaller())
.build();
JobTarget target = JobTarget.colocated("tx", accountKey(accountId));
return igniteClient.compute().execute(target, rapidRepeatJob, accountId);
}
private Tuple accountKey(UUID accountId) {
return Tuple.create()
.set("ACCOUNT_ID", accountId)
.set("id", new UUID(0L, 0L));
}
Пробросим наш сервис в контроллер и протестируем.


Программа успешно отработала и вернула ожидаемый результат.
В отличие от обычных вычислений, коллокейтед-задачи фокусируются на эффективности работы с данными. Такой подход минимизирует сетевой трафик и обеспечивает тесную привязку вычислений к месту хранения информации. Colocated-вычисления особенно полезны для задач аналитики и обработки больших объёмов транзакций, где критично работать именно там, где находятся данные, чтобы получить максимальную производительность.
Заключение
Если Ignite 2 чаще воспринимался как «продвинутый кэш», то Ignite 3 демонстрирует куда более широкий спектр применения. Появление Raft, RocksDB, Calcite и MVCC указывает на движение в сторону распределённой базы данных. Но не стоит забывать и про вычислительную часть: в Ignite 3 распределённые вычисления стали естественным продолжением архитектуры, что делает платформу удобной не только для хранения, но и для обработки данных на лету. Подробнее про изменения можно найти тут или тут.
Для разработчика это означает меньше ограничений и больше гибкости — можно строить системы, которые одновременно отвечают за данные и умеют над ними считать. Ignite 3 можно рассматривать как шаг от удобного инструмента к полноценному фундаменту для высоконагруженных распределённых систем.
Полный код тут.