Einführung in den Reaktorkern

1. Einleitung

Reactor Core ist eine Java 8-Bibliothek, die das reaktive Programmiermodell implementiert. Es basiert auf der Reactive Streams-Spezifikation, einem Standard zum Erstellen reaktiver Anwendungen.

Vor dem Hintergrund der nicht reaktiven Java-Entwicklung kann das Reaktivieren eine ziemlich steile Lernkurve sein. Dies wird beim Vergleich mit der Java 8 Stream- API schwieriger , da sie fälschlicherweise als dieselben Abstraktionen auf hoher Ebene angesehen werden können.

In diesem Artikel werden wir versuchen, dieses Paradigma zu entmystifizieren. Wir werden kleine Schritte durch Reactor machen, bis wir ein Bild davon erstellt haben, wie reaktiver Code erstellt wird, und den Grundstein für fortgeschrittenere Artikel legen, die in einer späteren Reihe erscheinen werden.

2. Spezifikation der reaktiven Ströme

Bevor wir uns den Reaktor ansehen, sollten wir uns die Spezifikation für reaktive Ströme ansehen. Dies ist, was Reactor implementiert, und es legt die Grundlage für die Bibliothek.

Reactive Streams ist im Wesentlichen eine Spezifikation für die asynchrone Stream-Verarbeitung.

Mit anderen Worten, ein System, in dem viele Ereignisse asynchron erzeugt und verarbeitet werden. Stellen Sie sich vor, dass Tausende von Aktienaktualisierungen pro Sekunde in eine Finanzanwendung eingehen und dass diese rechtzeitig auf diese Aktualisierungen reagieren müssen.

Eines der Hauptziele ist es, das Problem des Gegendrucks anzugehen. Wenn wir einen Produzenten haben, der Ereignisse schneller an einen Verbraucher sendet, als er sie verarbeiten kann, wird der Verbraucher schließlich mit Ereignissen überfordert sein, denen die Systemressourcen ausgehen.

Gegendruck bedeutet, dass unser Verbraucher dem Hersteller mitteilen kann, wie viele Daten gesendet werden sollen, um dies zu verhindern. Dies ist in der Spezifikation festgelegt.

3. Maven-Abhängigkeiten

Bevor wir beginnen, fügen wir unsere Maven-Abhängigkeiten hinzu:

 io.projectreactor reactor-core 3.3.9.RELEASE   ch.qos.logback logback-classic 1.1.3 

Wir fügen auch Logback als Abhängigkeit hinzu. Dies liegt daran, dass wir die Ausgabe von Reactor protokollieren, um den Datenfluss besser zu verstehen.

4. Erstellen eines Datenstroms

Damit eine Anwendung reaktiv ist, muss sie zunächst einen Datenstrom erzeugen können.

Dies könnte so etwas wie das Beispiel für eine Aktienaktualisierung sein, das wir zuvor gegeben haben. Ohne diese Daten hätten wir nichts zu reagieren, weshalb dies ein logischer erster Schritt ist.

Reactive Core bietet uns zwei Datentypen, mit denen wir dies tun können.

4.1. Fluss

Der erste Weg, dies zu tun, ist mit einem Flux. Es ist ein Stream, der 0..n Elemente ausgeben kann . Versuchen wir, eine einfache zu erstellen:

Flux just = Flux.just(1, 2, 3, 4);

In diesem Fall haben wir einen statischen Strom von vier Elementen.

4.2. Mono

Der zweite Weg, dies zu tun, ist mit einem Mono, das ein Strom von 0..1 Elementen ist. Versuchen wir, eine zu instanziieren:

Mono just = Mono.just(1);

Dies sieht und verhält sich fast genauso wie der Flux , nur dass wir diesmal auf nicht mehr als ein Element beschränkt sind.

4.3. Warum nicht nur Flussmittel?

Bevor Sie weiter experimentieren, sollten Sie hervorheben, warum wir diese beiden Datentypen haben.

Zunächst sollte beachtet werden, dass sowohl ein Flux als auch ein Mono Implementierungen der Reactive Streams Publisher- Schnittstelle sind. Beide Klassen entsprechen der Spezifikation, und wir könnten diese Schnittstelle an ihrer Stelle verwenden:

Publisher just = Mono.just("foo");

Aber wirklich, diese Kardinalität zu kennen, ist nützlich. Dies liegt daran, dass einige Operationen nur für einen der beiden Typen sinnvoll sind und dass sie aussagekräftiger sein können (stellen Sie sich findOne () in einem Repository vor).

5. Abonnieren eines Streams

