Filtern von Observablen in RxJava

1. Einleitung

Nach der Einführung in RxJava werden wir uns die Filteroperatoren ansehen.

Insbesondere konzentrieren wir uns auf das Filtern, Überspringen, Zeitfiltern und einige erweiterte Filtervorgänge.

2. Filtern

Bei der Arbeit mit Observable ist es manchmal nützlich, nur eine Teilmenge der ausgegebenen Elemente auszuwählen. Zu diesem Zweck bietet RxJava verschiedene Filterfunktionen .

Schauen wir uns die Filtermethode an .

2.1. Der Filter Operator

Einfach gesagt, die Filter filtert Bediener eine beobachtbare Sicherzustellen , dass die emittierten Produkte angegebene Bedingung entsprechen , die in der Form eines kommt Prädikats .

Mal sehen, wie wir nur die ungeraden Werte aus den ausgegebenen filtern können:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .filter(i -> i % 2 != 0); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 3, 5, 7, 9);

2.2. Der Take- Operator

Beim Filtern mit take führt die Logik zur Emission der ersten n Elemente, während die verbleibenden Elemente ignoriert werden.

Mal sehen , wie wir das Filter Quelle beobachtbare und emittieren nur die ersten zwei Elemente:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.take(3); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

2.3. Der takeWhile- Operator

Bei Verwendung von takeWhile gibt das gefilterte Observable so lange Elemente aus, bis es auf ein erstes Element stößt, das nicht mit dem Prädikat übereinstimmt .

Mal sehen, wie wir takeWhile verwenden können - mit einem Filterprädikat :

Observable sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .takeWhile(i -> i < 4); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

2.4. Der takeFirst- Operator

Wann immer wir nur das erste Element ausgeben möchten, das einer bestimmten Bedingung entspricht, können wir takeFirst () verwenden.

Lassen Sie uns einen kurzen Blick darauf werfen, wie wir das erste Element ausgeben können, das größer als 5 ist:

Observable sourceObservable = Observable .just(1, 2, 3, 4, 5, 7, 6); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .takeFirst(x -> x > 5); filteredObservable.subscribe(subscriber); subscriber.assertValue(7);

2.5. first und firstOrDefault Operatoren

Ein ähnliches Verhalten kann mit der ersten API erreicht werden:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.first(); filteredObservable.subscribe(subscriber); subscriber.assertValue(1);

Wenn wir jedoch einen Standardwert angeben möchten und keine Elemente ausgegeben werden, können wir f irstOrDefault verwenden :

Observable sourceObservable = Observable.empty(); Observable filteredObservable = sourceObservable.firstOrDefault(-1); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.6. Der takeLast- Operator

Wenn wir als Nächstes nur die letzten n Elemente ausgeben möchten, die von einem Observable ausgegeben werden , können wir takeLast verwenden .

Mal sehen, wie es möglich ist, nur die letzten drei Elemente auszugeben:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.takeLast(3); filteredObservable.subscribe(subscriber); subscriber.assertValues(8, 9, 10);

Wir müssen bedenken , dass dies die Emission von jeder Position aus der Quelle verzögert beobachtbare , bis er abgeschlossen ist .

2.7. last und lastOrDefault

Wenn wir nur das letzte Element ausgeben möchten, außer mit takeLast (1) , können wir last verwenden .

Dieses filtert die Observable , nur das letzte Element zu emittieren, die eine Filterung gegebenenfalls überprüft Predicate :

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .last(i -> i % 2 != 0); filteredObservable.subscribe(subscriber); subscriber.assertValue(9);

Wenn das Observable leer ist, können wir lastOrDefault verwenden , das das Observable filtert und den Standardwert ausgibt .

Der Standardwert wird auch ausgegeben , wenn der Operator lastOrDefault verwendet wird und keine Elemente vorhanden sind, die die Filterbedingung überprüfen:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.lastOrDefault(-1, i -> i > 10); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.8. elementAt- und elementAtOrDefault- Operatoren

Mit dem elementAt- Operator können wir ein einzelnes Element auswählen, das von der Quelle Observable ausgegeben wird , und seinen Index angeben :

