Leitfaden zu Akka Streams

1. Übersicht

In diesem Artikel werden wir uns die Akka-Streams- Bibliothek ansehen , die auf dem Akka-Actor-Framework basiert und dem Manifest für reaktive Streams entspricht. Mit der Akka Streams-API können wir auf einfache Weise Datentransformationsflüsse aus unabhängigen Schritten zusammenstellen.

Darüber hinaus erfolgt die gesamte Verarbeitung reaktiv, nicht blockierend und asynchron.

2. Maven-Abhängigkeiten

Um zu beginnen, müssen wir die Bibliotheken akka-stream und akka-stream-testkit zu unserer pom.xml hinzufügen :

 com.typesafe.akka akka-stream_2.11 2.5.2   com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. Akka Streams API

Um mit Akka Streams arbeiten zu können, müssen wir die wichtigsten API-Konzepte kennen:

  • Quelle - der Einstiegspunkt für die Verarbeitung in der Akka-Stream- Bibliothek - Wir können eine Instanz dieser Klasse aus mehreren Quellen erstellen. Beispielsweise können wir die single () -Methode verwenden, wenn wir eine Quelle aus einem einzelnen String erstellen möchten, oder wir können eine Quelle aus einer Iterable von Elementenerstellen
  • Flow - der Hauptverarbeitungsbaustein - jede Flow- Instanz hat einen Eingabe- und einen Ausgabewert
  • Materializer - Wir können einen verwenden, wenn unser Flow einige Nebenwirkungen wie das Protokollieren oder Speichern von Ergebnissen haben soll . Am häufigsten übergeben wir den NotUsed- Alias ​​als Materializer , um anzuzeigen , dass unser Flow keine Nebenwirkungen haben sollte
  • Sink- Operation - Wenn wir einen Flow erstellen, wird dieser erst ausgeführt, wenn wir eine Sink- Operation darauf registrieren. Dies ist eine Terminal-Operation, die alle Berechnungen im gesamten Flow auslöst

4. Erstellen von Flows in Akka-Streams

Beginnt sie mit einem einfachen Beispiel Gebäude, in dem wir zeigen , wie man schaffen und mehr kombiniert Fluss s - einen Strom von ganzen Zahlen zu verarbeiten und die durchschnittlichen Bewegungsfenster von Integer - Paaren aus dem Strom berechnen.

Wir werden ein Semikolon getrennte analysieren String von ganzen Zahlen als Eingabe für unsere erstellen akka-Stream - Quelle für das Beispiel.

4.1. Verwenden eines Flusses zum Analysieren der Eingabe

Lassen Sie uns zunächst erstellen DataImporter Klasse , die eine Instanz des nehmen ActorSystem , dass wir später unsere erstellen verwenden Fluss :

