Einführung in RxJava

1. Übersicht

In diesem Artikel konzentrieren wir uns auf die Verwendung von Reactive Extensions (Rx) in Java zum Erstellen und Konsumieren von Datensequenzen.

Auf den ersten Blick ähnelt die API möglicherweise Java 8 Streams, ist jedoch viel flexibler und fließender, was sie zu einem leistungsstarken Programmierparadigma macht.

Wenn Sie mehr über RxJava erfahren möchten, lesen Sie diese Beschreibung.

2. Setup

Um RxJava in unserem Maven-Projekt zu verwenden, müssen wir unserer pom.xml die folgende Abhängigkeit hinzufügen :

 io.reactivex rxjava ${rx.java.version} 

Oder für ein Gradle-Projekt:

compile 'io.reactivex.rxjava:rxjava:x.y.z'

3. Funktionale reaktive Konzepte

Auf der einen Seite ist funktionale Programmierung der Prozess des Erstellens von Software durch Zusammenstellen reiner Funktionen, Vermeiden gemeinsamer Zustände, veränderbarer Daten und Nebenwirkungen.

Auf der anderen Seite, reaktive Programmierung ist ein asynchrones Programmierparadigma mit Datenströmen betreffen , und die Ausbreitung der Veränderung.

Zusammen bildet die funktionale reaktive Programmierung eine Kombination aus funktionalen und reaktiven Techniken, die einen eleganten Ansatz für die ereignisgesteuerte Programmierung darstellen können - mit Werten, die sich im Laufe der Zeit ändern und bei denen der Verbraucher auf die eingehenden Daten reagiert.

Diese Technologie vereint verschiedene Implementierungen ihrer Kernprinzipien. Einige Autoren haben ein Dokument erstellt, das das gemeinsame Vokabular zur Beschreibung des neuen Anwendungstyps definiert.

3.1. Reaktives Manifest

Das Reactive Manifesto ist ein Online-Dokument, das einen hohen Standard für Anwendungen in der Softwareentwicklungsbranche festlegt. Einfach ausgedrückt sind reaktive Systeme:

  • Responsive - Systeme sollten zeitnah reagieren
  • Nachrichtengesteuert - Systeme sollten die asynchrone Nachrichtenübermittlung zwischen Komponenten verwenden, um eine lose Kopplung sicherzustellen
  • Elastische Systeme sollten unter hoher Last reaktionsfähig bleiben
  • Ausfallsicher - Systeme sollten reagieren, wenn einige Komponenten ausfallen

4. Observables

Bei der Arbeit mit Rx sind zwei Schlüsseltypen zu verstehen :

  • Observable stellt jedes Objekt dar, das Daten von einer Datenquelle erhalten kann und dessen Status von Interesse sein kann, so dass andere Objekte ein Interesse registrieren können
  • Ein Beobachter ist jedes Objekt, das benachrichtigt werden möchte, wenn sich der Zustand eines anderen Objekts ändert

Ein Beobachter abonniert eine Observable Sequenz. Die Sequenz sendet nacheinander Elemente an den Beobachter .

Der Beobachter behandelt jeden einzelnen, bevor er den nächsten verarbeitet. Wenn viele Ereignisse asynchron eingehen, müssen sie in einer Warteschlange gespeichert oder gelöscht werden.

In Rx wird ein Beobachter niemals mit einem Artikel außerhalb der Reihenfolge angerufen oder angerufen, bevor der Rückruf für den vorherigen Artikel zurückgegeben wurde.

4.1. Arten von beobachtbaren

Es gibt zwei Arten:

  • Nicht blockierend - Die asynchrone Ausführung wird unterstützt und kann an jedem Punkt im Ereignisstrom abbestellt werden. In diesem Artikel konzentrieren wir uns hauptsächlich auf diese Art von Typ
  • Blockieren - Alle onNext- Beobachteraufrufe sind synchron und es ist nicht möglich, sich mitten in einem Ereignisstrom abzumelden. Mit der Methode toBlocking können wir jederzeit ein Observable in ein Blocking Observable konvertieren:
BlockingObservable blockingObservable = observable.toBlocking();

4.2. Betreiber

Ein Operator ist eine Funktion, die ein O bservable (die Quelle) als erstes Argument verwendet und ein anderes Observable (das Ziel) zurückgibt . Dann wendet es für jedes Element, das das Quell-Observable ausgibt, eine Funktion auf dieses Element an und gibt dann das Ergebnis auf dem Ziel- Observable aus .

Operatoren können miteinander verkettet werden, um komplexe Datenflüsse zu erstellen, die Ereignisse anhand bestimmter Kriterien filtern. Es können mehrere Operatoren auf dasselbe Observable angewendet werden .

Es ist nicht schwierig, in eine Situation zu gelangen, in der ein Observable Gegenstände schneller emittiert, als ein Bediener oder Beobachter sie verbrauchen kann. Weitere Informationen zum Gegendruck finden Sie hier.

4.3. Observable erstellen

Der Basisoperator erstellt lediglich eine Observable , die vor Abschluss eine einzelne generische Instanz ausgibt, die Zeichenfolge "Hallo". Wenn wir Informationen aus einem Observable abrufen möchten , implementieren wir eine Beobachterschnittstelle und rufen dann subscribe für das gewünschte Observable auf:

Observable observable = Observable.just("Hello"); observable.subscribe(s -> result = s); assertTrue(result.equals("Hello"));

4.4. OnNext, OnError und OnCompleted

Es gibt drei Methoden auf der Beobachterschnittstelle , die wir kennenlernen möchten:

  1. OnNext wird bei jeder Veröffentlichung eines neuen Ereignisses im angehängten Observable von unserem Beobachter aufgerufen . Dies ist die Methode, mit der wir für jedes Ereignis eine Aktion ausführen
  2. OnCompleted wird aufgerufen, wenn die mit einem Observable verknüpfte Ereignisfolge abgeschlossen ist, was darauf hinweist, dass wir keine weiteren onNext- Aufrufe für unseren Beobachter erwarten sollten
  3. OnError wird aufgerufen, wenn während des RxJava- Framework-Codes oder unseres Ereignisbehandlungscodes eine nicht behandelte Ausnahme ausgelöst wird

Der Rückgabewert für die Observables- Abonnementmethode ist eine Abonnementschnittstelle :

String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable observable = Observable.from(letters); observable.subscribe( i -> result += i, //OnNext Throwable::printStackTrace, //OnError () -> result += "_Completed" //OnCompleted ); assertTrue(result.equals("abcdefg_Completed"));

5. Beobachtbare Transformationen und bedingte Operatoren

5.1. Karte

Der m ap-Operator transformiert von einem Observable ausgegebene Elemente, indem er auf jedes Element eine Funktion anwendet.

Nehmen wir an, es gibt ein deklariertes Array von Zeichenfolgen, das einige Buchstaben aus dem Alphabet enthält, und wir möchten sie im Großbuchstabenmodus drucken:

Observable.from(letters) .map(String::toUpperCase) .subscribe(letter -> result += letter); assertTrue(result.equals("ABCDEFG"));

Die flatMap kann verwendet werden, um Observables zu reduzieren , wenn verschachtelte Observables vorliegen .

Weitere Details zum Unterschied zwischen map und flatMap finden Sie hier.

Angenommen, wir haben eine Methode, die ein Observable aus einer Liste von Zeichenfolgen zurückgibt . Jetzt drucken wir für jede Zeichenfolge aus einem neuen Observable die Liste der Titel basierend auf dem, was der Abonnent sieht:

Observable getTitle() { return Observable.from(titleList); } Observable.just("book1", "book2") .flatMap(s -> getTitle()) .subscribe(l -> result += l); assertTrue(result.equals("titletitle"));

5.2. Scan

The scan operator applies a function to each item emitted by an Observable sequentially and emits each successive value.

It allows us to carry forward state from event to event:

String[] letters = {"a", "b", "c"}; Observable.from(letters) .scan(new StringBuilder(), StringBuilder::append) .subscribe(total -> result += total.toString()); assertTrue(result.equals("aababc"));

5.3. GroupBy

Group by operator allows us to classify the events in the input Observable into output categories.

Let's assume that we created an array of integers from 0 to 10, then apply group by that will divide them into the categories even and odd:

Observable.from(numbers) .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD") .subscribe(group -> group.subscribe((number) -> { if (group.getKey().toString().equals("EVEN")) { EVEN[0] += number; } else { ODD[0] += number; } }) ); assertTrue(EVEN[0].equals("0246810")); assertTrue(ODD[0].equals("13579"));

5.4. Filter

The operator filter emits only those items from an observable that pass a predicate test.

So let's filter in an integer array for the odd numbers:

Observable.from(numbers) .filter(i -> (i % 2 == 1)) .subscribe(i -> result += i); assertTrue(result.equals("13579"));

5.5. Conditional Operators

DefaultIfEmpty emits item from the source Observable, or a default item if the source Observable is empty:

Observable.empty() .defaultIfEmpty("Observable is empty") .subscribe(s -> result += s); assertTrue(result.equals("Observable is empty"));

The following code emits the first letter of the alphabet ‘a' because the array letters is not empty and this is what it contains in the first position:

Observable.from(letters) .defaultIfEmpty("Observable is empty") .first() .subscribe(s -> result += s); assertTrue(result.equals("a"));

TakeWhile operator discards items emitted by an Observable after a specified condition becomes false:

Observable.from(numbers) .takeWhile(i -> i  sum[0] += s); assertTrue(sum[0] == 10);

Of course, there more others operators that could cover our needs like Contain, SkipWhile, SkipUntil, TakeUntil, etc.

6. Connectable Observables

A ConnectableObservable resembles an ordinary Observable, except that it doesn't begin emitting items when it is subscribed to, but only when the connect operator is applied to it.

In this way, we can wait for all intended observers to subscribe to the Observable before the Observable begins emitting items:

String[] result = {""}; ConnectableObservable connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); connectable.subscribe(i -> result[0] += i); assertFalse(result[0].equals("01")); connectable.connect(); Thread.sleep(500); assertTrue(result[0].equals("01"));

