Der Unterschied zwischen der RxJava-API und der Java 9 Flow-API

1. Einleitung

Die Java Flow API wurde in Java 9 als Implementierung der Reactive Stream Specification eingeführt.

In diesem Tutorial untersuchen wir zunächst reaktive Streams. Anschließend erfahren wir mehr über die Beziehung zu RxJava und Flow API.

2. Was sind reaktive Ströme?

Mit dem Reactive Manifesto wurden Reactive Streams eingeführt, um einen Standard für die asynchrone Stream-Verarbeitung mit nicht blockierendem Gegendruck festzulegen.

Der Umfang der Reactive Stream-Spezifikation besteht darin, einen minimalen Satz von Schnittstellen zu definieren , um diese Ziele zu erreichen:

  • org.reactivestreams.Publisher ist ein Datenanbieter, der Daten für die Abonnenten basierend auf deren Nachfrage veröffentlicht

  • org.reactivestreams.Subscriber ist der Verbraucher von Daten - er kann Daten empfangen, nachdem er einen Herausgeber abonniert hat

  • org.reactivestreams.Subscription wird erstellt, wenn ein Publisher einen Abonnenten akzeptiert

  • org.reactivestreams.Processor ist sowohl Abonnent als auch Herausgeber - er abonniert einen Herausgeber, verarbeitet die Daten und leitet die verarbeiteten Daten an den Abonnenten weiter

Die Flow-API stammt aus der Spezifikation. RxJava geht dem voraus, aber seit 2.0 unterstützt RxJava auch die Spezifikation.

Wir werden uns eingehend mit beiden befassen, aber zuerst sehen wir uns einen praktischen Anwendungsfall an.

3. Anwendungsfall

Für dieses Tutorial verwenden wir einen Live-Stream-Videodienst als Anwendungsfall.

Ein Live-Stream-Video ist im Gegensatz zum On-Demand-Video-Streaming nicht vom Verbraucher abhängig. Daher veröffentlicht der Server den Stream in seinem eigenen Tempo und es liegt in der Verantwortung des Verbrauchers, ihn anzupassen.

In der einfachsten Form besteht unser Modell aus einem Videostream-Publisher und einem Video-Player als Abonnent.

Lassen Sie uns VideoFrame als unser Datenelement implementieren :