Jetzt haben wir einen allgemeinen Überblick darüber, wie ein Datenstrom erzeugt wird. Wir müssen ihn abonnieren, damit er die Elemente ausgibt.

5.1. Elemente sammeln

Verwenden Sie die subscribe () -Methode, um alle Elemente in einem Stream zu sammeln:

List elements = new ArrayList(); Flux.just(1, 2, 3, 4) .log() .subscribe(elements::add); assertThat(elements).containsExactly(1, 2, 3, 4);

Die Daten fließen erst, wenn wir sie abonnieren. Beachten Sie, dass wir auch einige Protokollierungen hinzugefügt haben. Dies ist hilfreich, wenn wir uns ansehen, was hinter den Kulissen passiert.

5.2. Der Fluss der Elemente

Wenn Sie sich angemeldet haben, können wir damit visualisieren, wie die Daten durch unseren Stream fließen:

20:25:19.550 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | request(unbounded) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(1) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(2) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(3) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onNext(4) 20:25:19.553 [main] INFO reactor.Flux.Array.1 - | onComplete()

Zunächst läuft alles auf dem Haupt-Thread. Lassen Sie uns darauf nicht näher eingehen, da wir später in diesem Artikel einen weiteren Blick auf die Parallelität werfen werden. Es macht die Dinge jedoch einfach, da wir alles in Ordnung bringen können.

Lassen Sie uns nun die Sequenz durchgehen, die wir einzeln protokolliert haben:

  1. onSubscribe () - Dies wird aufgerufen, wenn wir unseren Stream abonnieren
  2. request(unbounded) – When we call subscribe, behind the scenes we are creating a Subscription. This subscription requests elements from the stream. In this case, it defaults to unbounded, meaning it requests every single element available
  3. onNext() – This is called on every single element
  4. onComplete() – This is called last, after receiving the last element. There's actually a onError() as well, which would be called if there is an exception, but in this case, there isn't

This is the flow laid out in the Subscriber interface as part of the Reactive Streams Specification, and in reality, that's what's been instantiated behind the scenes in our call to onSubscribe(). It's a useful method, but to better understand what's happening let's provide a Subscriber interface directly:

Flux.just(1, 2, 3, 4) .log() .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { elements.add(integer); } @Override public void onError(Throwable t) {} @Override public void onComplete() {} });

We can see that each possible stage in the above flow maps to a method in the Subscriber implementation. It just happens that the Flux has provided us with a helper method to reduce this verbosity.

5.3. Comparison to Java 8 Streams

It still might appear that we have something synonymous to a Java 8 Stream doing collect:

List collected = Stream.of(1, 2, 3, 4) .collect(toList());

Only we don't.

The core difference is that Reactive is a push model, whereas the Java 8 Streams are a pull model. In a reactive approach, events are pushed to the subscribers as they come in.

The next thing to notice is a Streams terminal operator is just that, terminal, pulling all the data and returning a result. With Reactive we could have an infinite stream coming in from an external resource, with multiple subscribers attached and removed on an ad hoc basis. We can also do things like combine streams, throttle streams and apply backpressure, which we will cover next.

6. Backpressure

The next thing we should consider is backpressure. In our example, the subscriber is telling the producer to push every single element at once. This could end up becoming overwhelming for the subscriber, consuming all of its resources.

Backpressure is when a downstream can tell an upstream to send it fewer data in order to prevent it from being overwhelmed.

We can modify our Subscriber implementation to apply backpressure. Let's tell the upstream to only send two elements at a time by using request():

Flux.just(1, 2, 3, 4) .log() .subscribe(new Subscriber() { private Subscription s; int onNextAmount; @Override public void onSubscribe(Subscription s) { this.s = s; s.request(2); } @Override public void onNext(Integer integer) { elements.add(integer); onNextAmount++; if (onNextAmount % 2 == 0) { s.request(2); } } @Override public void onError(Throwable t) {} @Override public void onComplete() {} });

Now if we run our code again, we'll see the request(2) is called, followed by two onNext() calls, then request(2) again.

23:31:15.395 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 23:31:15.397 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.397 [main] INFO reactor.Flux.Array.1 - | onNext(1) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(3) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onNext(4) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | request(2) 23:31:15.398 [main] INFO reactor.Flux.Array.1 - | onComplete()

Essentially, this is reactive pull backpressure. We are requesting the upstream to only push a certain amount of elements, and only when we are ready.

If we imagine we were being streamed tweets from twitter, it would then be up to the upstream to decide what to do. If tweets were coming in but there are no requests from the downstream, then the upstream could drop items, store them in a buffer, or some other strategy.

