Pull to refresh

CompletableFuture. Глубокое погружение

Level of difficultyHard
Reading time20 min
Views20K

java.util.concurrent.CompletableFuture - класс не новый. Он предстал перед нами во всём своём величии в 2014-м году вместе с выпуском Java 8. Много лет с тех пор прошло, а проще он не стал.

Мы в компании называем их "фьючи". На Хабре было много материала по отдельным частям их функциональности, но я решил поставить перед собой более серьёзную задачу - постараться разобрать внутреннее устройство и многие неочевидные нюансы работы с этим классом.

Оглавление

  1. Введение

  2. Что внутри

  3. Кто и как завершает фьючи

  4. Как работает композиция

  5. Какой поток выполнит ваш код

  6. В каком порядке выполняются независимые операции

  7. Что на самом деле делают методы get/join

  8. Исключения

  9. anyOf

  10. allOf

  11. Исходный код Дага Ли

  12. Заключение

1. Введение

CompletableFuture - это класс, реализующий как интерфейс Future ("старый"), так и интерфейс CompletionStage ("новый"). В рамках данной статьи я рассмотрю почти все методы первого и часть методов второго. Моей целью не стоит копировать javadoc, с документацией читатель всегда может ознакомиться самостоятельно. Тем не менее, какие-то комментарии по поведению методов я всё же буду давать, надеюсь достаточные.

Данный класс, по своей сути, представляет собой способ управления результатом выполнения асинхронных операций. Сюда включается:

  • Ожидание окончания операции (успешного или неуспешного).
    Это методы get* и join.

  • Уведомление о завершении операции.
    Это методы complete*.

  • Композиция - создание новых объектов CompletableFuture на основе существующих. Иными словами - зависимые операции. Это методы then*, when*, handle* и т.д. Много их.

  • Прочие мелочи.

Например, можно себе представить такой код. В нём показан самый базовый сценарий.

// Переданная лямбда будет выполнена в другом потоке.
CompletableFuture<Void> asyncOp = CompletableFuture.runAsync(
    () -> doAsyncOperation()
);

// Пока она выполняется, текущий поток может заняться чем-то ещё.
doSomethingElse();

// А после дождаться выполнения асинхронной операции.
asyncOp.join();

Фьючи и соответствующие им операции связаны только программным кодом. Нет никакого класса *Task, нет явно выделенного потока, который соответствовал бы операции. За всем программист должен следить самостоятельно, особенно если он сам аллоцировал фьючу.

2. Что внутри

Несмотря на то, как много возможностей даёт этот класс (в нём десятки самых разных методов в API), устроен он весьма просто. Всего два поля:

volatile Object result;       // Either the result or boxed AltResult
volatile Completion stack;    // Top of Treiber stack of dependent actions

Для незавершённых операций поле result равно null. Для завершённых есть несколько вариантов:

  • NIL - специальное значение, обозначающее завершённую операцию, результат выполнения которой равен null.

  • Инстанс java.util.concurrent.CompletableFuture.AltResult, содержащий внутри себя объект-исключение, если операция была отменена или завершена с ошибкой. Подробно об исключениях будет позже.

  • Непосредственно результат выполнения операции, не обёрнутый ни в какие объекты, т.е. тот объект, который создал пользователь.

Никакого флага вроде cancelled здесь нет. Фьюча, отменённая с помощью вызова метода cancel, содержит обычный AltResult с CancellationException. Т.е. вариантов для result ровно 3.

Критерий null / не null - это способ проверить, завершена фьюча или нет. Метод isDone буквально это и делает. Аналогично работает и isCompletedExceptionally, он проверяет что результат является инстансом AltResult и не равен NIL (NIL сам по себе равен new AltResult(null)). isCancelled в дополнении к этому проверяет тип исключения.

stack содержит коллекцию действий, которые нужно выполнить по завершению фьючи. О том, что это за действия и какой поток их выполняет, будет немного позже.

3. Кто и как завершает фьючи

Фьюча может завершиться по множеству причин. В первую очередь - разработчик может самостоятельно вызвать complete / completeExceptionally / cancel.

Вторая возможная причина - завершился код, ассоциированный с инстансом. Например, выполнился Runnable, переданный в CompletableFuture#runAsync. То есть ситуация, в которой разработчик не вручную аллоцировал фьючу.

Третья причина - была закомпличена source фьюча. В API CompletionStage огромное число методов, которые возвращают разработчику новые инстансы, зависимые от ранее имеющегося объекта. Например:

CompletableFuture<Object> source = giveMeTheFuture();

CompletableFuture<Object> dependent = source.thenApply(this::transformResult);

В момент вызова thenApply и других похожих методов создаётся невидимая связь. Объект source начинает внутри себя обладать ссылкой на объект dependent и вызывает на нём (на них) complete / completeExceptionaly когда завершается сам.

Причина номер четыре - таймаут. Если на фьюче вызвать методы orTimeout / completeOnTimeout и подождать необходимое время, то фьюча завершится либо отменой (вызовом cancel), либо переданным в completeOnTimeout значением. Важно помнить, что оба этих метода завершат текущую фьючу вместо того, чтобы создать новую зависимую фьючу и завершать только её.

Когда какой-то код завершает фьючу, внутри неё срабатывает вызов RESULT.compareAndSet(this, null, res); (RESULT - это приватный VarHandle для поля result, лежащий внутри класса CompletableFuture).

Чем хорош данный compareAndSet так это тем, что поле result изменится с null на реальное значение только один раз. Методы complete* / cancel как раз возвращают результат этого CAS-а. В состоянии гонки не получится так, что несколько потоков могли увидеть разный результат исполнения, тут всё честно. Кто первый, тот и выиграл.
Есть конечно методы obtrude*, которыми можно перезатереть result, но пользоваться ими не нужно.

После того, как значение результата проставлено, настаёт время разбираться со всеми зависимыми фьючами (как-то лежащими в поле stack) и завершать уже их.

4. Как работает композиция

Композиции бывают разные, все из них представляют собой разные виды зависимостей и комбинирования результатов выполнения фьючей.

Например, для выполнения операции B необходимо знать результат выполнения операции A - это один из самых простых видов зависимости. Может быть сложнее - новую операцию C нужно запускать только после того, как обе A и B завершены, или же хотя-бы одна из операций A и B завершена, и т.д.

Так или иначе, нам точно известно, что зависимые операции обязаны выполняться последовательно друг за другом, пусть и потенциально в разных потоках.

Композиция фьючей реализуется через модификацию поля stack. Как видно из названия - это стек, но не простой, а тот, который можно модифицировать неблокирующим образом. Всё внутри CompletableFuture построено на CAS-ах и очень аккуратном отношении к возможным гонкам, никаких локов там нет. При создании зависимости происходит модификация поля stack у source фьючи. В голову добавляется новый элемент, который содержит тип новой зависимости и ссылку на dependent фьючу.

Например, рассмотрим простой тип композиции, который я уже упоминал - thenApply(Function<...> fn). Это метод, который делает CompletableFuture функтором, завершая зависимую фьючу значением функции fn, вызванной на результате source фьючи. Опуская пока что несущественные детали, реализация метода thenApply сводится к следующему коду:

CompletableFuture<V> d = newIncompleteFuture();
unipush(new UniApply<T,V>(e, d, this, f));
return d;

Всё просто. Создаём зависимую фьючу, добавляем новый инстанс Completion в стек и возвращаем. Созданный Completion содержит ссылки как source (this), так и на dependent (d) фьючи, ну и на функцию вычисления результата f конечно же. e - это Executor, в котором вызовется функция f, он нам не особо интересен.

Когда настанет время завершать source фьючу, будет выполнена итерация всего содержимого её поля stack (с одновременной очисткой этого поля) и вызов Completion#tryFire на всех найденных зависимостях. Для UniApply этот метод выглядит следующий образом:

final CompletableFuture<V> tryFire(int mode) {
    CompletableFuture<V> d; ...
    Object r; ... Function<? super T,? extends V> f;
    ...
        try {
            ...
                @SuppressWarnings("unchecked") T t = (T) r;
                d.completeValue(f.apply(t));
            ...
        } catch (Throwable ex) {
            d.completeThrowable(ex);
        }
    }
    ...
}

Как видно, оставил я только самое важное. При завершении source фьючи её результат будет передан в функцию f, а то, что на выходе, станет результатом зависимой фьючи d. Цикл замкнулся.

5. Какой поток выполнит ваш код

На мой взгляд - это вопрос, вызывающий наибольшее недопонимание со стороны Java разработчиков. С другой стороны вполне возможно, что многие из вас этот вопрос себе никогда и не задавали.

Какой код я имею ввиду? Любой пользовательский код, который был передан как зависимая операция в любой из accept*/run*/then*/handle*/when* и т.д. методов в качестве параметра, например код реализации Function из примера ранее. Где он выполняется:

  • Если был вызван метод с суффиксом Async, например thenApplyAsync, то операция будет выполнена в переданном в метод executor-е. Если executor явно не указан, то операция будет выполнена в CompletableFuture#defaultExecutor. Он, скорее всего, совпадёт с ForkJoinPool.commonPool (подробности в документации).

// Вот этот код:
fut.thenApplyAsync(res -> foo(res), e);

// делает по сути то же самое, что вот этот:
fut.thenCompose(res -> CompletableFuture.runAsync(() -> foo(res), e));
  • Далее речь идёт уже о не Async методах. Если композиция происходит с уже завершённой фьючёй, то переданная операция выполнится синхронно в том же самом потоке, т.е. сразу же. Причина такого поведения проста - раз фьюча уже завершена, то не стоит ожидать, что какой-то другой поток вдруг начнёт разгребать всё, что лежит в stack и выполнять созданную нами операцию.

  • На этом хорошие новости заканчиваются, в остальных случаях у нас мало контроля за тем, какой поток будет выполнять код. Например, если на фьюче был вызван onTimeout, и необходимое время прошло, то ваша операция будет выполнена в потоке с именем CompletableFutureDelayScheduler (при условии, что операция принимает исключение как параметр, иначе она не будет вызвана вообще). Данный поток создаётся самим классом CompletableFuture и является синглтоном с ленивой инициализацией.
    Если не быть внимательным, можно легко завалить этот поток своими операциями, сделав его узким местом в системе.

  • Любой поток, вызывающий complete*, начинает синхронно комплитить все зависимые фьючи, как уже было упомянуто ранее. Если complete* вызван из двух потоков, то только один из вызовов сможет записать результат. Но при этом всё равно оба потока начнут разматывать поле stack и выполнять пользовательские операции, лежащие в нём.

    Это может привести к неожиданным результатам. Например, вернёмся к методу onTimeout. Что если и пользователь, и delayed scheduler, попробуют завершить фьючу практически в одно и то же время? А то, что CompletableFutureDelayScheduler может начать выполнять в себе пользовательский код несмотря на то, что TimeoutException не случился!
    Сперва может показаться, что я ошибаюсь, ведь в коде класса CompletableFuture.Timeout есть защита. Но посмотрите внимательнее, в момент вызова completeExceptionally фьюча всё равно уже может быть завершена.

if (f != null && !f.isDone())
    // Вот тут может быть плохо, если мы в состоянии гонки.
    f.completeExceptionally(new TimeoutException());

Рассмотрим подробнее неожиданность результатов работы onTimeout. Что может быть выведено в консоль при выполнении следующего кода (текущий поток - main)?

CompletableFuture<Object> f = new CompletableFuture<>();
f.whenComplete((res, throwable) ->
    System.out.println(res + " " + Thread.currentThread().getName())
);

f.orTimeout(100, TimeUnit.MILLISECONDS);

Thread.sleep(100);
f.complete("Result");

В теории возможны 4 варианта, потому что комбинаторика:
1. Result main
2. null CompletableFutureDelayScheduler
3. Result CompletableFutureDelayScheduler
4. null main

А на практике? Если вы читали внимательно, то наверное догадываетесь, что и на практике тоже возможны все 4 варианта.

В каких ещё обстоятельствах может запуститься пользовательский код? Любой поток, вызвавший get или join, тоже участвует в кооперативном завершении зависимых фьючей, но только при условии, что ему пришлось ждать значения (фьюча ещё не была завершена в момент вызова). То есть вы не просто блокируете поток, но ещё и начинаете выполнять на нём какой-то рандомный код, о существовании которого могли и не догадываться.
Предварительная проверка вроде if (fut.isDone()) сможет от этого защитить, но тогда не до конца понятно, что писать в else ветке. Какой-нибудь *Async вызов?

Может показаться, что есть и другой выход. Если вместо fut.get() написать fut.thenApply(Function.identity()).get(), то у операнда get не будет зависимых фьючей. Мы точно об этом знаем, поскольку ни у кого другого больше нет доступа к этому инстансу. Его поле stack пустое, а значит никакой левый код исполняться не должен, так ведь?