Observable sourceObservable = Observable .just(1, 2, 3, 5, 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.elementAt(4); filteredObservable.subscribe(subscriber); subscriber.assertValue(7);

Allerdings elementAt wird einen Wurf IndexOutOfBoundException wenn der angegebene Index überschreitet die Anzahl der Elemente emittiert.

Um diese Situation zu vermeiden, können Sie elementAtOrDefault verwenden, das einen Standardwert zurückgibt, falls der Index außerhalb des Bereichs liegt:

Observable sourceObservable = Observable .just(1, 2, 3, 5, 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.elementAtOrDefault(7, -1); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.9. Der ofType- Operator

Wann immer das Observable Objektelemente ausgibt , ist es möglich, sie nach ihrem Typ zu filtern.

Mal sehen, wie wir nur die ausgegebenen Elemente vom Typ String filtern können:

Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.ofType(String.class); filteredObservable.subscribe(subscriber); subscriber.assertValues("two", "five");

3. Überspringen

On the other hand, when we want to filter out or skip some of the items emitted by an Observable, RxJava offers a few operators as a counterpart of the filtering ones, that we've previously discussed.

Let's start looking at the skip operator, the counterpart of take.

3.1. The skip Operator

When an Observable emits a sequence of items, it's possible to filter out or skip some of the firsts emitted items using skip.

For example. let's see how it's possible to skip the first four elements:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.skip(4); filteredObservable.subscribe(subscriber); subscriber.assertValues(5, 6, 7, 8, 9, 10);

3.2. The skipWhile Operator

Whenever we want to filter out all the first values emitted by an Observable that fail a filtering predicate, we can use the skipWhile operator:

Observable sourceObservable = Observable .just(1, 2, 3, 4, 5, 4, 3, 2, 1); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .skipWhile(i -> i < 4); filteredObservable.subscribe(subscriber); subscriber.assertValues(4, 5, 4, 3, 2, 1);

3.3. The skipLast Operator

The skipLast operator allows us to skip the final items emitted by the Observable accepting only those emitted before them.

With this, we can, for example, skip the last five items:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.skipLast(5); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3, 4, 5);

3.4. distinct and distinctUntilChanged Operators

The distinct operator returns an Observable that emits all the items emitted by the sourceObservable that are distinct:

Observable sourceObservable = Observable .just(1, 1, 2, 2, 1, 3, 3, 1); TestSubscriber subscriber = new TestSubscriber(); Observable distinctObservable = sourceObservable.distinct(); distinctObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

However, if we want to obtain an Observable that emits all the items emitted by the sourceObservable that are distinct from their immediate predecessor, we can use the distinctUntilChanged operator:

Observable sourceObservable = Observable .just(1, 1, 2, 2, 1, 3, 3, 1); TestSubscriber subscriber = new TestSubscriber(); Observable distinctObservable = sourceObservable.distinctUntilChanged(); distinctObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 1, 3, 1);

3.5. The ignoreElements Operator

Whenever we want to ignore all the elements emitted by the sourceObservable, we can simply use the ignoreElements:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable ignoredObservable = sourceObservable.ignoreElements(); ignoredObservable.subscribe(subscriber); subscriber.assertNoValues();

4. Time Filtering Operators

When working with observable sequence, the time axis is unknown but sometimes getting timely data from a sequence could be useful.

With this purpose, RxJava offers a few methods that allow us to work with Observable using also the time axis.

Before moving on to the first one, let's define a timed Observable that will emit an item every second:

TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = Observable .just(1, 2, 3, 4, 5, 6) .zipWith(Observable.interval( 0, 1, TimeUnit.SECONDS, testScheduler), (item, time) -> item);

The TestScheduler is a special scheduler that allows advancing the clock manually at whatever pace we prefer.

4.1. sample and throttleLast Operators

The sample operator filters the timedObservable, returning an Observable that emits the most recent items emitted by this API within period time intervals.

Let's see how we can sample the timedObservable, filtering only the last emitted item every 2.5 seconds:

TestSubscriber subscriber = new TestSubscriber(); Observable sampledObservable = timedObservable .sample(2500L, TimeUnit.MILLISECONDS, testScheduler); sampledObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(3, 5, 6);

This kind of behavior can be achieved also using the throttleLast operator.

4.2. The throttleFirst Operator

The throttleFirst operator differs from throttleLast/sample since it emits the first item emitted by the timedObservable in each sampling period instead of the most recently emitted one.

Let's see how we can emit the first items, using a sampling period of 4 seconds:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .throttleFirst(4100L, TimeUnit.SECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(1, 6);

4.3. debounce and throttleWithTimeout Operators

With the debounce operator, it's possible to emit only an item if a particular timespan has passed without emitting another item.

Therefore, if we select a timespan that is greater than the time interval between the emitted items of the timedObservable, it will only emit the last one. On the other hand, if it's smaller, it will emit all the items emitted by the timedObservable.

Let's see what happens in the first scenario:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .debounce(2000L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValue(6);

This kind of behavior can also be achieved using throttleWithTimeout.

4.4. The timeout Operator

The timeout operator mirrors the source Observable, but issue a notification error, aborting the emission of items, if the source Observable fails to emit any items during a specified time interval.

Let's see what happens if we specify a timeout of 500 milliseconds to our timedObservable:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .timeout(500L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertError(TimeoutException.class); subscriber.assertValues(1);

5. Multiple Observable Filtering

When working with Observable, it's definitely possible to decide if filtering or skipping items based on a second Observable.

Before moving on, let's define a delayedObservable, that will emit only 1 item after 3 seconds:

Observable delayedObservable = Observable.just(1) .delay(3, TimeUnit.SECONDS, testScheduler);

Let's start with takeUntil operator.

5.1. The takeUntil Operator

The takeUntil operator discards any item emitted by the source Observable (timedObservable) after a second Observable (delayedObservable) emits an item or terminates:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .skipUntil(delayedObservable); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(4, 5, 6);

5.2. The skipUntil Operator

On the other hand, skipUntil discards any item emitted by the source Observable (timedObservable) until a second Observable (delayedObservable) emits an item:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .takeUntil(delayedObservable); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(1, 2, 3);

6. Conclusion

In this extensive tutorial, we explored the different filtering operators available within RxJava, providing a simple example of each one.

As always, all the code examples in this article can be found over on GitHub.