7. Operating on a Stream

We can also perform operations on the data in our stream, responding to events as we see fit.

7.1. Mapping Data in a Stream

A simple operation that we can perform is applying a transformation. In this case, let's just double all the numbers in our stream:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .subscribe(elements::add);

map() will be applied when onNext() is called.

7.2. Combining Two Streams

We can then make things more interesting by combining another stream with this one. Let's try this by using zip() function:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .zipWith(Flux.range(0, Integer.MAX_VALUE), (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two)) .subscribe(elements::add); assertThat(elements).containsExactly( "First Flux: 2, Second Flux: 0", "First Flux: 4, Second Flux: 1", "First Flux: 6, Second Flux: 2", "First Flux: 8, Second Flux: 3");

Here, we are creating another Flux that keeps incrementing by one and streaming it together with our original one. We can see how these work together by inspecting the logs:

20:04:38.064 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:04:38.065 [main] INFO reactor.Flux.Array.1 - | onNext(1) 20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) 20:04:38.066 [main] INFO reactor.Flux.Range.2 - | onNext(0) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(2) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(1) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(3) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(2) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onNext(4) 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | onNext(3) 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | onComplete() 20:04:38.067 [main] INFO reactor.Flux.Array.1 - | cancel() 20:04:38.067 [main] INFO reactor.Flux.Range.2 - | cancel()

Note how we now have one subscription per Flux. The onNext() calls are also alternated, so the index of each element in the stream will match when we apply the zip() function.

8. Hot Streams

Currently, we've focused primarily on cold streams. These are static, fixed-length streams that are easy to deal with. A more realistic use case for reactive might be something that happens infinitely.

For example, we could have a stream of mouse movements that constantly needs to be reacted to or a twitter feed. These types of streams are called hot streams, as they are always running and can be subscribed to at any point in time, missing the start of the data.

8.1. Creating a ConnectableFlux

One way to create a hot stream is by converting a cold stream into one. Let's create a Flux that lasts forever, outputting the results to the console, which would simulate an infinite stream of data coming from an external resource:

ConnectableFlux publish = Flux.create(fluxSink -> { while(true) { fluxSink.next(System.currentTimeMillis()); } }) .publish();

By calling publish() we are given a ConnectableFlux. This means that calling subscribe() won't cause it to start emitting, allowing us to add multiple subscriptions:

publish.subscribe(System.out::println); publish.subscribe(System.out::println);

If we try running this code, nothing will happen. It's not until we call connect(), that the Flux will start emitting:

publish.connect();

8.2. Throttling

If we run our code, our console will be overwhelmed with logging. This is simulating a situation where too much data is being passed to our consumers. Let's try getting around this with throttling:

ConnectableFlux publish = Flux.create(fluxSink -> { while(true) { fluxSink.next(System.currentTimeMillis()); } }) .sample(ofSeconds(2)) .publish();

Here, we've introduced a sample() method with an interval of two seconds. Now values will only be pushed to our subscriber every two seconds, meaning the console will be a lot less hectic.

Of course, there are multiple strategies to reduce the amount of data sent downstream, such as windowing and buffering, but they will be left out of scope for this article.

9. Concurrency

All of our above examples have currently run on the main thread. However, we can control which thread our code runs on if we want. The Scheduler interface provides an abstraction around asynchronous code, for which many implementations are provided for us. Let's try subscribing to a different thread to main:

Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .subscribeOn(Schedulers.parallel()) .subscribe(elements::add);

The Parallel scheduler will cause our subscription to be run on a different thread, which we can prove by looking at the logs. We see the first entry comes from the main thread and the Flux is running in another thread called parallel-1.

20:03:27.505 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 20:03:27.529 [parallel-1] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | request(unbounded) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(1) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(2) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(3) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext(4) 20:03:27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onComplete()

Concurrency get's more interesting than this, and it will be worth us exploring it in another article.

10. Conclusion

In diesem Artikel haben wir einen umfassenden Überblick über Reactive Core gegeben. Wir haben erklärt, wie wir Streams veröffentlichen und abonnieren, Gegendruck anwenden, Streams bearbeiten und Daten auch asynchron verarbeiten können. Dies sollte hoffentlich eine Grundlage für das Schreiben reaktiver Anwendungen bilden.

Spätere Artikel in dieser Reihe behandeln fortgeschrittenere Parallelität und andere reaktive Konzepte. Es gibt auch einen anderen Artikel über Reactor with Spring.

Der Quellcode für unsere Anwendung ist auf GitHub verfügbar. Dies ist ein Maven-Projekt, das unverändert ausgeführt werden kann.