Нет, конечно же не так, это было бы слишком просто! Когда делаешь подобные допущения, нужно обязательно проверять, верны ли они. Что интересно, так это то, что с get в этот раз всё чисто (stack то и правда null) и малину нам может испортить именно вызов thenApply.

Это последний пункт в списке действий, которые могут заставить ваш поток выполнять неожиданные операции. Создание композиции, т.е. вызов fut.then*, fut.when*, fut.handle* и т.д. в тот момент, когда другой поток параллельно комплитит fut. Вам, конечно, должно сильно повезти, чтобы попасть в такую гонку, но это возможно и это не баг. Как это происходит:

  • thenApply проверяет значение поля result, оно равно null;

  • thenApply добавляет инстанс UniApply в stack;

  • далее thenApply снова проверяет значение result и оно оказывается не null. В теории может оказаться, что мы зря меняли stack и что наш UniApply никто больше не обработает (параллельный вызов complete мог к этому моменту уже завершить свою работу).
    Мы в состоянии гонки и обязаны гарантировать, что какой-нибудь поток обработает наш UniApply, а единственный способ это гарантировать - сделать всю работу самостоятельно. Итого - мы начинаем пробегаться по стеку и выполнять операции, которые мы там найдём (в том числе и только что добавленный UniApply), как мы бы делали при вызове get с ожиданием;

  • в числе этих операций вполне может оказаться чья-то зависимая фьюча, код операции для которой мы начнём исполнять.

Пожалуй, последний подход можно заставить работать, если вызов.thenApply(Function.identity()) сделать гарантированно до того, как фьюча будет закомпличена. Например, сразу после инстанцирования. В противном случае остаётся риск непредсказуемого треша.

6. В каком порядке выполняются независимые операции

Итак. Надеюсь мы разобрались в каком потоке будут выполняться операции. Что теперь можно сказать о порядке выполнения?

Ясно одно - зависимые операции выполняются в порядке, диктуемом зависимостями, тут альтернатив быть не может. Рассмотрим независимые операции.

CompletableFuture<Object> f = new CompletableFuture<>();

f.whenComplete((o, throwable) -> System.out.println("Closure 1"));
f.whenComplete((o, throwable) -> System.out.println("Closure 2"));

f.complete("Result");

В теории, у нас есть выбор между 3-мя вариантами вывода:
1. Closure 1, затем Closure 2
2. Closure 2, затем Closure 1
3. Порядок не детерминирован

Наверняка чья-та интуиция скажет, что должно быть 1. На практике справедлив вариант 2. Это легко объяснить себе, если помнить, что внутри CompletableFuture находится стек.

// stack: null
CompletableFuture<Object> fut = new CompletableFuture<>();

// stack: UniWhenComplete(closure 1) -> null
fut.whenComplete((o, throwable) -> System.out.println("Closure 1"));

// stack: UniWhenComplete(closure 2) -> UniWhenComplete(closure 1) -> null
fut.whenComplete((o, throwable) -> System.out.println("Closure 2"));

// Здесь мы проходим по стеку и выполняем операции.
fut.complete("Result");

Стек, как известно, работает по принципу LIFO (last in, first out). Этим и обусловлен порядок обработки. Каждый поток, завершающий фьючу (или помогающий завершению), выполняет примерно следующий цикл. Это ненастоящий код, я набросал его для примера:

while (stack != null) {
    Completion c = removeHead();

    if (c == null) break;

    executeCompletionOperation(c);
}

Если выполнением операций / завершением зависимых фьючей занимаются несколько потоков, каждый из них будет читать из одного и того же стека. В рамках каждого отдельно взятого потока как минимум не должно быть переупорядочивания операций относительно того, как они удалялись из стека.

Далее, если исходить из предположения, что мы не пользуемся orTimeout, не вызываем по несколько раз complete* и не вызываем get/join на незавершённых фьючах и т.д., то кажется, что порядок выполнения операций нам известен и мы можем на него полагаться. Стек так стек, пусть выполняются в порядке, обратном порядку добавления.

