Привет, Хабр! В ежегодных поисках ответа на вопрос «как же удобно и просто распараллелить исполнение задач на Java», я частенько натыкаюсь на вариант основанный на использование Stream API.
Есть много статей о том когда использовать Parallel Stream и где он даст выигрыш в производительности. Хабр и Baeldung плохого не посоветуют. И вроде все хорошо — эвристики в Parallel Stream неплохие. Так и возникает соблазн задешево распараллелить любые задачи:
List<Runnable> tasks = generateExpansiveTasks();
tasks.stream().parallel().forEach(Runnable::run);
Вот и всё! Никаких тебе ExecutorService#invokeAll и join'а к Future или Future#get с его InterruptedException. Декларативно, коротко, читаемо. Казалось бы, хороший план, надежный как швейцарские часы. Но закон Мёрфи никто не отменял. Давайте посмотрим, как всё может пойти не так.
AtomicInteger callsCounter = new AtomicInteger();
List<Runnable> tasks = IntStream.range(0, 100)
.<Runnable>mapToObj(taskNumber -> () -> {
if (callsCounter.incrementAndGet() >= 10) {
throw new RuntimeException();
}
try {
Thread.sleep(1); // simulate intensive work
} catch (InterruptedException e) {
System.err.println("!INTERRUPTED!");
throw new IllegalStateException(e);
}
}).toList();
try {
tasks.stream().parallel().forEach(Runnable::run);
} catch (Exception ignore) {
}
System.out.println(callsCounter);
В sequential случае всё предельно просто — callsCounter замрёт на отметке 10. В случае parallel — как повезёт. Может 77, может 42. Но внимательные читатели к этому определенно были готовы — очевидно если больше 10 задач запущено параллельно, то callsCounter будет больше 10. Но одной проверки мало! Давайте посмотрим на callsCounter внимательнее, например 2 раза:
int check1 = 0, check2 = 0;
while (check1 == check2) {
AtomicInteger callsCounter = new AtomicInteger();
List<Runnable> tasks = IntStream.range(0, 100)
.<Runnable>mapToObj(taskNumber -> () -> {
if (callsCounter.incrementAndGet() >= 10) {
throw new RuntimeException();
}
try {
Thread.sleep(1); // simulate intensive work
} catch (InterruptedException e) {
System.err.println("!INTERRUPTED!");
throw new IllegalStateException(e);
}
}).toList();
try {
tasks.stream().parallel().forEach(Runnable::run);
} catch (Exception ignore) {
}
check1 = callsCounter.get();
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);
check2 = callsCounter.get();
System.out.println("First: " + check1);
System.out.println("Second: " + check2);
}
Здесь пора наконец сказать, что Parallel Stream выполняет задачи на common ForkJoinPool. Если быть точнее, то вызывается ForkJoinTask#fork который выберет pool в зависимости от потока, вызывающего терминальную операцию. В простом случае, при помощи awaitQuiescence мы может дождаться, пока все активные задачи в common ForkJoinPool будут завершены. В какой‑то момент мы получим разные значения для счетчика вызовов. Намного чаще расхождение можно получать, если поток вызывающий терминальную операцию не отправлять в сон.
Скрытый текст
int check1 = 0, check2 = 0;
while (check1 == check2) {
AtomicInteger callsCounter = new AtomicInteger();
Thread main = Thread.currentThread();
AtomicBoolean exceptionCaught = new AtomicBoolean();
List<Runnable> tasks = IntStream.range(0, 100)
.<Runnable>mapToObj(taskNumber -> () -> {
if (exceptionCaught.get()) {
System.out.println("EXECUTED AFTER EXCEPTION CAUGHT IN CALLER THREAD");
}
if (callsCounter.incrementAndGet() >= 10) {
throw new RuntimeException();
}
int pause = Thread.currentThread() == main? 0 : 10;
try {
Thread.sleep(pause); // simulate intensive work
} catch (InterruptedException e) {
System.err.println("!INTERRUPTED!");
throw new IllegalStateException(e);
}
}).toList();
try {
tasks.stream().parallel().forEach(Runnable::run);
} catch (Exception ignore) {
exceptionCaught.set(true);
}
check1 = callsCounter.get();
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);
check2 = callsCounter.get();
System.out.println("First: " + check1);
System.out.println("Second: " + check2);
}
В сухом остатке, если одна из операций в цепочке бросила Exception, то после возвращения ошибки вызывающему потоку, исполнение уже запущенных задач может всё еще продолжаться.
Допустим, оставшиеся в исполнении задачи не сделают ничего предосудительного. Наверное можно выдохнуть. Все стало понятно? Не совсем. Угадайте какой вариант быстрее для следующего примера:
public class Main {
public static void main(String[] args) throws Exception {
assert ForkJoinPool.getCommonPoolParallelism() == 8;
System.out.println(ForkJoinPool.commonPool());
List<Integer> sleepMillis = IntStream.range(0, 98).boxed()
.collect(toCollection(ArrayList::new));
sleepMillis.add(1000);
sleepMillis.add(2000);
Instant start = Instant.now();
sleepMillis.stream().parallel().forEach(Main::sleepMillis);
System.out.println("Parallel stream took:" + Duration.between(start, Instant.now()));
ExecutorService es = Executors.newFixedThreadPool(8);
start = Instant.now();
sleepMillis.stream()
.<Runnable>map(num -> () -> sleepMillis(num))
.forEach(es::execute);
es.shutdown();
es.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Executor service took:" + Duration.between(start, Instant.now()));
}
private static void sleepMillis(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
Интуитивно может казаться, что время выполнения должно быть примерно одинаково. Или даже небольшой выигрыш в случае Parallel Stream, ведь там исполнению помогает main поток. Однако на деле может получится наоборот.
Parallel stream took:PT3.4147801S
Executor service took:PT2.5742424S
Дело в том, что Parrallel Stream берет за минимальную единицу работы берет не отдельный вызов consumer#accept, а последовательность вызовов над частью элементов коллекции элементов коллекции. Детальнее можно посмотреть например в классе java.util.stream.ForEachOps Соответственно, в данном случае 2 самые долгие задачи будут выполнены последовательно в одном thread'е. В какой‑то мере этот аспект можно нивелировать за счет использования random shuffle.
Увы, но и это не всё. Долгие блокирующие методы могут заблокировать common ForkJoinPool. Для того чтобы понять к каким последствиям приведет его блокировка, достаточно сказать, что common Pool используется в async методах CompletableFuture, если executor не указан явно. Для примера рассмотрим следующий метод:
private static Duration measureCompletableFutureAsyncDelay() {
CompletableFuture<Long> cf = new CompletableFuture<>();
CompletableFuture<Duration> asyncStage = cf
.thenApplyAsync(start -> Duration.ofNanos(System.nanoTime() - start));
long completionStartTime = System.nanoTime();
cf.complete(completionStartTime);
return asyncStage.join();
}
Если common pool не занят, то начало выполнения thenApplyAsync callback'а будет отложено на период от нескольких микросекунд до миллисекунд. Но если common pool занят, до выполнения *Async callback'ов может пройти достаточно долгое время.
Код для экспериментов
public class Main {
public static void main(String[] args) throws Exception {
int pp = Integer.parseInt(System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "-1"));
int expectedPp = 2;
if (pp != expectedPp) {
System.err.println("Please restart with -Djava.util.concurrent.ForkJoinPool.common.parallelism=" + expectedPp);
throw new IllegalStateException("BAD PARALLELISM");
}
System.out.println("First completable callback future delay: " + measureCompletableFutureAsyncDelay());
System.out.println("Second completable callback future delay: " + measureCompletableFutureAsyncDelay());
CountDownLatch fjpExhausted = new CountDownLatch(pp);
Future<Duration> streamDuration = startLongTasksInParallelStream(fjpExhausted);
fjpExhausted.await();
System.out.println("Third completable callback future delay: " + measureCompletableFutureAsyncDelay());
System.out.println("Stream duration: " + streamDuration.get());
}
private static Duration measureCompletableFutureAsyncDelay() {
CompletableFuture<Long> cf = new CompletableFuture<>();
CompletableFuture<Duration> asyncStage = cf
.thenApplyAsync(start -> Duration.ofNanos(System.nanoTime() - start));
long completionStartTime = System.nanoTime();
cf.complete(completionStartTime);
return asyncStage.join();
}
private static Future<Duration> startLongTasksInParallelStream(CountDownLatch fjpExhausted) {
ThreadFactory tf = Executors.defaultThreadFactory();
ThreadFactory dtf = runnable -> {
Thread thread = tf.newThread(runnable);
thread.setDaemon(true);
return thread;
};
ExecutorService service = Executors.newSingleThreadExecutor(dtf);
Future<Duration> res = service.submit(() -> {
long startTime = System.nanoTime();
IntStream.range(0, 100).parallel().forEach(num -> {
if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
fjpExhausted.countDown();
}
expensiveProcess(num);
});
return Duration.ofNanos(System.nanoTime() - startTime);
});
service.shutdown();
return res;
}
private static void expensiveProcess(Object unusedArg) {
try {
Thread.sleep(1000); //simulate hard IO/blocking work
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
В этом случае в качестве частичного решения можно подставлять костыли в виде ForkJoinPool#managedBlock для операций в Parallel Stream, однако это уже совсем другая история...
Подводя итог, обязательно используйте Parallel Stream, если:
вы ищите серебряную пулю;
вы не хотите координировать завершение запущенных задач при ошибках, ведь так интересно посмотреть на проде какие side effect'ы успеет совершить исполняемый код;
вам очень хочется заблокировать common ForkJoinPool и посмотреть к чему это приведет.
И конечно:
