Einführung in RxKotlin

1. Übersicht

In diesem Tutorial werden wir die Verwendung von Reactive Extensions (Rx) in idiomatischem Kotlin mithilfe der RxKotlin-Bibliothek überprüfen.

RxKotlin ist per se keine Implementierung von Reactive Extensions. Stattdessen handelt es sich hauptsächlich um eine Sammlung von Erweiterungsmethoden. Das heißt, RxKotlin erweitert die RxJava- Bibliothek um eine API, die speziell für Kotlin entwickelt wurde.

Daher verwenden wir Konzepte aus unserem Artikel Einführung in RxJava sowie das Konzept von Flowables, das wir in einem speziellen Artikel vorgestellt haben.

2. RxKotlin-Setup

Um RxKotlin in unserem Maven-Projekt zu verwenden, müssen wir die rxkotlin- Abhängigkeit zu unserer pom.xml hinzufügen :

 io.reactivex.rxjava2 rxkotlin 2.3.0 

Oder für ein Gradle-Projekt zu unserem build.gradle:

implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'

Hier verwenden wir RxKotlin 2.x, das auf RxJava 2 abzielt. Projekte, die RxJava 1 verwenden, sollten RxKotlin 1.x verwenden. Für beide Versionen gelten die gleichen Konzepte.

Beachten Sie, dass RxKotlin von RxJava abhängt, die Abhängigkeit jedoch nicht häufig auf die neueste Version aktualisiert wird. Wir empfehlen daher, die spezifische RxJava-Version, von der wir abhängen, explizit anzugeben, wie in unserem RxJava-Artikel beschrieben.

3. Erstellen von Observable s in RxKotlin

RxKotlin enthält eine Reihe von Erweiterungsmethoden zum Erstellen von Observable- und Flowable- Objekten aus Sammlungen.

Insbesondere hat jeder Array-Typ eine toObservable () -Methode und eine toFlowable () -Methode:

val observable = listOf(1, 1, 2, 3).toObservable() observable.test().assertValues(1, 1, 2, 3)
val flowable = listOf(1, 1, 2, 3).toFlowable() flowable.buffer(2).test().assertValues(listOf(1, 1), listOf(2, 3))

3.1. Abschluss s

RxKotlin bietet auch einige Methoden zum Erstellen von Completable- Instanzen. Insbesondere können wir die Funktionen von Action s, Callable s, Future s und Zero-Arity mit der Erweiterungsmethode toCompletable in Completable konvertieren:

var value = 0 val completable = { value = 3 }.toCompletable() assertFalse(completable.test().isCancelled()) assertEquals(3, value)

4. Beobachtbar und fließfähig zu Map und Multimap

Wenn wir einen haben beobachtbare oder fließfähige , die produziert Pair - Instanzen, können wir sie in einen verwandeln Einzel beobachtbar , die eine erzeugt Karte:

val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4)) val observable = list.toObservable() val map = observable.toMap() assertEquals(mapOf(Pair("a", 4), Pair("b", 2), Pair("c", 3)), map.blockingGet())

Wie wir im vorherigen Beispiel sehen können, überschreibt toMap früher ausgegebene Werte mit späteren Werten, wenn sie denselben Schlüssel haben.

Wenn wir alle mit einem Schlüssel verknüpften Werte in einer Sammlung zusammenfassen möchten , verwenden wir stattdessen toMultimap :

val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4)) val observable = list.toObservable() val map = observable.toMultimap() assertEquals( mapOf(Pair("a", listOf(1, 4)), Pair("b", listOf(2)), Pair("c", listOf(3))), map.blockingGet())

5. Kombinieren von Observable s und Flowable s

Eines der Verkaufsargumente von Rx ist die Möglichkeit, Observable s und Flowable s auf verschiedene Arten zu kombinieren . In der Tat bietet RxJava eine Reihe von sofort einsatzbereiten Operatoren.

Darüber hinaus enthält RxKotlin einige weitere Erweiterungsmethoden zum Kombinieren von Observable s und dergleichen.

5.1. Beobachtbare Emissionen kombinieren

Wenn wir ein Observable haben , das andere Observable s ausgibt, können wir eine der Erweiterungsmethoden in RxKotlin verwenden, um die emittierten Werte miteinander zu kombinieren.

Insbesondere kombiniert mergeAll die Observablen mit flatMap:

val subject = PublishSubject.create
    
     () val observable = subject.mergeAll()
    

Welches wäre das gleiche wie:

val observable = subject.flatMap { it }

