Привет, Хабр! В ежегодных поисках ответа на вопрос «как же удобно и просто распараллелить исполнение задач на Java», я частенько натыкаюсь на вариант основанный на использование Stream API.

Есть много статей о том когда использовать Parallel Stream и где он даст выигрыш в производительности. Хабр и Baeldung плохого не посоветуют. И вроде все хорошо — эвристики в Parallel Stream неплохие. Так и возникает соблазн задешево распараллелить любые задачи:

List<Runnable> tasks = generateExpansiveTasks();
tasks.stream().parallel().forEach(Runnable::run);

Вот и всё! Никаких тебе ExecutorService#invokeAll и join'а к Future или Future#get с его InterruptedException. Декларативно, коротко, читаемо. Казалось бы, хороший план, надежный как швейцарские часы. Но закон Мёрфи никто не отменял. Давайте посмотрим, как всё может пойти не так.

AtomicInteger callsCounter = new AtomicInteger();
List<Runnable> tasks = IntStream.range(0, 100)
  .<Runnable>mapToObj(taskNumber -> () -> {
    if (callsCounter.incrementAndGet() >= 10) {
        throw new RuntimeException();
    }
    try {
        Thread.sleep(1); // simulate intensive work
    } catch (InterruptedException e) {
        System.err.println("!INTERRUPTED!");
        throw new IllegalStateException(e);
    }
}).toList();
try {
    tasks.stream().parallel().forEach(Runnable::run);
} catch (Exception ignore) {
}
System.out.println(callsCounter);

В sequential случае всё предельно просто — callsCounter замрёт на отметке 10. В случае parallel — как повезёт. Может 77, может 42. Но внимательные читатели к этому определенно были готовы — очевидно если больше 10 задач запущено параллельно, то callsCounter будет больше 10. Но одной проверки мало! Давайте посмотрим на callsCounter внимательнее, например 2 раза:

int check1 = 0, check2 = 0;
while (check1 == check2) {
    AtomicInteger callsCounter = new AtomicInteger();
    List<Runnable> tasks = IntStream.range(0, 100)
            .<Runnable>mapToObj(taskNumber -> () -> {
                if (callsCounter.incrementAndGet() >= 10) {
                    throw new RuntimeException();
                }
                try {
                    Thread.sleep(1); // simulate intensive work
                } catch (InterruptedException e) {
                    System.err.println("!INTERRUPTED!");
                    throw new IllegalStateException(e);
                }
            }).toList();
    try {
        tasks.stream().parallel().forEach(Runnable::run);
    } catch (Exception ignore) {
    }
    check1 = callsCounter.get();
    ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);
    check2 = callsCounter.get();
    System.out.println("First: " + check1);
    System.out.println("Second: " + check2);
}

Здесь пора наконец сказать, что Parallel Stream выполняет задачи на common ForkJoinPool. Если быть точнее, то вызывается ForkJoinTask#fork который выберет pool в зависимости от потока, вызывающего терминальную операцию. В простом случае, при помощи awaitQuiescence мы может дождаться, пока все активные задачи в common ForkJoinPool будут завершены. В какой‑то момент мы получим разные значения для счетчика вызовов. Намного чаще расхождение можно получать, если поток вызывающий терминальную операцию не отправлять в сон.

Скрытый текст
int check1 = 0, check2 = 0;
while (check1 == check2) {
    AtomicInteger callsCounter = new AtomicInteger();
    Thread main = Thread.currentThread();
    AtomicBoolean exceptionCaught = new AtomicBoolean();
    List<Runnable> tasks = IntStream.range(0, 100)
            .<Runnable>mapToObj(taskNumber -> () -> {
                if (exceptionCaught.get()) {
                    System.out.println("EXECUTED AFTER EXCEPTION CAUGHT IN CALLER THREAD");
                }
                if (callsCounter.incrementAndGet() >= 10) {
                    throw new RuntimeException();
                }
                int pause = Thread.currentThread() == main? 0 : 10;
                try {
                    Thread.sleep(pause); // simulate intensive work
                } catch (InterruptedException e) {
                    System.err.println("!INTERRUPTED!");
                    throw new IllegalStateException(e);
                }
            }).toList();
    try {
        tasks.stream().parallel().forEach(Runnable::run);
    } catch (Exception ignore) {
        exceptionCaught.set(true);
    }
    check1 = callsCounter.get();
    ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);
    check2 = callsCounter.get();
    System.out.println("First: " + check1);
    System.out.println("Second: " + check2);
}

В сухом остатке, если одна из операций в цепочке бросила Exception, то после возвращения ошибки вызывающему потоку, исполнение уже запущенных задач может всё еще продолжаться.

Допустим, оставшиеся в исполнении задачи не сделают ничего предосудительного. Наверное можно выдохнуть. Все стало понятно? Не совсем. Угадайте какой вариант быстрее для следующего примера:

public class Main {
    public static void main(String[] args) throws Exception {
        assert ForkJoinPool.getCommonPoolParallelism() == 8;
        System.out.println(ForkJoinPool.commonPool());
        List<Integer> sleepMillis = IntStream.range(0, 98).boxed()
          .collect(toCollection(ArrayList::new));
        sleepMillis.add(1000);
        sleepMillis.add(2000);
        Instant start = Instant.now();
        sleepMillis.stream().parallel().forEach(Main::sleepMillis);
        System.out.println("Parallel stream took:" + Duration.between(start, Instant.now()));
        ExecutorService es = Executors.newFixedThreadPool(8);
        start = Instant.now();
        sleepMillis.stream()
                .<Runnable>map(num -> () -> sleepMillis(num))
                .forEach(es::execute);
        es.shutdown();
        es.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Executor service took:" + Duration.between(start, Instant.now()));
    }

    private static void sleepMillis(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}

Интуитивно может казаться, что время выполнения должно быть примерно одинаково. Или даже небольшой выигрыш в случае Parallel Stream, ведь там исполнению помогает main поток. Однако на деле может получится наоборот.

Parallel stream took:PT3.4147801S
Executor service took:PT2.5742424S

Дело в том, что Parrallel Stream берет за минимальную единицу работы берет не отдельный вызов consumer#accept, а последовательность вызовов над частью элементов коллекции элементов коллекции. Детальнее можно посмотреть например в классе java.util.stream.ForEachOps Соответственно, в данном случае 2 самые долгие задачи будут выполнены последовательно в одном thread'е. В какой‑то мере этот аспект можно нивелировать за счет использования random shuffle.

Увы, но и это не всё. Долгие блокирующие методы могут заблокировать common ForkJoinPool. Для того чтобы понять к каким последствиям приведет его блокировка, достаточно сказать, что common Pool используется в async методах CompletableFuture, если executor не указан явно. Для примера рассмотрим следующий метод:

private static Duration measureCompletableFutureAsyncDelay() {
    CompletableFuture<Long> cf = new CompletableFuture<>();
    CompletableFuture<Duration> asyncStage = cf
            .thenApplyAsync(start -> Duration.ofNanos(System.nanoTime() - start));
    long completionStartTime = System.nanoTime();
    cf.complete(completionStartTime);
    return asyncStage.join();
}

Если common pool не занят, то начало выполнения thenApplyAsync callback'а будет отложено на период от нескольких микросекунд до миллисекунд. Но если common pool занят, до выполнения *Async callback'ов может пройти достаточно долгое время.

Код для экспериментов
public class Main {
  public static void main(String[] args) throws Exception {
    int pp = Integer.parseInt(System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "-1"));
    int expectedPp = 2;
    if (pp != expectedPp) {
      System.err.println("Please restart with -Djava.util.concurrent.ForkJoinPool.common.parallelism=" + expectedPp);
      throw new IllegalStateException("BAD PARALLELISM");
    }
    System.out.println("First completable callback future delay: " + measureCompletableFutureAsyncDelay());
    System.out.println("Second completable callback future delay: " + measureCompletableFutureAsyncDelay());
    CountDownLatch fjpExhausted = new CountDownLatch(pp);
    Future<Duration> streamDuration = startLongTasksInParallelStream(fjpExhausted);
    fjpExhausted.await();
    System.out.println("Third completable callback future delay: " + measureCompletableFutureAsyncDelay());
    System.out.println("Stream duration: " + streamDuration.get());
  }

  private static Duration measureCompletableFutureAsyncDelay() {
    CompletableFuture<Long> cf = new CompletableFuture<>();
    CompletableFuture<Duration> asyncStage = cf
        .thenApplyAsync(start -> Duration.ofNanos(System.nanoTime() - start));
    long completionStartTime = System.nanoTime();
    cf.complete(completionStartTime);
    return asyncStage.join();
  }

  private static Future<Duration> startLongTasksInParallelStream(CountDownLatch fjpExhausted) {
    ThreadFactory tf = Executors.defaultThreadFactory();
    ThreadFactory dtf = runnable -> {
      Thread thread = tf.newThread(runnable);
      thread.setDaemon(true);
      return thread;
    };
    ExecutorService service = Executors.newSingleThreadExecutor(dtf);
    Future<Duration> res = service.submit(() -> {
      long startTime = System.nanoTime();
      IntStream.range(0, 100).parallel().forEach(num -> {
        if (Thread.currentThread() instanceof ForkJoinWorkerThread) {
          fjpExhausted.countDown();
        }
        expensiveProcess(num);
      });
      return Duration.ofNanos(System.nanoTime() - startTime);
    });
    service.shutdown();
    return res;
  }

  private static void expensiveProcess(Object unusedArg) {
    try {
      Thread.sleep(1000); //simulate hard IO/blocking work
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }
}

В этом случае в качестве частичного решения можно подставлять костыли в виде ForkJoinPool#managedBlock для операций в Parallel Stream, однако это уже совсем другая история...

Подводя итог, обязательно используйте Parallel Stream, если:

  • вы ищите серебряную пулю;

  • вы не хотите координировать завершение запущенных задач при ошибках, ведь так интересно посмотреть на проде какие side effect'ы успеет совершить исполняемый код;

  • вам очень хочется заблокировать common ForkJoinPool и посмотреть к чему это приведет.

И конечно:

Народная мудрость. Image by Kandinsky AI 4.1.
Народная мудрость. Image by Kandinsky AI 4.1.


Комментарии (0)