Java 9 Reactive Streams

1. Übersicht

In diesem Artikel werden wir uns die Java 9 Reactive Streams ansehen. Einfach ausgedrückt, können wir die Flow- Klasse verwenden, die die primären Bausteine ​​für die Erstellung der Logik zur Verarbeitung reaktiver Streams enthält.

Reactive Streams ist ein Standard für die asynchrone Stream-Verarbeitung mit nicht blockierendem Gegendruck. Diese Spezifikation ist im Reactive Manifesto definiert und es gibt verschiedene Implementierungen davon, zum Beispiel RxJava oder Akka-Streams.

2. Übersicht über die reaktive API

Um einen Flow zu erstellen , können wir drei Hauptabstraktionen verwenden und diese zu einer asynchronen Verarbeitungslogik zusammensetzen.

Jeder Flow muss Ereignisse verarbeiten, die von einer Publisher-Instanz für ihn veröffentlicht werden . Der Publisher hat eine Methode - subscribe ().

Wenn einer der Abonnenten von ihm veröffentlichte Ereignisse erhalten möchte, muss er den angegebenen Publisher abonnieren .

Der Empfänger von Nachrichten muss die Teilnehmerschnittstelle implementieren. In der Regel ist dies das Ende jeder Flow- Verarbeitung, da die Instanz keine weiteren Nachrichten sendet.

Wir können uns den Abonnenten als Waschbecken vorstellen. Dies hat vier Methoden, die überschrieben werden müssen - onSubscribe (), onNext (), onError () und onComplete (). Wir werden uns diese im nächsten Abschnitt ansehen.

Wenn wir eingehende Nachrichten transformieren und an den nächsten Abonnenten weiterleiten möchten , müssen wir die Prozessorschnittstelle implementieren. Dies fungiert sowohl als Abonnent, weil er Nachrichten empfängt, als auch als Herausgeber, weil er diese Nachrichten verarbeitet und zur weiteren Verarbeitung sendet.

3. Veröffentlichen und Konsumieren von Nachrichten

Angenommen, wir möchten einen einfachen Flow erstellen , in dem ein Publisher Nachrichten veröffentlicht und ein einfacher Abonnent Nachrichten verbraucht, sobald sie eintreffen - einzeln.

Erstellen wir eine EndSubscriber- Klasse. Wir müssen die Abonnentenschnittstelle implementieren. Als nächstes überschreiben wir die erforderlichen Methoden.

Die onSubscribe () -Methode wird aufgerufen, bevor die Verarbeitung beginnt. Die Instanz des Abonnements wird als Argument übergeben. Diese Klasse wird verwendet, um den Nachrichtenfluss zwischen dem Abonnenten und dem Herausgeber zu steuern :

public class EndSubscriber implements Subscriber { private Subscription subscription; public List consumedElements = new LinkedList(); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } }

Wir haben auch eine leere Liste der konsumierten Elemente initialisiert , die in den Tests verwendet werden.

Jetzt müssen wir die verbleibenden Methoden über die Abonnentenschnittstelle implementieren. Die Hauptmethode hier ist onNext () - dies wird immer dann aufgerufen, wenn der Publisher eine neue Nachricht veröffentlicht:

@Override public void onNext(T item) { System.out.println("Got : " + item); subscription.request(1); }

Beachten Sie, dass wir beim Starten des Abonnements in der onSubscribe () -Methode und beim Verarbeiten einer Nachricht die request () -Methode im Abonnement aufrufen müssen, um zu signalisieren, dass der aktuelle Abonnent bereit ist, weitere Nachrichten zu konsumieren.

Zuletzt müssen wir onError () implementieren, das aufgerufen wird, wenn eine Ausnahme in der Verarbeitung ausgelöst wird, sowie onComplete (), das aufgerufen wird, wenn der Publisher geschlossen wird:

@Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); }

Schreiben wir einen Test für den Verarbeitungsablauf . Wir werden die SubmissionPublisher- Klasse verwenden - ein Konstrukt aus java.util.concurrent - das die Publisher- Schnittstelle implementiert .

Wir werden N Elemente an den Publisher senden - die unser EndSubscriber erhalten wird:

@Test public void whenSubscribeToIt_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until( () -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(items) ); }

