Объект Future, который представляет асинхронную задачу, позволяет получить результат задачи с помощью метода get(), который приведет
к блокировки текущего потока до тех пор, пока результат асинхронной задачи не станет доступен. Класс CompletableFuture, который реализует интерфейс Future,
предоставляет альтернативный механизм для получения результата. CompletableFuture использует функцию обратного вызова, которая вызвается с результатом асинхронной задачи, когда он станет доступен.
Можно сказать, что CompletableFuture - это Future, который может быть явно завершен (с установкой его значения и статуса)
Кроме того, CompletableFuture реализует интерфейс CompletionStage, который представляет этап асинхронного вычисления и выполняет действие или вычисляет значение после
завершения другого этапа. Можно сказать, что CompletableFuture позволяет реализовать нечто похожее на промисы (promise) в некоторых языках программирования (как, например, в JavaScript)
Для непосредственного создания объекта CompletableFuture можно использовать конструктор без параметров:
CompletableFuturefuture = new CompletableFuture ();
Класс CompletableFuture<T> является обобщенным и типизируется типом результата. Так, в коде выше предполагается, что результатом асинхронной задачи будет значение типа Integer.
После создания объекта с помощью конструктора его можно настроить на выполнение какой-либо задачи. Однако класс CompletableFuture также представляет ряд методов, которые позволяют создать объект
данного класса с уже добавленной асинхронной задачей, которую следует выполнить. Например, это статический метод CompletableFuture<U>.supplyAsync():
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Этот метод имеет две версии. Обе версии в качестве результата возвращает новый объект CompletableFuture. И обе этих версии в качестве первого параметра принимают
объект Supplier, который и представляет выполняемое действие. Данный интерфейс является функциональным и имеет один метод get(), который возвращает некоторое значение
interface Supplier<T>{
T get();
}
Результат метода get() затем передается в обратный вызов - асинхронную задачу, которая и обрабатывает данный результат и которая устанавливается с помощью дополнительных методов.
Различие между этими обеими версиями состоит лишь в том, как именно выполняется асинхронная задача. Во второй версии вторым параметром передается объект Executor, который и выполняет задачу.
Для первой же версии асинхронная задача выполняется с помощью потока из пула ForkJoinPool (ForkJoinPool представляет специализированный пул потоков, разработанный для параллельного программирования). То есть при выполнении задач
вы можете положиться на ForkJoinPool, либо для большего контроля предоставить свой объект Executor, который выполнит задачу.
Рассмотрим на простейшем примере вычисления факториала:
int number = 5; // исходное число для вычисления факториала
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// если число меньше 1, генерируем исключение
if(number < 1) throw new RuntimeException("Number must be greater than 0");
int n = 1;
int result = 1;
while(n <= number) result *= n++;
return result; // возвращаем результат задачи
});
В данном случае в метод CompletableFuture.supplyAsync() неявно передается объект Supplier в виде лямбда-выражения. Это лямбда-выражение берет внешнюю переменную number и вычисляет ее факториал и возвращает результат - это и будет результат метода
get(). Конечно, можно было бы и явным образом определить объект Supplier:
int number = 5; // исходное число для вычисления факториала
Supplier<Integer> task = () -> {
// если число меньше 1, генерируем исключение
if(number < 1) throw new RuntimeException("Number must be greater than 0");
int n = 1;
int result = 1;
while(n <= number) result *= n++;
return result; // возвращаем результат задачи
};
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(task);
Обратите внимание, что функция, которая представляет интерфейс Supplier, не может генерировать проверяемое исключение (checked excetion). Поэтому в примере выше для генерации исключения
при передаче некорректного числа используется тип RuntimeException.
После создания CompletableFuture и установки выполняемой задачи нам надо добавить обратный вызов, который будет вызываться при завершении CompletableFuture. Для этого в классе
определен большой набор методов. Приведу лишь некоторые:
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
Применяет функцию к результату и возвращает void.
<U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
Применяет функцию к результату.
CompletableFuture<Void> thenRun(Runnable action)
Выполняет Runnable с результатом типа void.
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
Вызывает функцию для результата и выполняет возвращаемый объект Future.
Например, используем метод thenAccept(), который принимает объект интерфейса Consumer. Этот интерфейс имеет метод accept(), который принимает некоторое значение:
void accept(T t)
Таким образом, обратный вызов должен представлять функцию, которая принимает значение произвольного типа, а в качестве возвращаемого типа испольует void
Рассмотрим на примере факториала:
import java.util.concurrent.*;
import java.util.function.Supplier;
class Program{
public static void main(String[] args) throws Exception{
System.out.println("Main thread started...");
int number = 5; // исходное число для вычисления факториала
// определяем задачу, которая вычисляет факториал
Supplier<Integer> task = () -> {
if(number < 1) throw new RuntimeException("Number must be greater than 0");
int n = 1;
int result = 1;
while(n <= number) {
result *= n++;
}
return result; // возвращаем результат задачи
};
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(task);
// передаем в thenAccept обратный вызов
future.thenAccept(result -> System.out.printf("factorial of %d is %d\n", number, result));
// future.thenAccept - не блокирует основной поток
// и одновременно мы можем выполнять в основном потоке некоторую работу
System.out.println("Main thread works...");
Thread.sleep(2000);
System.out.println("Main thread finished...");
}
}
Рассмотрим на передачу обратного вызова:
future.thenAccept(result -> System.out.printf("factorial of %d is %d\n", number, result));
Здесь в метод передается лямбда-выражение, в котором параметр result представляет результ асинхронной задачи - результат вычисления факториала. Тело лямбда-выражения же представляет вывод результата вычисления.
Если суммировать все по этапам, то программа будет выполняться следующим образом:
System.out.println("Main thread started...")
Основной поток (main) запускается и выводит это сообщение.
CompletableFuture
Передает ранее определенную задачу task (вычисление факториала) в общий пул потоков (по умолчанию ForkJoinPool.commonPool()). Эта задача начинает выполняться параллельно в другом потоке.
future.thenAcceptAsync(result -> System.out.printf("factorial of %d is %d\n", number, result))
Регистрирует обратный вызов (колбэк). Этот колбэк не выполняется немедленно.
Он ждет, пока задача task не завершится и не возвратит неокторый результат.
System.out.println("Main thread works...")
Основной поток не ждет завершения задачи task, а немедленно продолжает работу (в данном случае для
имитации работы выводим сообщение)
Выполнение задачи
В это время (main либо печатает "Main thread works...", либо уже спит) параллельный поток очень быстро вычисляет факториал 5 (это 120). Это занимает доли миллисекунды.
Выполнение колбэка
Сразу после завершения задачи task запускается зарегистрированный с помощью функции thenAccept обратный вызов
(обычно в том же потоке из пула) и выводит: factorial of 5 is 120.
Thread.sleep(2000);
Основной поток "спит" 2000 мс. К этому моменту колбэк почти наверняка уже отработал и вывел результат.
System.out.println("Main thread finished...")
Через 2 секунды основной поток просыпается и выводит сообщение, после чего программа завершается.
Наиболее вероятный порядок вывода будет таким:
Main thread started... Main thread works... factorial of 5 is 120 Main thread finished...
Таким образом, можно обрабатывать результат, как только он станет доступен, не блокируя основой поток.
Но в данном случае стоит отметить, что, поскольку потоки, используемые CompletableFuture по умолчанию (из пула ForkJoinPool.commonPool()), являются
потоками-демонами (daemon threads) - по сути фоновыми потоками, то JVM (Виртуальная машина Java) не ждет, пока они завершатся.
JVM работает до тех пор, пока жив хотя бы один поток, который не является "демоном" . В программе выше единственный поток, который не является демоном - это поток метода main.
Поэтому если убрать Thread.sleep, то поток main очень быстро выполнит методы supplyAsync() и thenAccept(), напечатает "Main thread works..." и
"Main thread finished..." и немедленно завершится.
Как только main завершается, JVM видит, что активных потоков-не-демонов не осталось, и принудительно завершает всю программу. И в этом случае асинхронная задача task и ее колбэк в thenAccept()
просто не успевают выполниться до завершения основного потока. Поэтому именно Thread.sleep(2000) в коде выше "удерживает" основной поток живым, давая асинхронной задаче достаточно времени,
чтобы выполниться и напечатать результат.
Вместо использования задержек мы могли бы явным образом определить объект Executor, который будет выполнять задачи. Например:
import java.util.concurrent.*;
import java.util.function.Supplier;
class Program{
public static void main(String[] args) throws Exception{
System.out.println("Main thread started...");
int number = 5; // исходное число для вычисления факториала
// определяем задачу, которая вычисляет факториал
Supplier<Integer> task = () -> {
if(number < 1) throw new RuntimeException("Number must be greater than 0");
int n = 1;
int result = 1;
while(n <= number) result *= n++;
// имитируем долгую работу
try {Thread.sleep(2000);}
catch(InterruptedException _){}
return result; // возвращаем результат задачи
};
// определяем объект Executor, который будет выполнять задачу
ExecutorService executor = Executors.newCachedThreadPool();
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(task, executor);
future.thenAccept(result -> System.out.printf("factorial of %d is %d\n", number, result));
// future.thenAccept - не блокирует основной поток
// и одновременно мы можем выполнять в основном потоке некоторую работу
System.out.println("Main thread works...");
System.out.println("Main thread finished...");
// закрываем исполнителя
executor.close();
}
}
В принципе здесь выполняются те же самые действия, только теперь я убрал задержку для основного потока и добавил аналогичную задержку в задачу по вычислению факториала, чтобы эта задача завершалась после всех действий в основном потоке метода main.
И также добавлено создание объекта Executor, который передается в метод supplyAsync():
ExecutorService executor = Executors.newCachedThreadPool(); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(task, executor);
В данном случае создаваемый объект Executor будет использовать один из платформенных потоков из пула. Причем далее этот Executor будет выполнять как задачу по вычислению факториала, так и коллбек, который выводит результат на консоль.
В итоге мы скорее всего получим следующий консольный вывод:
Main thread started... Main thread works... Main thread finished... factorial of 5 is 120
Поскольку в асинхронной задаче действует 2-секундная задержка, то скорее всего асинхронная задача факториала будет выполнена после вывода на консоль строки "Main thread finished..." в методе main.
Аналогичным образом мы можем применять и другие методы CompletedFuture для добавления обратных вызовов для обработки результатов. Более того мы можем обрабатывать результаты других обратных вызовов.
Например, возьмем метод thenApply():
<U> CompletableFuture<U> thenApply(Function<? super T, ? extends R> fn)
Параметром метода является объект интерфейса Function - по сути это некоторая функция, которая принимает объект типа T и возвращает объект типа R,
то есть преобразует результат типа T в объект типа R. Либо если быть точнее функция обратного вызова должна соответствовать методу apply() интерфейса Function:
R apply(T t)
Допустим, после вычисления факториала нам надо увеличить его в 2 раза:
import java.util.concurrent.*;
import java.util.function.Supplier;
import java.util.function.Function;
import java.util.function.Consumer;
class Program{
public static void main(String[] args) throws Exception{
System.out.println("Main thread started...");
int number = 5; // исходное число для вычисления факториала
// определяем задачу, которая вычисляет факториал
Supplier<Integer> factorialTask = () -> {
if(number < 1) throw new RuntimeException("Number must be greater than 0");
int n = 1;
int result = 1;
while(n <= number) result *= n++;
return result;
};
// определяем задачу, которая увеличивает в два раза
Function<Integer, Integer> doubleTask = result -> result * 2;
// определяем задачу, которая выводит на консоль конечный результат
Consumer<Integer> printTask = result -> System.out.printf("Final result: %d\n", result);
// определяем объект Executor, который будет выполнять задачи
ExecutorService executor = Executors.newCachedThreadPool();
// определяем коллбеки
CompletableFuture
.supplyAsync(factorialTask, executor)
.thenApply(doubleTask)
.thenAccept(printTask);
System.out.println("Main thread works...");
System.out.println("Main thread finished...");
executor.close(); // закрываем исполнителя
}
}
Ключевой момент программы - это, конечно, определение цепочки выполняемых задач:
CompletableFuture
.supplyAsync(factorialTask, executor)
.thenApply(doubleTask)
.thenAccept(printTask);
Рассмотрим поэтапно:
supplyAsync(factorialTask, executor)
Определяет асинхронную задачу, которая выполняется - это задача factorialTask, которая вычисляет факториала. Эта задача представляет тип Supplier<Integer> и поэтому возвращает значение типа Integer
thenApply(doubleTask)
Применяет к результату предыдущей задачи (в нашем случае это вычисление факториала) функцию doubleTask. Эта задача представляет тип Function<Integer, Integer>. То есть функция
получает значение типа Integer (факториал числа - результат задачи factorialTask) и возвращает также значение типа Integer - удвоенное значение факториала.
thenAccept(printTask)
Применяет к результату предыдущей задачи (в нашем случае это результат задачи doubleTask) функцию printTask, которая представляет тип Consumer<Integer>. Эта задача
получает значение типа Integer (результат задачи doubleTask) и ничего не возвращает (фактически возвращает void) и просто выводит результат на консоль.
Консольный вывод программы:
Main thread started... Main thread works... Main thread finished... Final result: 240