public class VideoFrame { private long number; // additional data fields // constructor, getters, setters }

Lassen Sie uns dann unsere Flow API- und RxJava-Implementierungen einzeln durchgehen.

4. Implementierung mit Flow API

Die Flow-APIs in JDK 9 entsprechen der Reactive Streams-Spezifikation. Wenn die Anwendung mit der Flow-API anfänglich N Elemente anfordert, sendet der Herausgeber höchstens N Elemente an den Abonnenten.

Die Flow-API-Schnittstellen befinden sich alle in der Schnittstelle java.util.concurrent.Flow . Sie sind semantisch äquivalent zu ihren jeweiligen Gegenstücken zu Reactive Streams.

Implementieren wir VideoStreamServer als Herausgeber von VideoFrame .

public class VideoStreamServer extends SubmissionPublisher { public VideoStreamServer() { super(Executors.newSingleThreadExecutor(), 5); } }

Wir haben unseren VideoStreamServer von SubmissionPublisher erweitert, anstatt Flow :: Publisher direkt zu implementieren . SubmissionPublisher ist eine JDK-Implementierung von Flow :: Publisher für die asynchrone Kommunikation mit Abonnenten, sodass unser VideoStreamServer in seinem eigenen Tempo gesendet werden kann .

Dies ist auch hilfreich für den Gegendruck und die Pufferbehandlung, da beim Aufruf von SubmissionPublisher :: subscribe eine Instanz von BufferedSubscription erstellt und das neue Abonnement dann zu seiner Abonnementkette hinzugefügt wird. BufferedSubscription kann ausgegebene Elemente bis zu SubmissionPublisher # maxBufferCapacity puffern .

Definieren wir nun VideoPlayer, der einen Stream von VideoFrame verbraucht . Daher muss Flow :: Subscriber implementiert werden .

public class VideoPlayer implements Flow.Subscriber { Flow.Subscription subscription = null; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(VideoFrame item) { log.info("play #{}" , item.getNumber()); subscription.request(1); } @Override public void onError(Throwable throwable) { log.error("There is an error in video streaming:{}" , throwable.getMessage()); } @Override public void onComplete() { log.error("Video has ended"); } }

VideoPlayer abonniert VideoStreamServer. Nach einem erfolgreichen Abonnement wird die VideoPlayer :: onSubscribe- Methode aufgerufen und fordert einen Frame an. VideoPlayer :: onNext empfängt den Frame und fordert einen neuen an. Die Anzahl der angeforderten Frames hängt vom Anwendungsfall und den Abonnentenimplementierungen ab .

Lassen Sie uns zum Schluss die Dinge zusammenfügen:

VideoStreamServer streamServer = new VideoStreamServer(); streamServer.subscribe(new VideoPlayer()); // submit video frames ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); AtomicLong frameNumber = new AtomicLong(); executor.scheduleWithFixedDelay(() -> { streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> { subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber() + " droped because of backpressure")); return true; }); }, 0, 1, TimeUnit.MILLISECONDS); sleep(1000);

5. Implementierung mit RxJava

RxJava ist eine Java-Implementierung von ReactiveX. Das Projekt ReactiveX (oder Reactive Extensions) zielt darauf ab, ein reaktives Programmierkonzept bereitzustellen. Es ist eine Kombination aus dem Observer-Muster, dem Iterator-Muster und der funktionalen Programmierung.

Die neueste Hauptversion für RxJava ist 3.x. RxJava unterstützt Reactive Streams seit Version 2.x mit seiner Flowable- Basisklasse, ist jedoch wichtiger als Reactive Streams mit mehreren Basisklassen wie Flowable , Observable , Single , Completable.

Fließfähig als Komponente zur Einhaltung des reaktiven Stroms ist ein Fluss von 0 bis N Elementen mit Gegendruckbehandlung. Flowable erweitert Publisher von Reactive Streams. Daher akzeptieren viele RxJava-Operatoren Publisher direkt und ermöglichen die direkte Interaktion mit anderen Reactive Streams-Implementierungen.

Lassen Sie uns nun unseren Videostream-Generator erstellen, der ein unendlich fauler Stream ist:

Stream videoStream = Stream.iterate(new VideoFrame(0), videoFrame -> { // sleep for 1ms; return new VideoFrame(videoFrame.getNumber() + 1); });

Dann definieren wir eine Flowable- Instanz, um Frames in einem separaten Thread zu generieren:

Flowable .fromStream(videoStream) .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))

Es ist wichtig zu beachten, dass ein unendlicher Stream für uns ausreicht. Wenn wir jedoch eine flexiblere Methode zum Generieren unseres Streams benötigen, ist Flowable.create eine gute Wahl.

Flowable .create(new FlowableOnSubscribe() { AtomicLong frame = new AtomicLong(); @Override public void subscribe(@NonNull FlowableEmitter emitter) { while (true) { emitter.onNext(new VideoFrame(frame.incrementAndGet())); //sleep for 1 ms to simualte delay } } }, /* Set Backpressure Strategy Here */)

Im nächsten Schritt abonniert VideoPlayer dieses Flowable und beobachtet Elemente in einem separaten Thread.

videoFlowable .observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) .subscribe(item -> { log.info("play #" + item.getNumber()); // sleep for 30 ms to simualate frame display });

Und schließlich konfigurieren wir die Strategie für den Gegendruck. Wenn wir das Video im Falle eines Bildverlusts stoppen möchten, müssen wir BackpressureOverflowStrategy :: ERROR verwenden, wenn der Puffer voll ist.

Flowable .fromStream(videoStream) .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor())) .onBackpressureBuffer(5, null, BackpressureOverflowStrategy.ERROR) .observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) .subscribe(item -> { log.info("play #" + item.getNumber()); // sleep for 30 ms to simualate frame display });

6. Vergleich von RxJava und Flow API

Selbst in diesen beiden einfachen Implementierungen können wir sehen, wie umfangreich die API von RxJava ist, insbesondere für die Pufferverwaltung, die Fehlerbehandlung und die Gegendruckstrategie. Mit seiner fließenden API erhalten wir mehr Optionen und weniger Codezeilen. Betrachten wir nun kompliziertere Fälle.

Angenommen, unser Player kann keine Videobilder ohne Codec anzeigen. Daher müssen wir mit der Flow-API einen Prozessor implementieren , um den Codec zu simulieren und zwischen Server und Player zu sitzen. Mit RxJava können wir dies mit Flowable :: flatMap oder Flowable :: map tun .

Oder stellen wir uns vor, unser Player überträgt auch Live-Übersetzungs-Audio, sodass wir Video- und Audio-Streams von verschiedenen Herausgebern kombinieren müssen. Mit RxJava können wir Flowable :: combinLatest verwenden, aber mit der Flow-API ist dies keine einfache Aufgabe.

Es ist jedoch möglich, einen benutzerdefinierten Prozessor zu schreiben , der beide Streams abonniert und die kombinierten Daten an unseren VideoPlayer sendet. Die Implementierung bereitet jedoch Kopfschmerzen.

7. Warum Flow API?

An dieser Stelle haben wir möglicherweise eine Frage: Welche Philosophie steckt hinter der Flow-API?

Wenn wir im JDK nach Flow-API-Verwendungen suchen, finden wir etwas in java.net.http und jdk.internal.net.http.

Furthermore, we can find adapters in the reactor project or reactive stream package. For example, org.reactivestreams.FlowAdapters has methods for converting Flow API interfaces to Reactive Stream ones and vice-versa. Therefore it helps the interoperability between Flow API and libraries with reactive stream support.

All of these facts help us to understand the purpose of Flow API: It was created to be a group of reactive specification interfaces in JDK without relay on third parties. Moreover, Java expects Flow API to be accepted as standard interfaces for reactive specification and to be used in JDK or other Java-based libraries that implement the reactive specification for middlewares and utilities.

8. Conclusions

In diesem Tutorial finden Sie eine Einführung in Reactive Stream Specification, Flow API und RxJava.

Darüber hinaus haben wir ein praktisches Beispiel für Flow API- und RxJava-Implementierungen für einen Live-Videostream gesehen.

Alle Aspekte von Flow API und RxJava wie Flow :: Processor , Flowable :: map und Flowable :: flatMap oder Gegendruckstrategien werden hier jedoch nicht behandelt.

Wie immer finden Sie den vollständigen Code des Tutorials auf GitHub.