Привет, Хабр!
Когда Java 8 впервые подарила нам Stream API, мы все обрадовались ленивому функциональному стилю работы с коллекциями. Но хочется иногда большего. Вот, например, задача — пропустить из потока только по одному элементу каждого размера строки. Или разбить бесконечный стрим на окна фиксированного размера и обработать первые несколько таких окон. Казалось бы, чем проще: .filter
, .map
, .distinct
… Но именно этих операций не хватало. До Java 24 набор промежуточных операций в Stream
был фиксированным — map
, filter
, flatMap
, distinct
, sorted
и т. д. — и расширить его было нельзя. Приходилось придумывать сложные ухищрения (например, оборачивать объекты в record с особым equals
или собирать через collect
, а потом резать список), чтобы решить элементарные задачи. В итоге код становился громоздким и неинтуитивным.
Stream Gatherers (или просто накопители/gatherers) — это средство, которое появилось в Java 24, как раз задуманное чтобы заполнить эти пробелы. По аналогии с тем, как Collector
даёт возможность для создания собственных терминирующих операций (collect
), так и интерфейс Gatherer
призван для создания своих промежуточных операций. Метод Stream.gather(Gatherer)
появился в java.util.stream.Stream
, и каждый такой gatherer определяет, как из потока элементов получается новый поток элементов (вполне возможно с другим размером).
В общих чертах, gatherer описывает четыре шага обработки: (1) initializer
— создать инициализирующее состояние (любого типа), (2) integrator
— обрабатывать по одному элементу из входного потока, возможно обновляя состояние и выталкивая что‑то во выходной поток, (3) combiner
— как объединять два состояния (для параллельной обработки), и (4) finisher
— финальная обработка после окончания входного потока. Именно эта механика (очень похожая на Collector
, только применительно к промежуточной операции) позволяет делать всё что угодно: от группировки элементов в батчи до уникализации с любым критерием или инкрементального сканирования. Специалисты из JDK так описали Gatherers: они могут преобразовывать поток «1→1», «1→много», «много→1» и «много→много», могут отсекать бесконечные потоки, как только получится результат, и даже работать в параллели при наличии combiner
.
В стандартной библиотеке Java есть несколько готовых реализации Gatherer
в классе Gatherers
. Например:
Gatherers.windowFixed(int size)
— нарезает поток на непересекающиеся окна фиксированного размера.Gatherers.windowSliding(int size)
— даёт скользящие окна: каждое новое окно сдвигается на один элемент вперёд.Gatherers.fold(Supplier<R> init, BiFunction<R,T,R> folder)
— свёртка, похожая наreduce
, когда надо последовательность элементов свернуть в один результат.Gatherers.scan(Supplier<R> init, BiFunction<R,T,R> scanner)
— префиксный скан (накопление с выводом промежуточных результатов).Gatherers.mapConcurrent(int threads, Function<T,R> mapper)
— параллельныйmap
, исполняемый на виртуальных потоках.
Каждый из этих накопителей — это обычный Gatherer
, который скрыто создаёт состояние и выполняет интеграцию. Посмотрим на пару примеров использования готовых gatherers:
List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8);
int k = 3;
// Пример Fixed Window: группируем по 3 элемента
List<List<Integer>> fixed = list.stream()
.gather(Gatherers.windowFixed(k))
.toList();
// fixed = [[1, 2, 3], [4, 5, 6], [7, 8]]
С помощью windowFixed(3)
взяли список 1…8
и собрали его в списки по 3 элемента (которых получилось два полных и один неполный). Такое поведение удобно, например, если нужно обрабатывать элементы батчами. То же самое и для скользящего окна:
List<List<Integer>> sliding = list.stream()
.gather(Gatherers.windowSliding(k))
.toList();
// sliding = [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, 8]]
Теперь каждое новое подсписок смещён на один элемент вперёд относительно предыдущего. Вы видите, что в обоих случаях результат — снова поток списков, который мы собираем в коллекцию.
Другой пример — свёртка (fold). Допустим, надо склеить из потока строк одну большую строку:
List<String> words = List.of("Hello", " ", "Habr", "!");
List<String> concatenated = words.stream()
.gather(Gatherers.fold(() -> "", (acc, elem) -> acc + elem))
.toList();
// concatenated = ["Hello Habr!"]
Gatherers.fold
принимает лямбду инициализации (пустая строка) и функцию прибавления элемента к аккумулятору. Результат — единственный элемент, конечная строка.
Или пример scan
, когда нужно видеть все промежуточные суммы. Из потока чисел получаем поток сумм:
List<Integer> numbers = List.of(1, 2, 3, 4);
List<Integer> prefixSums = numbers.stream()
.gather(Gatherers.scan(() -> 0, (acc, elem) -> acc + elem))
.toList();
// prefixSums = [1, 3, 6, 10]
Вначале аккумулятор 0, а дальше он накапливает и выдаёт промежуточные результаты: 1, 1+2=3, 3+3=6, 6+4=10.
Разумеется, mapConcurrent
позволяет делать map
с заданным уровнем параллелизма (сохраняя порядок), но я здесь не привожу код, потому что он похож на обычный map
, только под капотом запускается на виртуальных потоках (сохранение порядка гарантируется самим Gatherer).
Теперь самое интересное — как создать свой собственный.
Интерфейс Gatherer<T,A,R>
похож на Collector
, но отличается тем, что посредине конвейера: он берёт элементы типа T
и производит элементы типа R
, имея внутреннее состояние типа A
. Самый простой способ — использовать статические фабрики Gatherer.of(...)
или Gatherer.ofSequential(...)
, где вы даёте лямбды для инициализатора, интегратора, комбинирования и финализатора. Рассмотрим известную задачу: distinctBy — оставить в потоке только первый встретившийся элемент каждого ключа. Раньше для этого приходилось возиться с Set
вне стрима или специальным Collector
.
Можно написать так:
static <T, K> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends K> keyExtractor) {
Objects.requireNonNull(keyExtractor);
return Gatherer.of(
HashSet<K>::new, // инициализатор: создаёт пустое множество ключей
(seen, element, downstream) -> { // интегратор: видит новый элемент
K key = keyExtractor.apply(element);
if (seen.add(key)) { // если ключ новый
return downstream.push(element); // – пропускаем элемент дальше
}
return true; // иначе – пропускаем без вывода
},
(left, right) -> { // combiner (для parallel): объединяем множества
left.addAll(right);
return left;
},
(seen, downstream) -> {
/* finisher: ничего не делаем, так как всё уже выдано */
}
);
}
// Использование:
List<String> result = Stream.of("foo", "bar", "baz", "quux")
.gather(distinctBy(String::length))
.toList();
// result == ["foo", "quux"]
Написали distinctBy
так, чтобы первое встретившееся слово каждой длины проскочило в выходной поток. Состояние seen
— это HashSet
ранее встреченных длин. Метод integrator
добавляет длину в seen
; если add
вернул true
, значит этого ключа ещё не было, и мы делаем downstream.push(element)
(то есть выпускаем элемент наружу). Если add
вернул false
, продолжаем без вывода. Обратите внимание: интегратор возвращает boolean
. Если возвращает false
, то последующий код в стриме может прекратить выполнение (аналог Stream#anyMatch
и т. п.), но здесь мы всегда возвращаем true
или вызываем push
и возвращаем её результат — это значит продолжаем.
После такого Gatherer.of(...)
получаем обычный объект Gatherer
, который можем передать в .gather(...)
. Результат — всё работает как будто встроенная операция: коротко и понятно.
А если вы используете параллельный стрим, то для правильной параллели нужен combiner
. В примере мы реализовали combiner
и просто объединили множества ключей, так что gatherer «parallel‑safe». Если combiner не задан, то даже на параллельном стриме gatherer будет работать последовательно (то есть Stream.gather
станет неотличим от последовательного для этого этапа). При этом кратко: если нужно — указываем как комбинировать состояния. В distinctBy
это просто объединение множеств.
Ну и еще полезный приём: gatherers можно композировать. Например, сначала собрать в окна, а потом прогонять накопление по каждой группе.
В заключение скажу так: Stream Gatherers открывают новую степень свободы в построении цепочек стримов. Это как если бы collect
научилось работать на ходу — на входе потока. Теперь ту логику, что раньше приходилось реализовывать перелопачиванием коллектора или внешним состоянием, мы можем чисто формально определить в Gatherer
.
Практика покажет, как быстро наберётся библиотека таких операций. Уже сегодня в JDK есть несколько нужных накапливателей, а скоро наверняка появятся готовые решения для частых шаблонов. А главное — теперь мы сами можем написать любой нужный Gatherer
(конкурентный, ленивый, с досрочным завершением и т. д.) и вшить его в конвейер stream
.
Если взглянуть на появление Stream Gatherers шире, то становится понятно: экосистема Java развивается именно в сторону практичных инструментов, позволяющих решать прикладные задачи без лишней сложности. Такой же подход лежит в основе обучения в рамках курса Java Developer. Professional.
В ближайшее время можно бесплатно присоединиться к открытым занятиям, где разберём реальные примеры использования технологий:
26 августа в 20:00 — лучшие практики Kafka и Schema Registry в backend‑разработке на Java.
10 сентября в 20:00 — создание потоковых приложений с использованием Kafka Streams.
17 сентября в 20:00 — система сбора информации о деятельности эмитентов облигаций.
Также доступно бесплатное вступительное тестирование, которое поможет оценить уровень знаний и навыков перед началом обучения.
Кроме того, на странице курса доступна секция с отзывами — вы можете ознакомиться с опытом других специалистов, уже прошедших обучение, и понять, насколько программа подходит именно вам.
А чтобы узнать больше о курсах по Java и получить доступ к записям открытых уроков, переходите в телеграм-бот.