public class DataImporter { private ActorSystem actorSystem; // standard constructors, getters... }

Als nächstes erstellen wir eine parseLine- Methode, die aus unserer begrenzten Eingabezeichenfolge eine Liste von Ganzzahlen generiert . Beachten Sie, dass wir die Java Stream-API hier nur zum Parsen verwenden:

private List parseLine(String line) { String[] fields = line.split(";"); return Arrays.stream(fields) .map(Integer::parseInt) .collect(Collectors.toList()); }

Unser anfänglicher Flow wendet parseLine auf unsere Eingabe an, um einen Flow mit dem Eingabetyp String und dem Ausgabetyp Integer zu erstellen :

private Flow parseContent() { return Flow.of(String.class) .mapConcat(this::parseLine); }

Wenn wir die parseLine () -Methode aufrufen , weiß der Compiler, dass das Argument für diese Lambda-Funktion ein String ist - genau wie der Eingabetyp für unseren Flow .

Beachten Sie, dass wir das verwenden mapConcat () Methode - das entspricht der Java 8 flatMap () Methode - weil wir die abflachen wollen Liste von Integer zurück von parseLine () in einen Fluss von Integer , so dass nachfolgende Schritte in unserer Verarbeitung nicht brauchen mit der Liste umgehen .

4.2. Verwenden eines Flusses zum Durchführen von Berechnungen

Zu diesem Zeitpunkt haben wir unseren Fluss von analysierten ganzen Zahlen. Jetzt müssen wir eine Logik implementieren, die alle Eingabeelemente in Paare gruppiert und einen Durchschnitt dieser Paare berechnet .

Jetzt erstellen wir einen Fluss von Ganzzahlen und gruppieren sie mit der grouped () -Methode .

Als nächstes wollen wir einen Durchschnitt berechnen.

Da wir nicht an der Reihenfolge interessiert sind, in der diese Mittelwerte verarbeitet werden, können wir Durchschnittswerte mithilfe mehrerer Threads parallel berechnen lassen, indem wir die mapAsyncUnordered () -Methode verwenden und die Anzahl der Threads als Argument an diese Methode übergeben.

Die Aktion, die als Lambda an den Flow übergeben wird, muss eine CompletableFuture zurückgeben, da diese Aktion im separaten Thread asynchron berechnet wird:

private Flow computeAverage() { return Flow.of(Integer.class) .grouped(2) .mapAsyncUnordered(8, integers -> CompletableFuture.supplyAsync(() -> integers.stream() .mapToDouble(v -> v) .average() .orElse(-1.0))); }

Wir berechnen Durchschnittswerte in acht parallelen Threads. Beachten Sie, dass wir die Java 8 Stream-API zur Berechnung eines Durchschnitts verwenden.

4.3. Zusammenstellen mehrerer Flows zu einem einzigen Flow

Die Flow- API ist eine fließende Abstraktion, mit der wir mehrere Flow- Instanzen erstellen können , um unser endgültiges Verarbeitungsziel zu erreichen . Wir können granulare Flows haben, bei denen einer beispielsweise JSON analysiert , ein anderer eine Transformation durchführt und ein anderer Statistiken sammelt.

Diese Granularität hilft uns, besser testbaren Code zu erstellen, da wir jeden Verarbeitungsschritt unabhängig testen können.

Wir haben oben zwei Flows erstellt, die unabhängig voneinander arbeiten können. Jetzt wollen wir sie zusammen komponieren.

Zuerst möchten wir unseren Eingabe- String analysieren und als nächstes einen Durchschnitt für einen Strom von Elementen berechnen.

Wir können unsere Flows mit der Methode via () zusammenstellen :

Flow calculateAverage() { return Flow.of(String.class) .via(parseContent()) .via(computeAverage()); }

Wir haben einen Flow mit dem Eingabetyp String und zwei weiteren Flows danach erstellt. Der parseContent () Flow verwendet eine String- Eingabe und gibt eine Ganzzahl als Ausgabe zurück. Die computeAverage () Fließ nimmt dass Integer und berechnet einen Mittelwert zurückzukehrDoppel als Ausgabetyp.

5. Hinzufügen von Sink zum Flow

Wie bereits erwähnt, wird der gesamte Flow bis zu diesem Punkt noch nicht ausgeführt, da er faul ist. Um die Ausführung des Flusses zu starten, müssen wir eine Senke definieren . Der Sink- Vorgang kann beispielsweise Daten in einer Datenbank speichern oder Ergebnisse an einen externen Webdienst senden.

Angenommen, wir haben eine AverageRepository- Klasse mit der folgenden save () -Methode, die Ergebnisse in unsere Datenbank schreibt:

CompletionStage save(Double average) { return CompletableFuture.supplyAsync(() -> { // write to database return average; }); }

Jetzt möchten wir eine Sink- Operation erstellen , die diese Methode verwendet, um die Ergebnisse unserer Flow- Verarbeitung zu speichern . Um unsere Senke zu erstellen , müssen wir zuerst einen Flow erstellen , der das Ergebnis unserer Verarbeitung als Eingabetyp verwendet . Als nächstes möchten wir alle unsere Ergebnisse in der Datenbank speichern.

Auch hier ist uns die Reihenfolge der Elemente egal, daher können wir die save () - Operationen parallel mit der mapAsyncUnordered () -Methode ausführen .

Um eine Senke aus dem Flow zu erstellen, müssen wir toMat () mit Sink.ignore () als erstem Argument und Keep.right () als zweitem aufrufen, da wir einen Status der Verarbeitung zurückgeben möchten:

private Sink
    
      storeAverages() { return Flow.of(Double.class) .mapAsyncUnordered(4, averageRepository::save) .toMat(Sink.ignore(), Keep.right()); }
    

6. Definieren einer Quelle für den Fluss

Das letzte , was wir tun müssen , um zu einer erstellen Quelle aus dem Eingang String . Wir können mit der Methode via () einen berechneAverage () Flow auf diese Quelle anwenden .

Um den Sink zur Verarbeitung hinzuzufügen , müssen wir die Methode runWith () aufrufen und den soeben erstellten Sink storeAverages () übergeben :

CompletionStage calculateAverageForContent(String content) { return Source.single(content) .via(calculateAverage()) .runWith(storeAverages(), ActorMaterializer.create(actorSystem)) .whenComplete((d, e) -> { if (d != null) { System.out.println("Import finished "); } else { e.printStackTrace(); } }); }

Note that when the processing is finished we are adding the whenComplete() callback, in which we can perform some action depending on the outcome of the processing.

7. Testing Akka Streams

We can test our processing using the akka-stream-testkit.

The best way to test the actual logic of the processing is to test all Flow logic and use TestSink to trigger the computation and assert on the results.

In our test, we are creating the Flow that we want to test, and next, we are creating a Source from the test input content:

@Test public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() { // given Flow tested = new DataImporter(actorSystem).calculateAverage(); String input = "1;9;11;0"; // when Source flow = Source.single(input).via(tested); // then flow .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem)) .request(4) .expectNextUnordered(5d, 5.5); }

We are checking that we are expecting four input arguments, and two results that are averages can arrive in any order because our processing is done in the asynchronous and parallel way.

8. Conclusion

In this article, we were looking at the akka-stream library.

We defined a process that combines multiple Flows to calculate moving average of elements. Then, we defined a Source that is an entry point of the stream processing and a Sink that triggers the actual processing.

Finally, we wrote a test for our processing using the akka-stream-testkit.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.