Leitfaden für CompletableFuture

1. Einleitung

Dieses Tutorial ist eine Anleitung zu den Funktionen und Anwendungsfällen der CompletableFuture- Klasse, die als Verbesserung der Java 8 Concurrency-API eingeführt wurde.

2. Asynchrone Berechnung in Java

Asynchrone Berechnungen sind schwer zu begründen. Normalerweise möchten wir uns jede Berechnung als eine Reihe von Schritten vorstellen, aber im Fall einer asynchronen Berechnung sind Aktionen, die als Rückrufe dargestellt werden, entweder über den Code verteilt oder tief ineinander verschachtelt . Es wird noch schlimmer, wenn wir Fehler behandeln müssen, die während eines der Schritte auftreten können.

Die Future- Schnittstelle wurde in Java 5 hinzugefügt, um als Ergebnis einer asynchronen Berechnung zu dienen. Es gab jedoch keine Methoden, um diese Berechnungen zu kombinieren oder mögliche Fehler zu behandeln.

Java 8 führte die CompletableFuture- Klasse ein. Neben der Future- Schnittstelle wurde auch die CompletionStage- Schnittstelle implementiert . Diese Schnittstelle definiert den Vertrag für einen asynchronen Berechnungsschritt, den wir mit anderen Schritten kombinieren können.

CompletableFuture ist gleichzeitig ein Baustein und ein Framework mit etwa 50 verschiedenen Methoden zum Erstellen, Kombinieren und Ausführen asynchroner Berechnungsschritte und zum Behandeln von Fehlern .

Eine so große API kann überwältigend sein, aber diese fallen meist in mehrere klare und unterschiedliche Anwendungsfälle.

3. Verwenden von CompletableFuture als einfache Zukunft

Zunächst implementiert die CompletableFuture- Klasse die Future- Schnittstelle, sodass wir sie als Future- Implementierung verwenden können, jedoch mit zusätzlicher Abschlusslogik .

Beispielsweise können wir eine Instanz dieser Klasse mit einem Konstruktor ohne Argumente erstellen, um ein zukünftiges Ergebnis darzustellen, es an die Verbraucher weiterzugeben und es zu einem späteren Zeitpunkt mit der Methode complete abzuschließen . Die Verbraucher können die get- Methode verwenden, um den aktuellen Thread zu blockieren, bis dieses Ergebnis bereitgestellt wird.

Im folgenden Beispiel haben wir eine Methode, die eine CompletableFuture- Instanz erstellt, dann einige Berechnungen in einem anderen Thread auslöst und die Zukunft sofort zurückgibt .

Wenn die Berechnung abgeschlossen ist, vervollständigt die Methode die Zukunft, indem sie das Ergebnis für die vollständige Methode bereitstellt :

public Future calculateAsync() throws InterruptedException { CompletableFuture completableFuture = new CompletableFuture(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.complete("Hello"); return null; }); return completableFuture; }

Um die Berechnung abzuspalten, verwenden wir die Executor- API. Diese Methode zum Erstellen und Abschließen einer CompletableFuture kann zusammen mit jedem Parallelitätsmechanismus oder jeder API verwendet werden, einschließlich Raw-Threads.

Beachten Sie, dass die berechneAsync- Methode eine zukünftige Instanz zurückgibt .

Wir rufen einfach die Methode auf, empfangen die Future- Instanz und rufen die get- Methode auf, wenn wir bereit sind, das Ergebnis zu blockieren.

Beachten Sie auch, dass die get- Methode einige überprüfte Ausnahmen auslöst, nämlich ExecutionException (Kapselung einer Ausnahme, die während einer Berechnung aufgetreten ist) und InterruptedException (eine Ausnahme, die angibt, dass ein Thread, der eine Methode ausführt, unterbrochen wurde):

Future completableFuture = calculateAsync(); // ... String result = completableFuture.get(); assertEquals("Hello", result);

Wenn wir das Ergebnis einer Berechnung bereits kennen , können wir die statische Methode completeFuture mit einem Argument verwenden, das ein Ergebnis dieser Berechnung darstellt. Folglich wird die get- Methode der Zukunft niemals blockieren und stattdessen sofort dieses Ergebnis zurückgeben:

Future completableFuture = CompletableFuture.completedFuture("Hello"); // ... String result = completableFuture.get(); assertEquals("Hello", result);

Als alternatives Szenario möchten wir möglicherweise die Ausführung einer Zukunft abbrechen .

4. CompletableFuture mit gekapselter Berechnungslogik

Mit dem obigen Code können wir jeden Mechanismus für die gleichzeitige Ausführung auswählen. Was ist jedoch, wenn wir dieses Boilerplate überspringen und einfach asynchron Code ausführen möchten?

Mit den statischen Methoden runAsync und supplyAsync können wir eine CompletableFuture- Instanz aus den Funktionstypen Runnable und Supplier entsprechend erstellen .

Sowohl Runnable als auch Supplier sind funktionale Schnittstellen, mit denen ihre Instanzen dank der neuen Java 8-Funktion als Lambda-Ausdrücke übergeben werden können.

Die Runnable- Schnittstelle ist dieselbe alte Schnittstelle, die in Threads verwendet wird, und es ist nicht möglich, einen Wert zurückzugeben.

Die Lieferantenschnittstelle ist eine generische Funktionsschnittstelle mit einer einzelnen Methode, die keine Argumente enthält und einen Wert eines parametrisierten Typs zurückgibt.

Auf diese Weise können wir eine Instanz des Lieferanten als Lambda-Ausdruck bereitstellen , der die Berechnung durchführt und das Ergebnis zurückgibt . Es ist so einfach wie:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello"); // ... assertEquals("Hello", future.get());

5. Verarbeitungsergebnisse asynchroner Berechnungen

Die allgemeinste Methode, um das Ergebnis einer Berechnung zu verarbeiten, besteht darin, es einer Funktion zuzuführen. Die thenApply- Methode macht genau das; Es akzeptiert eine Funktionsinstanz , verwendet sie zur Verarbeitung des Ergebnisses und gibt eine Zukunft zurück , die einen von einer Funktion zurückgegebenen Wert enthält:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApply(s -> s + " World"); assertEquals("Hello World", future.get());

Wenn wir in der Future- Kette keinen Wert zurückgeben müssen , können wir eine Instanz der Consumer- Funktionsschnittstelle verwenden. Die einzelne Methode verwendet einen Parameter und gibt void zurück .

In CompletableFuture gibt es eine Methode für diesen Anwendungsfall . Die thenAccept- Methode empfängt einen Consumer und übergibt ihm das Ergebnis der Berechnung. Dann gibt der letzte Aufruf von future.get () eine Instanz vom Typ Void zurück :

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenAccept(s -> System.out.println("Computation returned: " + s)); future.get();

Wenn wir weder den Wert der Berechnung benötigen noch am Ende der Kette einen Wert zurückgeben möchten, können wir ein runnable Lambda an die thenRun- Methode übergeben. Im folgenden Beispiel drucken wir einfach eine Zeile in der Konsole, nachdem wir future.get () aufgerufen haben:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenRun(() -> System.out.println("Computation finished.")); future.get();

6. Futures kombinieren

The best part of the CompletableFuture API is the ability to combine CompletableFuture instances in a chain of computation steps.

The result of this chaining is itself a CompletableFuture that allows further chaining and combining. This approach is ubiquitous in functional languages and is often referred to as a monadic design pattern.

In the following example we use the thenCompose method to chain two Futures sequentially.

Notice that this method takes a function that returns a CompletableFuture instance. The argument of this function is the result of the previous computation step. This allows us to use this value inside the next CompletableFuture‘s lambda:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); assertEquals("Hello World", completableFuture.get());

The thenCompose method, together with thenApply, implement basic building blocks of the monadic pattern. They closely relate to the map and flatMap methods of Stream and Optional classes also available in Java 8.

Both methods receive a function and apply it to the computation result, but the thenCompose (flatMap) method receives a function that returns another object of the same type. This functional structure allows composing the instances of these classes as building blocks.

If we want to execute two independent Futures and do something with their results, we can use the thenCombine method that accepts a Future and a Function with two arguments to process both results:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCombine(CompletableFuture.supplyAsync( () -> " World"), (s1, s2) -> s1 + s2)); assertEquals("Hello World", completableFuture.get());

A simpler case is when we want to do something with two Futures‘ results, but don't need to pass any resulting value down a Future chain. The thenAcceptBoth method is there to help:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));

7. Difference Between thenApply() and thenCompose()

In our previous sections, we've shown examples regarding thenApply() and thenCompose(). Both APIs help chain different CompletableFuture calls, but the usage of these 2 functions is different.

7.1. thenApply()

We can use this method to work with a result of the previous call. However, a key point to remember is that the return type will be combined of all calls.

So this method is useful when we want to transform the result of a CompletableFuture call:

CompletableFuture finalResult = compute().thenApply(s-> s + 1);

7.2. thenCompose()

The thenCompose() method is similar to thenApply() in that both return a new Completion Stage. However, thenCompose() uses the previous stage as the argument. It will flatten and return a Future with the result directly, rather than a nested future as we observed in thenApply():

CompletableFuture computeAnother(Integer i){ return CompletableFuture.supplyAsync(() -> 10 + i); } CompletableFuture finalResult = compute().thenCompose(this::computeAnother);

So if the idea is to chain CompletableFuture methods then it’s better to use thenCompose().

Also, note that the difference between these two methods is analogous to the difference between map() and flatMap().

8. Running Multiple Futures in Parallel

When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.

The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:

CompletableFuture future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture combinedFuture = CompletableFuture.allOf(future1, future2, future3); // ... combinedFuture.get(); assertTrue(future1.isDone()); assertTrue(future2.isDone()); assertTrue(future3.isDone());

Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture. The limitation of this method is that it does not return the combined results of all Futures. Instead, we have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:

String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); assertEquals("Hello Beautiful World", combined);

The CompletableFuture.join() method is similar to the get method, but it throws an unchecked exception in case the Future does not complete normally. This makes it possible to use it as a method reference in the Stream.map() method.

9. Handling Errors

For error handling in a chain of asynchronous computation steps, we have to adapt the throw/catch idiom in a similar fashion.

Instead of catching an exception in a syntactic block, the CompletableFuture class allows us to handle it in a special handle method. This method receives two parameters: a result of a computation (if it finished successfully), and the exception thrown (if some computation step did not complete normally).

In the following example, we use the handle method to provide a default value when the asynchronous computation of a greeting was finished with an error because no name was provided:

String name = null; // ... CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { if (name == null) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; })}).handle((s, t) -> s != null ? s : "Hello, Stranger!"); assertEquals("Hello, Stranger!", completableFuture.get());

As an alternative scenario, suppose we want to manually complete the Future with a value, as in the first example, but also have the ability to complete it with an exception. The completeExceptionally method is intended for just that. The completableFuture.get() method in the following example throws an ExecutionException with a RuntimeException as its cause:

CompletableFuture completableFuture = new CompletableFuture(); // ... completableFuture.completeExceptionally( new RuntimeException("Calculation failed!")); // ... completableFuture.get(); // ExecutionException

In the example above, we could have handled the exception with the handle method asynchronously, but with the get method we can use the more typical approach of a synchronous exception processing.

10. Async Methods

Most methods of the fluent API in CompletableFuture class have two additional variants with the Async postfix. These methods are usually intended for running a corresponding step of execution in another thread.

The methods without the Async postfix run the next execution stage using a calling thread. In contrast, the Async method without the Executor argument runs a step using the common fork/join pool implementation of Executor that is accessed with the ForkJoinPool.commonPool() method. Finally, the Async method with an Executor argument runs a step using the passed Executor.

Here's a modified example that processes the result of a computation with a Function instance. The only visible difference is the thenApplyAsync method, but under the hood the application of a function is wrapped into a ForkJoinTask instance (for more information on the fork/join framework, see the article “Guide to the Fork/Join Framework in Java”). This allows us to parallelize our computation even more and use system resources more efficiently:

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApplyAsync(s -> s + " World"); assertEquals("Hello World", future.get());

11. JDK 9 CompletableFuture API

Java 9 enhances the CompletableFuture API with the following changes:

  • New factory methods added
  • Support for delays and timeouts
  • Improved support for subclassing

and new instance APIs:

  • Executor defaultExecutor()
  • CompletableFuture newIncompleteFuture()
  • CompletableFuture copy()
  • CompletionStage minimalCompletionStage()
  • CompletableFuture completeAsync(Supplier supplier, Executor executor)
  • CompletableFuture completeAsync(Supplier supplier)
  • CompletableFuture orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit)

We also now have a few static utility methods:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • CompletionStage completedStage(U value)
  • CompletionStage failedStage(Throwable ex)
  • CompletableFuture failedFuture(Throwable ex)

Finally, to address timeout, Java 9 has introduced two more new functions:

  • orTimeout()
  • completeOnTimeout()

Here's the detailed article for further reading: Java 9 CompletableFuture API Improvements.

12. Conclusion

In diesem Artikel haben wir die Methoden und typischen Anwendungsfälle der CompletableFuture- Klasse beschrieben.

Der Quellcode für den Artikel ist auf GitHub verfügbar.