Нет. Если что из этой статьи и стоит извлечь, так это то, что доверять интуиции - опасно. Если в документации не написано, что что-то должно выполняться в конкретном порядке, то оно, скорее всего, будет выполняться в каком угодно порядке. Не всегда в том, в котором это ожидают разработчики, это и означает слово "независимые". Полагаться на недокументированное поведение - нехорошо. Контрпример:

CompletableFuture<Object> fut = new CompletableFuture<>();

CompletableFuture<Object> dep = fut.thenApply(Function.identity());

dep.whenComplete((o, throwable) -> System.out.println("Closure 1"));
dep.whenComplete((o, throwable) -> System.out.println("Closure 2"));

fut.complete("Result");

// Вывод будет следующий:
// > Closure 1
// > Closure 2

Стоило добавить некоторую косвенность, как порядок выполнения операций изменился на противоположный. Но ведь это стек, как, а главное зачем мы выполняем операции начиная с хвоста? Как же LIFO?

Правду можно узнать только в одном источнике - в исходном коде. Разбором стека и выполнением находящихся в нём операций занимается метод postComplete(). Реализация у него следующая:

final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; CompletableFuture.Completion h;
    while ((h = f.stack) != null ||
            (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; CompletableFuture.Completion t;
        if (STACK.compareAndSet(f, h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                NEXT.compareAndSet(h, t, null); // try to detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

Авторские именования и форматирование сохранены. Сложно? На самом деле не особо. Данный метод итерируется по стеку текущей фьючи, а так же по стекам всех её зависимых фьючей. И их зависимых фьючей. И их.... но не рекурсивно. Это просто обход дерева зависимостей.

А именно - это поиск в глубину. В качестве стека для хранения встреченных узлов пере-используется то же поле stack фьючи source (т.е. this). Именно здесь может произойти разворачивание стека зависимой фьючи. То есть как "может" - оно обязательно произойдёт, при вызове pushStack.
compareAndSet на строке с комментарием // try to detach нужен для облегчения работы GC и на алгоритм никак не влияет.

Так же стоит понимать, что разворачивание стека может произойти только один раз. Если в примере выше добавить ещё один уровень в дерево зависимостей, создав фьючу dep как

CompletableFuture<Object> dep = fut
        .thenApply(Function.identity())
        .thenApply(Function.identity());

то это уже не повлияет на порядок выполнения зависимых операций. В любом случае, несложно представить, насколько хаотичным может оказаться порядок исполнения всего этого в многопоточной среде. Тут я уже эксперименты проводить не буду.

7. Что на самом деле делают методы get/join

Они ждут. Что вообще происходит, когда поток "ждёт"?

Самый простой вариант - поток буквально переходит в режим ожидания. Делается это методами Unsafe#park / Unsafe#unpark. В каком-то смысле это было бы эквивалентно следующему коду:

Thread thread = Thread.currentThread();
fut.whenComplete((res, throwable) -> LockSupport.unpark(thread));

LockSupport.park();

// Есть и такой метод.
Object res = fut.getNow(null);

Реализация методов get / join действительно делает что-то похожее, она добавляет в stack инстанс класса CompletableFuture.Signaller. Он работает как зависимая фьюча, только без самой фьючи. Операцией, которую он реализует, является пробуждение ждущего потока.

Данное поведение мы наблюдаем в случаях, когда ожидание выполняется не в ForkJoinPool-е. В противном случае мы можем начать выполнять чьи-то ForkJoinTask-и (см. ForkJoinPool#helpAsyncBlocker). Если эти таски очень маленькие, то ничего страшного, а если нет - ожидание может затянуться. По этой же причине метод get, в который передали лимит по времени, может этот лимит заметно превысить, если вдруг нарвётся на особенно длинную таску.

Есть второй способ потенциально превысить лимит по времени - использовать виртуальные потоки. В приведённом выше коде я не использовал Unsafe напрямую. Всё потому, что у виртуальных потоков свой механизм парковки, и класс LockSupport его учитывает. Не буду никого обманывать, я не до конца его пока что понимаю. Но очень похоже, что в момент парковки может начать выполняться код с другого Java потока, но на том же потоке операционной системы. И вот когда этот код закончит исполняться - непонятно, т.е. ситуация похожа на ту, что происходит с ForkJoinPool. С другой стороны, раз я не до конца разобрался, то это всё спекуляции с моей стороны.

Ну и напоследок напомню, что get / join могут комплитить зависимые фьючи, это тоже занимает какое-то время.

8. Исключения

С классом CompletableFuture ассоциирован ряд исключений:

  • ExecutionException используется только в методах get и нужен для совместимости с интерфейсом Future.

  • CancellationException - используется методами cancel, isCancelled и для определения состояния State.CANCELLED.

  • TimeoutException - случается, если пользоваться методом orTimeout. Особого статуса не имеет.

  • CompletionException - используется почти везде.

Первое исключение проще всего понять через описание поведения метода get для фьючи, завершённой исключением:

  • Если фьюча завершена с CancellationException, то get выбросит ExecutionException, в качестве причины указав тот же инстанс CancellationException.

  • Иначе, если фьюча завершена с CompletionException и его cause равен null, то get выбросит ExecutionException, в качестве причины указав тот же инстанс CompletionException.

  • Иначе, если фьюча завершена с CompletionException и его cause не равен null, то get выбросит ExecutionException, в качестве причины указав тот же cause.

  • Иначе, когда фьюча завершена с исключением e, get выбросит ExecutionException с причиной e.

Здесь хорошо видно, что CancellationException обрабатывается особым образом.join ведёт себя намного проще:

  • Если фьюча завершена с CancellationException или CompletionException, то join выбросит его же.

  • Иначе join выбросит CompletionException, в качестве причины указав инстанс исключения, с которым фьюча была завершена.

Если закомплитить фьючу с исключением var foo = new RuntimeException("Foo"), то join выбросит new CompletionException(foo). Если закомплитить с исключением new CompletionException(foo), то метод exceptionNow, к примеру, вернёт просто инстанс foo. Выглядит так, как будто API нас намеренно путает и мешает понять, кто конкретно создал CompletionException, который наблюдается при вызове join.

Есть ли способ точно понять, какое исключение передали в completeExceptionaly? Да, есть. Тот самый реальный гвоздь инстанс исключения будет передан в параметры функции из whenComplete* / handle*. Поэтому внутри таких операций при обработке ошибок лучше проверять как инстанс самого исключения, так и причину CompletionException-а, если вдруг пришёл именно он. В причине может быть как раз то, что вы ищете.

Может ли откуда-то в параметрах лямбды в whenComplete взяться CompletionException, если мы его никогда сами не будем создавать? Да, может. Зависимые фьючи всегда завершаются с инстансом CompletionException, если их исходные фьючи завершены с исключением, либо же если исключение возникло при выполнении самой зависимой операции (Function / Consumer / BiFunction / BiConsumer / ...). Отсюда есть интересное следствие.

CompletableFuture<Object> fut = new CompletableFuture<>();
CompletableFuture<Object> dep = fut.thenApply(Function.identity());

fut.cancel(false);

// > false
System.out.println(dep.isCancelled());

cancel не распространяется на зависимые фьючи, вместо этого он превращается в обычный, совсем уже ничем не примечательный CompletionException.

9. anyOf

Если посмотреть на наследников CompletableFuture.Completion, связанных с зависимыми фьючами, то почти все они будут попадать в одну из категорий: Uni*, Bi* или Or*. У первых - одна source фьюча, у вторых и третьих - две. И только класс AnyOf стоит особняком - там сразу целый массив.

Перед тем, как с ним разбираться, предлагаю вернуться немного назад и разобраться в том, что такое "2 source фьючи". Сделаем это на примере метода fut.thenAcceptBoth(other, action). other - это другая фьюча, а action - это java.util.function.BiConsumer .

Действие сработает только тогда, когда и fut, и other будут завершены. А это значит, что триггером к завершению созданной зависимой фьючи может быть как fut, так и other (зависит от порядка их завершения). У них обоих в поле stack должно что-то появиться. Причём это не может быть один и тот же объект Completion, поскольку он сам является узлом стека (оптимизация), а значит не может явным образом находиться в двух стеках одновременно. Выход простой - в fut.stack положить c = new BiAccept(...), а в other.stack - new CoCompletion(c). Второй объект - это буквально прокси на первый.

Метод thenAcceptBoth эквивалентен операции and. Обе source фьючи завершаются, завершают зависимую фьючу и чистят свои стеки. Все просто и красиво. Не так красиво всё будет с операцией or, которую должен выполнить anyOf. Для неё достаточно, чтобы хотя-бы одна из source фьючей завершилась. Остальные же могут вообще никогда не завершиться, и это нормально. Кстати, CoCompletion для AnyOf не используется.

Что случается, когда мы пишем что-то в stack фьючи, которая потенциально никогда не завершится? Мы получаем потенциальную утечку памяти! Но одно дело, когда созданная зависимая фьюча тоже не завершена - это даже и не утечка, а просто зависимость, которая ждёт своего часа, чтобы исчезнуть. Другое дело когда зависимая фьюча уже завершена, как может быть в случае с anyOf.

Если заглянуть в код AnyOf#tryFire, то будет видно, как эта проблема решается на практике. Весь секрет в вызове CompletableFuture#cleanStack на всех source фьючах, кроме той, которая прямо сейчас занята завершением зависимой фьючи. Например, если у нас был anyOf(a, b, c) и кто-то завершил фьючу a, то при этом сработают b.cleanStack() и c.cleanStack().

cleanStack - метод интересный. Он нужен для того, чтобы удалить из стека ровно один инстанс Completion, который соответствует какой-то завершённой операции. Не обязательно тот, который мы сами туда положили.

В примере выше нам известно, что вызов anyOf(a, b, c) добавил в b.stack элемент AnyOf. И мы знаем, что этот самый AnyOf никогда не приведёт к завершению зависимой фьючи. Если мы его удалим - всё будет хорошо. Что будет если мы удалим не этот инстанс AnyOf, а чей-то другой объект, соответствующий завершённой зависимой фьюче? На самом деле ничего страшного, поскольку количество вызовов cleanStack должно соответствовать количеству мусорных операций, которые нужно из стека рано или поздно удалить. Оперируя исключительно количеством, можно достичь большей скорости удаления, чем если бы каждая зависимая фьюча искала свой конкретный инстанс.

10. allOf

allOf - это второй vararg метод в классе CompletableFuture. И устроен он немного сложнее, чем anyOf, достаточно посмотреть в реализацию.

Он создаёт зависимую фьючу, у которой тоже много исходных фьючей. Но при этом не существует соответствующего ему класса CompletableFuture.AllOf, хранящего в поле весь массив. Вместо этого есть класс CompletableFuture.BiRelay. Почему Bi*?

Всё дело в том, что зависимая фьюча тут не одна, их много, и все они выстраиваются в бинарное дерево зависимостей, которое ещё и сбалансировано:

                   dep5
                /       \
          dep3            dep4
        /      \          /  \ 
   dep1         dep2   src5  src6
   /  \         /  \ 
src1  src2   src3  src4

Если завершаются src5 и src6 - они тригерят завершение dep4.

Если завершаются dep1 и dep2 - они тригерят завершение dep3. Если при этом dep4 уже завершена, то каскадно тригерится завершение dep5.

Стоит уточнить, что уже закомпличенные фьючи в дерево могут и не попасть, алгоритм построения вправе их пропустить. Учитывая при этом исключения, конечно же.

Балансировка дерева гарантирует, что при завершении любой из src*, в итоге будет закомпличено не более, чем O(log N) зависимых dep* фьючей, где N - количество фьючей, переданных в allOf. Это позволяет нам считать, что накладные расходы невелики. Не нужно проверять все N фьючей чтобы узнать, завершены они все или нет. Но и цена у этого есть - аллокация большего числа объектов, чем это необходимо в теории. На мой взгляд, обмен вполне справедливый.

Вероятно, у многих возникал вопрос - почему нет методов, которые принимали бы в качестве параметра не массив, а Collection, например. Ведь anyOf внутри себя всё равно делает args.clone(), т.е. перекладывает исходные значения в новый массив, а allOf вообще не нуждается в ссылке на какую-либо коллекцию. В нём каждый узел имеет ссылки на 2 элемента, представленные двумя полями.

У меня точно возникал. И я был бы рад дать на него ответ, но не могу. На самом деле оба метода anyOf и allOf без особых усилий можно было бы реализовать с параметрами-коллекциями.

Написать свою anyOf, аналогичную уже имеющейся, не выйдет, поскольку это единственный метод, позволяющий сделать зависимую фьючу с более чем двумя исходными. Зато построение дерева для allOf можно сделать руками с помощью thenCombine, например.

Вот вам мой вариант с Iterable в качестве параметра. Количество элементов заранее неизвестно. Балансировка дерева пусть не идеальная, но высота дерева всё равно ограничена O(log N). Можно брать и пользоваться.

public static CompletableFuture<Void> myAllOf(Iterable<CompletableFuture<?>> futs) {
    var stack = new ArrayList<CompletableFuture<?>>();

    int i = 0;
    for (CompletableFuture<?> fut : futs) {
        if (fut.isDone()) {
            if (!fut.isCompletedExceptionally()) {
                continue;
            }
        }

        for (int k = i; (k & 1) != 0; k >>>= 1) {
            fut = stack.removeLast().thenCombine(fut, (l, r) -> null);
        }

        stack.add(fut);
        i++;
    }

    if (i == 0) {
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<?> fut = stack.removeLast();

    while (!stack.isEmpty()) {
        fut = stack.removeLast().thenCombine(fut, (l, r) -> null);
    }

    return fut.thenApply(v -> null);
}

Комментировать происходящую тут битовую магию не буду, статья и без того большая. Думаю интереснее будет самостоятельно разобраться с алгоритмом построения дерева. К сожалению, пришлось задействовать дополнительную память в виде стека, но это не страшно, потому что число элементов в нём не будет большим (ограничено высотой результирующего дерева).

Если бы данный алгоритм был реализован в классе CompletableFuture, дополнительный стек вообще бы не пришлось выделять, ведь его можно было бы строить исключительно на создаваемых инстансах BiRelay, хорошо так экономя память. Чуть-чуть сложно, но более чем осуществимо.

11. Исходный код Дага Ли

Сложная часть на этом закончилась. Предлагаю напоследок немного расслабиться.

Если вы хоть раз открывали исходники CompletableFuture, или многих других классов в java.util.concurrent, то наверняка заметили, что они оформлены не по канонам программирования на Java. Постоянные присваивания внутри условий, короткие имена переменных, очень переусложнённые циклы (CompletableFuture#waitingGet) и минимум комментариев. Если бы я попробовал так код написать, меня бы на ревью зачмырили.

Пока я читал исходники, я пришёл к двум выводам. Во-первых, если бы этот класс писали в более привычном всем нам стиле, то код был бы не три тысячи строк, а все десять или даже больше. А так - каждый метод помещается на экран и сложен ровно настолько, чтобы ты мог удержать его в голове. Не факт, что с длинными именами и более прямолинейными конструкциями было бы лучше. А ещё и производительность могла бы пострадать... В подобных низкоуровневых структурах быстро привыкаешь к чтению странного кода, на самом деле, и даже начинаешь его ценить.

Во-вторых, короткие имена переменных - не большая проблема, ведь именование по всему классу используется одно и то же. В основном используются следующие имена:

  • a - source

  • b - second source

  • c, q - completion (потому что и c, и q - это K, ну вы поняли)

  • d - dependent

  • e - executor

  • f - function

  • h - head

  • r - result, как правило равен a.result

  • s - second result, как правило равен b.result

  • t, x - throwable

Для некоторых из этих букв попадаются и другие использования, например иногда f - это future, а t - это tail. Я не упомянул огромное множество чуть более редких имён, ограничившись наиболее популярными, но идея у них всех та же. Может быть этот список поможет вам быстрее понять код, если вдруг решите его почитать. Он правда не настолько сложен, как кажется на первый взгляд.

Так же, пусть в коде методов и мало комментариев, javadoc над классом и Overview комментарий в классе - отличные и советуются к обязательному ознакомлению. Тем более, что они ещё и достаточно короткие.

12. Заключение

CompletableFuture - источник сильной боли для программистов. Хотя сложно говорить за всех, наверное стоит говорить только за себя. Будь то legacy исключения, или невозможность понять какой поток что делает - всё это заставляет меня грустить. Так что данная статья была написана не только для читающих. Врага нужно знать в лицо, и при написании я тоже узнал кое-что новое.

А если перестать драматизировать, то можно заметить, что CompletableFuture - офигенная штука, просто обращаться с ней нужно осторожно. Конечно, я затронул не все темы, но нужно было на чём то остановиться. Надеюсь приведённого материала достаточно для ознакомления, а ответы на все оставшиеся вопросы вы уже сможете найти самостоятельно.

Большое спасибо, что дочитали до конца!

Tags:
Hubs:
Total votes 36: ↑36 and ↓0+36
Comments27

Articles