Scheduler in RxJava

1. Übersicht

In diesem Artikel konzentrieren wir uns auf verschiedene Arten von Schedulern , die wir zum Schreiben von Multithreading-Programmen verwenden, die auf den Methoden subscribeOn und compareOn von RxJava Observable basieren .

Scheduler bieten die Möglichkeit anzugeben, wo und wann Aufgaben im Zusammenhang mit dem Betrieb einer Observable- Kette ausgeführt werden sollen.

Wir können einen Scheduler aus den Factory-Methoden erhalten, die in der Klasse Scheduler beschrieben sind.

2. Standard-Threading-Verhalten

Standardmäßig ist Rx Single-Threaded, was bedeutet, dass ein Observable und die Kette von Operatoren, die wir auf es anwenden können, seine Beobachter über denselben Thread benachrichtigen, für den seine subscribe () -Methode aufgerufen wird.

Die Methoden watchOn und subscribeOn verwenden als Argument einen Scheduler, der, wie der Name schon sagt, ein Tool ist, mit dem wir einzelne Aktionen planen können.

Wir erstellen unsere Implementierung eines Schedulers mithilfe der Methode create Worker , die einen Scheduler.Worker zurückgibt . Ein Worker akzeptiert Aktionen und führt sie nacheinander in einem einzelnen Thread aus.

In gewisser Weise ein Arbeiter ist ein S cheduler selbst, aber wir werden nicht auf sie als verweisen Scheduler um Verwirrung zu vermeiden.

2.1. Planen einer Aktion

Wir können einen Job in jedem Scheduler planen, indem wir einen neuen Worker erstellen und einige Aktionen planen:

Scheduler scheduler = Schedulers.immediate(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> result += "action"); Assert.assertTrue(result.equals("action"));

Die Aktion wird dann in die Warteschlange des Threads gestellt, dem der Worker zugewiesen ist.

2.2. Aktion abbrechen

Scheduler.Worker erweitert das Abonnement . Wenn Sie die Abmeldemethode für einen Worker aufrufen , wird die Warteschlange geleert und alle ausstehenden Aufgaben werden abgebrochen. Wir können das am Beispiel sehen:

Scheduler scheduler = Schedulers.newThread(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += "First_Action"; worker.unsubscribe(); }); worker.schedule(() -> result += "Second_Action"); Assert.assertTrue(result.equals("First_Action"));

Die zweite Aufgabe wird nie ausgeführt, da die vorherige Aufgabe den gesamten Vorgang abgebrochen hat. Aktionen, die gerade ausgeführt wurden, werden unterbrochen.

3. Schedulers.newThread

Dieser Scheduler einfach startet einen neuen Thread jedes Mal , es über angefordert wird subscribeOn () oder observeOn () .

Es ist kaum eine gute Wahl, nicht nur wegen der Latenz beim Starten eines Threads, sondern auch, weil dieser Thread nicht wiederverwendet wird:

Observable.just("Hello") .observeOn(Schedulers.newThread()) .doOnNext(s -> result2 += Thread.currentThread().getName() ) .observeOn(Schedulers.newThread()) .subscribe(s -> result1 += Thread.currentThread().getName() ); Thread.sleep(500); Assert.assertTrue(result1.equals("RxNewThreadScheduler-1")); Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));

Wenn der Worker fertig ist, wird der Thread einfach beendet. Dieser Scheduler kann nur verwendet werden, wenn Aufgaben grobkörnig sind: Die Ausführung nimmt viel Zeit in Anspruch, es gibt jedoch nur sehr wenige, sodass Threads wahrscheinlich überhaupt nicht wiederverwendet werden.

Scheduler scheduler = Schedulers.newThread(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "_Start"; worker.schedule(() -> result += "_worker_"); result += "_End"; }); Thread.sleep(3000); Assert.assertTrue(result.equals( "RxNewThreadScheduler-1_Start_End_worker_"));

Als wir den Worker auf einem NewThreadScheduler planten, stellten wir fest, dass der Worker an einen bestimmten Thread gebunden war.

4. Schedulers.immediate