7. Single

Single is like an Observable who, instead of emitting a series of values, emits one value or an error notification.

With this source of data, we can only use two methods to subscribe:

  • OnSuccess returns a Single that also calls a method we specify
  • OnError also returns a Single that immediately notifies subscribers of an error
String[] result = {""}; Single single = Observable.just("Hello") .toSingle() .doOnSuccess(i -> result[0] += i) .doOnError(error -> { throw new RuntimeException(error.getMessage()); }); single.subscribe(); assertTrue(result[0].equals("Hello"));

8. Subjects

A Subject is simultaneously two elements, a subscriber and an observable. As a subscriber, a subject can be used to publish the events coming from more than one observable.

And because it's also observable, the events from multiple subscribers can be reemitted as its events to anyone observing it.

In the next example, we'll look at how the observers will be able to see the events that occur after they subscribe:

Integer subscriber1 = 0; Integer subscriber2 = 0; Observer getFirstObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber1 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber1 completed"); } }; } Observer getSecondObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber2 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber2 completed"); } }; } PublishSubject subject = PublishSubject.create(); subject.subscribe(getFirstObserver()); subject.onNext(1); subject.onNext(2); subject.onNext(3); subject.subscribe(getSecondObserver()); subject.onNext(4); subject.onCompleted(); assertTrue(subscriber1 + subscriber2 == 14)

9. Resource Management

Using operation allows us to associate resources, such as a JDBC database connection, a network connection, or open files to our observables.

Hier präsentieren wir in Kommentaren die Schritte, die wir zur Erreichung dieses Ziels unternehmen müssen, sowie ein Beispiel für die Implementierung:

String[] result = {""}; Observable values = Observable.using( () -> "MyResource", r -> { return Observable.create(o -> { for (Character c : r.toCharArray()) { o.onNext(c); } o.onCompleted(); }); }, r -> System.out.println("Disposed: " + r) ); values.subscribe( v -> result[0] += v, e -> result[0] += e ); assertTrue(result[0].equals("MyResource"));

10. Schlussfolgerung

In diesem Artikel haben wir uns mit der Verwendung der RxJava-Bibliothek und den wichtigsten Funktionen befasst.

Den vollständigen Quellcode für das Projekt einschließlich aller hier verwendeten Codebeispiele finden Sie auf Github.