Das resultierende Observable gibt alle Werte des Quell- Observable s in einer nicht angegebenen Reihenfolge aus.

In ähnlicher Weise verwendet concatAll concatMap (die Werte werden in derselben Reihenfolge wie die Quellen ausgegeben ), während switchLatest switchMap verwendet (Werte werden von der zuletzt ausgegebenen Observable ausgegeben ).

Wie wir bisher gesehen haben, werden alle oben genannten Methoden auch für Flowable- Quellen mit derselben Semantik bereitgestellt .

5.2. Kombinieren von Completable s , Maybe s und Single s

Wenn wir ein Observable haben , das Instanzen von Completable , Maybe oder Single ausgibt , können wir diese mit der entsprechenden mergeAllXs- Methode kombinieren , wie zum Beispiel mergeAllMaybes :

val subject = PublishSubject.create
    
     () val observable = subject.mergeAllMaybes() subject.onNext(Maybe.just(1)) subject.onNext(Maybe.just(2)) subject.onNext(Maybe.empty()) subject.onNext(Maybe.error(Exception("error"))) subject.onNext(Maybe.just(3)) observable.test().assertValues(1, 2).assertError(Exception::class.java)
    

5.3. Die Kombination Iterable s von beobachtbaren s

Für Sammlungen von Observable- oder Flowable- Instanzen verfügt RxKotlin stattdessen über einige andere Operatoren, merge und mergeDelayError . Beide haben den Effekt , dass alle Observable s oder Flowable s zu einem kombiniert werden, der alle Werte nacheinander ausgibt :

val observables = mutableListOf(Observable.just("first", "second")) val observable = observables.merge() observables.add(Observable.just("third", "fourth")) observable.test().assertValues("first", "second", "third", "fourth")

The difference between the two operators — which are directly derived from the same-named operators in RxJava — is their treatment of errors.

The merge method emits errors as soon as they're emitted by the source:

// ... observables.add(Observable.error(Exception("e"))) observables.add(Observable.just("fifth")) // ... observable.test().assertValues("first", "second", "third", "fourth")

Whereas mergeDelayError emits them at the end of the stream:

// ... observables.add(Observable.error(Exception("e"))) observables.add(Observable.just("fifth")) // ... observable.test().assertValues("first", "second", "third", "fourth", "fifth")

6. Handling Values of Different Types

Let's now look at the extension methods in RxKotlin for dealing with values of different types.

These are variants of RxJava methods, that make use of Kotlin's reified generics. In particular, we can:

  • cast emitted values from one type to another, or
  • filter out values that are not of a certain type

So, we could, for example, cast an Observable of Numbers to one of Ints:

val observable = Observable.just(1, 1, 2, 3) observable.cast().test().assertValues(1, 1, 2, 3)

Here, the cast is unnecessary. However, when combining different observables together, we might need it.

With ofType, instead, we can filter out values that aren't of the type we expect:

val observable = Observable.just(1, "and", 2, "and") observable.ofType().test().assertValues(1, 2)

As always, cast and ofType are applicable to both Observables and Flowables.

Furthermore, Maybe supports these methods as well. The Single class, instead, only supports cast.

7. Other Helper Methods

Finally, RxKotlin includes several helper methods. Let's have a quick look.

We can use subscribeBy instead of subscribe – it allows named parameters:

Observable.just(1).subscribeBy(onNext = { println(it) })

Similarly, for blocking subscriptions we can use blockingSubscribeBy.

Additionally, RxKotlin includes some methods that mimic those in RxJava but work around a limitation of Kotlin's type inference.

For example, when using Observable#zip, specifying the zipper doesn't look so great:

Observable.zip(Observable.just(1), Observable.just(2), BiFunction { a, b -> a + b })

So, RxKotlin adds Observables#zip for more idiomatic usage:

Observables.zip(Observable.just(1), Observable.just(2)) { a, b -> a + b }

Notice the final “s” in Observables. Similarly, we have Flowables, Singles, and Maybes.

8. Conclusions

In diesem Artikel haben wir die RxKotlin-Bibliothek gründlich überprüft, die RxJava erweitert, damit die API eher wie idiomatisches Kotlin aussieht.

Weitere Informationen finden Sie auf der RxKotlin GitHub-Seite. Für weitere Beispiele empfehlen wir RxKotlin-Tests.

Die Implementierung all dieser Beispiele und Codefragmente befindet sich im GitHub-Projekt als Maven- und Gradle-Projekt, sodass es einfach zu importieren und auszuführen sein sollte.