Schedulers.immediate ist ein spezieller Scheduler, der eine Aufgabe innerhalb des Client-Threads nicht asynchron, sondern blockierend aufruft und nach Abschluss der Aktion zurückgibt:

Scheduler scheduler = Schedulers.immediate(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "_Start"; worker.schedule(() -> result += "_worker_"); result += "_End"; }); Thread.sleep(500); Assert.assertTrue(result.equals( "main_Start_worker__End"));

Tatsächlich hat das Abonnieren eines Observable über den unmittelbaren Scheduler in der Regel den gleichen Effekt wie das Nicht-Abonnieren eines bestimmten S- Chedulers:

Observable.just("Hello") .subscribeOn(Schedulers.immediate()) .subscribe(s -> result += Thread.currentThread().getName() ); Thread.sleep(500); Assert.assertTrue(result.equals("main"));

5. Schedulers.trampoline

Der Trampolin- Scheduler ist dem Sofort- Scheduler sehr ähnlich, da er auch Aufgaben im selben Thread plant und so effektiv blockiert.

Die bevorstehende Aufgabe wird jedoch ausgeführt, wenn alle zuvor geplanten Aufgaben abgeschlossen sind:

Observable.just(2, 4, 6, 8) .subscribeOn(Schedulers.trampoline()) .subscribe(i -> result += "" + i); Observable.just(1, 3, 5, 7, 9) .subscribeOn(Schedulers.trampoline()) .subscribe(i -> result += "" + i); Thread.sleep(500); Assert.assertTrue(result.equals("246813579"));

Sofort ruft eine bestimmte Aufgabe sofort auf, während das Trampolin darauf wartet, dass die aktuelle Aufgabe beendet ist.

Die Trampolin ‚s Arbeiter führt jede Aufgabe auf den Thread, der die erste Aufgabe geplant. Der erste Aufruf zum Planen wird blockiert, bis die Warteschlange geleert ist:

Scheduler scheduler = Schedulers.trampoline(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "Start"; worker.schedule(() -> { result += "_middleStart"; worker.schedule(() -> result += "_worker_" ); result += "_middleEnd"; }); result += "_mainEnd"; }); Thread.sleep(500); Assert.assertTrue(result .equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Schedulers.from

Scheduler sind intern komplexer als Executors von java.util.concurrent - daher war eine separate Abstraktion erforderlich.

Da sie sich konzeptionell ziemlich ähnlich sind, gibt es nicht überraschend einen Wrapper, der Executor mithilfe der from- factory-Methode in Scheduler verwandeln kann :

private ThreadFactory threadFactory(String pattern) { return new ThreadFactoryBuilder() .setNameFormat(pattern) .build(); } @Test public void givenExecutors_whenSchedulerFrom_thenReturnElements() throws InterruptedException { ExecutorService poolA = newFixedThreadPool( 10, threadFactory("Sched-A-%d")); Scheduler schedulerA = Schedulers.from(poolA); ExecutorService poolB = newFixedThreadPool( 10, threadFactory("Sched-B-%d")); Scheduler schedulerB = Schedulers.from(poolB); Observable observable = Observable.create(subscriber -> { subscriber.onNext("Alfa"); subscriber.onNext("Beta"); subscriber.onCompleted(); });; observable .subscribeOn(schedulerA) .subscribeOn(schedulerB) .subscribe( x -> result += Thread.currentThread().getName() + x + "_", Throwable::printStackTrace, () -> result += "_Completed" ); Thread.sleep(2000); Assert.assertTrue(result.equals( "Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

SchedulerB wird für einen kurzen Zeitraum verwendet, plant jedoch kaum eine neue Aktion auf SchedulerA , die die gesamte Arbeit erledigt. Daher werden mehrere subscribeOn-Methoden nicht nur ignoriert, sondern verursachen auch einen geringen Overhead.

7. Schedulers.io

This Scheduler is similar to the newThread except for the fact that already started threads are recycled and can possibly handle future requests.

This implementation works similarly to ThreadPoolExecutor from java.util.concurrent with an unbounded pool of threads. Every time a new worker is requested, either a new thread is started (and later kept idle for some time) or the idle one is reused:

Observable.just("io") .subscribeOn(Schedulers.io()) .subscribe(i -> result += Thread.currentThread().getName()); Assert.assertTrue(result.equals("RxIoScheduler-2"));

We need to be careful with unbounded resources of any kind – in case of slow or unresponsive external dependencies like web services, ioscheduler might start an enormous number of threads, leading to our very own application becoming unresponsive.

In practice, following Schedulers.io is almost always a better choice.

8. Schedulers.computation

Computation Scheduler by default limits the number of threads running in parallel to the value of availableProcessors(), as found in the Runtime.getRuntime() utility class.

So we should use a computation scheduler when tasks are entirely CPU-bound; that is, they require computational power and have no blocking code.

It uses an unbounded queue in front of every thread, so if the task is scheduled, but all cores are occupied, it will be queued. However, the queue just before each thread will keep growing:

Observable.just("computation") .subscribeOn(Schedulers.computation()) .subscribe(i -> result += Thread.currentThread().getName()); Assert.assertTrue(result.equals("RxComputationScheduler-1"));

If for some reason, we need a different number of threads than the default, we can always use the rx.scheduler.max-computation-threads system property.

By taking fewer threads we can ensure that there is always one or more CPU cores idle, and even under heavy load, computation thread pool does not saturate the server. It's simply not possible to have more computation threads than cores.

9. Schedulers.test

This Scheduler is used only for testing purposes, and we'll never see it in production code. Its main advantage is the ability to advance the clock, simulating time passing by arbitrarily:

List letters = Arrays.asList("A", "B", "C"); TestScheduler scheduler = Schedulers.test(); TestSubscriber subscriber = new TestSubscriber(); Observable tick = Observable .interval(1, TimeUnit.SECONDS, scheduler); Observable.from(letters) .zipWith(tick, (string, index) -> index + "-" + string) .subscribeOn(scheduler) .subscribe(subscriber); subscriber.assertNoValues(); subscriber.assertNotCompleted(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); subscriber.assertNoErrors(); subscriber.assertValueCount(1); subscriber.assertValues("0-A"); scheduler.advanceTimeTo(3, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors(); subscriber.assertValueCount(3); assertThat( subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C"));

10. Default Schedulers

Some Observable operators in RxJava have alternate forms that allow us to set which Scheduler the operator will use for its operation. Others don't operate on any particular Scheduler or operate on a particular default Scheduler.

For example, the delay operator takes upstream events and pushes them downstream after a given time. Obviously, it cannot hold the original thread during that period, so it must use a different Scheduler:

ExecutorService poolA = newFixedThreadPool( 10, threadFactory("Sched1-")); Scheduler schedulerA = Schedulers.from(poolA); Observable.just('A', 'B') .delay(1, TimeUnit.SECONDS, schedulerA) .subscribe(i -> result+= Thread.currentThread().getName() + i + " "); Thread.sleep(2000); Assert.assertTrue(result.equals("Sched1-A Sched1-B "));

Without supplying a custom schedulerA, all operators below delay would use the computation Scheduler.

Other important operators that support custom Schedulers are buffer, interval, range, timer, skip, take, timeout, and several others. If we don't provide a Scheduler to such operators, computation scheduler is utilized, which is a safe default in most cases.

11. Conclusion

In truly reactive applications, for which all long-running operations are asynchronous, very few threads and thus Schedulers are needed.

Das Beherrschen von Schedulern ist wichtig, um skalierbaren und sicheren Code mit RxJava zu schreiben. Der Unterschied zwischen SubscribeOn und ObservOn ist besonders wichtig bei hoher Last, bei der jede Aufgabe genau dann ausgeführt werden muss, wenn wir es erwarten.

Last but not least müssen wir sicher sein, dass die nachgeschalteten Scheduler mit der von Schedulers upstrea m generierten Anzeige Schritt halten können . Weitere Informationen finden Sie in diesem Artikel zum Gegendruck.

Die Implementierung all dieser Beispiele und Codefragmente finden Sie im GitHub-Projekt - dies ist ein Maven-Projekt, daher sollte es einfach zu importieren und auszuführen sein, wie es ist.