Beachten Sie, dass wir die Methode close () für die Instanz des EndSubscriber aufrufen. Es wird bei jedem Abonnenten des angegebenen Herausgebers der unten stehende Rückruf onComplete () aufgerufen .

Wenn Sie dieses Programm ausführen, wird die folgende Ausgabe erzeugt:

Got : 1 Got : x Got : 2 Got : x Got : 3 Got : x Done

4. Transformation von Nachrichten

Angenommen, wir möchten eine ähnliche Logik zwischen einem Publisher und einem Abonnenten erstellen , aber auch eine Transformation anwenden.

Wir werden das schaffen TransformProcessor Klasse, die Geräte - Prozessor und erweitert SubmissionPublisher - wie dies beide sein wird P ublisher und S ubscriber.

Wir übergeben eine Funktion , die Eingaben in Ausgaben umwandelt:

public class TransformProcessor extends SubmissionPublisher implements Flow.Processor { private Function function; private Flow.Subscription subscription; public TransformProcessor(Function function) { super(); this.function = function; } @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { submit(function.apply(item)); subscription.request(1); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { close(); } }

Lassen Sie uns jetzt einen schnellen Test schreiben mit einem Verarbeitungsablauf , bei dem der Verlag veröffentlicht String - Elemente.

Unser TransformProcessor analysiert den String als Ganzzahl - was bedeutet, dass hier eine Konvertierung erfolgen muss:

@Test public void whenSubscribeAndTransformElements_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); TransformProcessor transformProcessor = new TransformProcessor(Integer::parseInt); EndSubscriber subscriber = new EndSubscriber(); List items = List.of("1", "2", "3"); List expectedResult = List.of(1, 2, 3); // when publisher.subscribe(transformProcessor); transformProcessor.subscribe(subscriber); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expectedResult) ); }

Beachten Sie, dass beim Aufrufen der Methode close () auf dem Basis- Publisher die Methode onComplete () auf dem TransformProcessor aufgerufen wird.

Beachten Sie, dass alle Publisher in der Verarbeitungskette auf diese Weise geschlossen werden müssen.

5. Steuern der Nachfrage nach Nachrichten mithilfe des Abonnements

Angenommen, wir möchten nur das erste Element aus dem Abonnement verwenden, eine Logik anwenden und die Verarbeitung beenden. Wir können die request () -Methode verwenden, um dies zu erreichen.

Lassen Sie uns unseren EndSubscriber so ändern, dass er nur N Nachrichten verbraucht. Wir werden diese Nummer als Konstruktorargument howMuchMessagesConsume übergeben :

public class EndSubscriber implements Subscriber { private AtomicInteger howMuchMessagesConsume; private Subscription subscription; public List consumedElements = new LinkedList(); public EndSubscriber(Integer howMuchMessagesConsume) { this.howMuchMessagesConsume = new AtomicInteger(howMuchMessagesConsume); } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { howMuchMessagesConsume.decrementAndGet(); System.out.println("Got : " + item); consumedElements.add(item); if (howMuchMessagesConsume.get() > 0) { subscription.request(1); } } //... }

Wir können Elemente so lange anfordern, wie wir möchten.

Schreiben wir einen Test, in dem wir nur ein Element aus dem angegebenen Abonnement verwenden möchten :

@Test public void whenRequestForOnlyOneElement_thenShouldConsumeOne() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(1); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); List expected = List.of("1"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expected) ); }

Although the publisher is publishing six elements, our EndSubscriber will consume only one element because it signals demand for processing only that single one.

By using the request() method on the Subscription, we can implement a more sophisticated back-pressure mechanism to control the speed of the message consumption.

6. Conclusion

In this article, we had a look at the Java 9 Reactive Streams.

We saw how to create a processing Flow consisting of a Publisher and a Subscriber. We created a more complex processing flow with the transformation of elements using Processors.

Schließlich haben wir das Abonnement verwendet , um die Nachfrage des Abonnenten nach Elementen zu steuern .

Die Implementierung all dieser Beispiele und Codefragmente finden Sie im GitHub-Projekt - dies ist ein Maven-Projekt, daher sollte es einfach zu importieren und auszuführen sein, wie es ist.