Einführung in Kotlin Coroutines

1. Übersicht

In diesem Artikel werden wir uns Coroutinen aus der Kotlin-Sprache ansehen. Einfach ausgedrückt, Coroutinen ermöglichen es uns, asynchrone Programme auf sehr flüssige Weise zu erstellen , und sie basieren auf dem Konzept der Continuation-Passing- Programmierung.

Die Kotlin-Sprache gibt uns grundlegende Konstrukte, kann aber mit der Kotlinx-Coroutines-Core- Bibliothek auf nützlichere Coroutinen zugreifen . Wir werden uns diese Bibliothek ansehen, sobald wir die Grundbausteine ​​der Kotlin-Sprache verstanden haben.

2. Erstellen einer Coroutine mit BuildSequence

Erstellen wir eine erste Coroutine mit der Funktion buildSequence .

Und implementieren wir einen Fibonacci-Sequenzgenerator mit dieser Funktion:

val fibonacciSeq = buildSequence { var a = 0 var b = 1 yield(1) while (true) { yield(a + b) val tmp = a + b a = b b = tmp } }

Die Signatur einer Ertragsfunktion lautet:

public abstract suspend fun yield(value: T)

Das Schlüsselwort suspend bedeutet, dass diese Funktion blockiert werden kann. Eine solche Funktion kann eine buildSequence- Coroutine aussetzen .

Suspending-Funktionen können als Standard-Kotlin-Funktionen erstellt werden. Wir müssen uns jedoch bewusst sein, dass wir sie nur innerhalb einer Coroutine aufrufen können. Andernfalls wird ein Compilerfehler angezeigt.

Wenn wir den Aufruf innerhalb der buildSequence angehalten haben, wird dieser Aufruf in den dedizierten Status in der Statusmaschine umgewandelt. Eine Coroutine kann wie jede andere Funktion übergeben und einer Variablen zugewiesen werden.

In der FibonacciSeq- Coroutine haben wir zwei Aufhängepunkte. Erstens, wenn wir Yield (1) aufrufen, und zweitens, wenn wir Yield (a + b) aufrufen .

Wenn diese Ertragsfunktion zu einem blockierenden Aufruf führt, blockiert der aktuelle Thread ihn nicht. Es kann einen anderen Code ausführen. Sobald die angehaltene Funktion ihre Ausführung beendet hat, kann der Thread die Ausführung der fibonacciSeq- Coroutine fortsetzen.

Wir können unseren Code testen, indem wir einige Elemente aus der Fibonacci-Sequenz übernehmen:

val res = fibonacciSeq .take(5) .toList() assertEquals(res, listOf(1, 1, 2, 3, 5))

3. Hinzufügen der Maven-Abhängigkeit für Kotlinx-Coroutinen

Schauen wir uns die Kotlinx-Coroutines- Bibliothek an, die nützliche Konstrukte enthält, die auf grundlegenden Coroutines aufbauen.

Fügen wir die Abhängigkeit zur Kotlinx-Coroutines-Core- Bibliothek hinzu. Beachten Sie, dass wir auch das jcenter- Repository hinzufügen müssen :

 org.jetbrains.kotlinx kotlinx-coroutines-core 0.16    central //jcenter.bintray.com  

4. Asynchrone Programmierung mit der Start () C oroutine

Die kotlinx-coroutines- Bibliothek fügt viele nützliche Konstrukte hinzu, mit denen wir asynchrone Programme erstellen können. Angenommen, wir haben eine teure Berechnungsfunktion, die einen String an die Eingabeliste anfügt:

suspend fun expensiveComputation(res: MutableList) { delay(1000L) res.add("word!") }

Wir können eine Start- Coroutine verwenden, die diese Suspend-Funktion nicht blockierend ausführt. Wir müssen einen Thread-Pool als Argument an sie übergeben.

Die Startfunktion gibt eine Jobinstanz zurück, für die wir eine join () -Methode aufrufen können , um auf die Ergebnisse zu warten:

@Test fun givenAsyncCoroutine_whenStartIt_thenShouldExecuteItInTheAsyncWay() { // given val res = mutableListOf() // when runBlocking { val promise = launch(CommonPool) { expensiveComputation(res) } res.add("Hello,") promise.join() } // then assertEquals(res, listOf("Hello,", "word!")) }

Um unseren Code testen zu können, übergeben wir die gesamte Logik an die runBlocking- Coroutine - ein blockierender Aufruf. Daher kann unsere assertEquals () synchron nach dem Code innerhalb der runBlocking () -Methode ausgeführt werden.

Beachten Sie, dass in diesem Beispiel die Methode launch () zwar zuerst ausgelöst wird, es sich jedoch um eine verzögerte Berechnung handelt. Der Hauptthread wird fortgesetzt, indem die Zeichenfolge "Hallo" an die Ergebnisliste angehängt wird.

Nach der Verzögerung von einer Sekunde, die in der Funktion teuerComputation () eingeführt wird, wird das "Wort!" Der String wird an das Ergebnis angehängt.

5. Coroutinen sind sehr leicht

Stellen wir uns eine Situation vor, in der wir 100000 Operationen asynchron ausführen möchten. Das Laichen einer so hohen Anzahl von Threads ist sehr kostspielig und führt möglicherweise zu einer OutOfMemoryException.

Glücklicherweise ist dies bei Verwendung der Coroutinen nicht der Fall. Wir können so viele Blockierungsvorgänge ausführen, wie wir möchten. Unter der Haube werden diese Vorgänge von einer festen Anzahl von Threads ohne übermäßige Thread-Erstellung ausgeführt:

@Test fun givenHugeAmountOfCoroutines_whenStartIt_thenShouldExecuteItWithoutOutOfMemory() { runBlocking { // given val counter = AtomicInteger(0) val numberOfCoroutines = 100_000 // when val jobs = List(numberOfCoroutines) { launch(CommonPool) { delay(1000L) counter.incrementAndGet() } } jobs.forEach { it.join() } // then assertEquals(counter.get(), numberOfCoroutines) } }

Note that we're executing 100,000 coroutines and each run adds a substantial delay. Nevertheless, there is no need to create too many threads because those operations are executed in an asynchronous way using thread from the CommonPool.

6. Cancellation and Timeouts

Sometimes, after we have triggered some long-running asynchronous computation, we want to cancel it because we're no longer interested in the result.

When we start our asynchronous action with the launch() coroutine, we can examine the isActive flag. This flag is set to false whenever the main thread invokes the cancel() method on the instance of the Job:

@Test fun givenCancellableJob_whenRequestForCancel_thenShouldQuit() { runBlocking { // given val job = launch(CommonPool) { while (isActive) { println("is working") } } delay(1300L) // when job.cancel() // then cancel successfully } }

This is a very elegant and easy way to use the cancellation mechanism. In the asynchronous action, we only need to check if the isActive flag is equal to false and cancel our processing.

When we're requesting some processing and are not sure how much time that computation will take, it's advisable to set the timeout on such an action. If the processing does not finish within the given timeout, we'll get an exception, and we can react to it appropriately.

For example, we can retry the action:

@Test(expected = CancellationException::class) fun givenAsyncAction_whenDeclareTimeout_thenShouldFinishWhenTimedOut() { runBlocking { withTimeout(1300L) { repeat(1000) { i -> println("Some expensive computation $i ...") delay(500L) } } } }

If we do not define a timeout, it's possible that our thread will be blocked forever because that computation will hang. We cannot handle that case in our code if the timeout is not defined.

7. Running Asynchronous Actions Concurrently

Let's say that we need to start two asynchronous actions concurrently and wait for their results afterward. If our processing takes one second and we need to execute that processing twice, the runtime of synchronous blocking execution will be two seconds.

It would be better if we could run both those actions in separate threads and wait for those results in the main thread.

We can leverage the async() coroutine to achieve this by starting processing in two separate threads concurrently:

@Test fun givenHaveTwoExpensiveAction_whenExecuteThemAsync_thenTheyShouldRunConcurrently() { runBlocking { val delay = 1000L val time = measureTimeMillis { // given val one = async(CommonPool) { someExpensiveComputation(delay) } val two = async(CommonPool) { someExpensiveComputation(delay) } // when runBlocking { one.await() two.await() } } // then assertTrue(time < delay * 2) } }

After we submit the two expensive computations, we suspend the coroutine by executing the runBlocking() call. Once results one and two are available, the coroutine will resume, and the results are returned. Executing two tasks in this way should take around one second.

We can pass CoroutineStart.LAZY as the second argument to the async() method, but this will mean the asynchronous computation will not be started until requested. Because we are requesting computation in the runBlocking coroutine, it means the call to two.await() will be made only once the one.await() has finished:

@Test fun givenTwoExpensiveAction_whenExecuteThemLazy_thenTheyShouldNotConcurrently() { runBlocking { val delay = 1000L val time = measureTimeMillis { // given val one = async(CommonPool, CoroutineStart.LAZY) { someExpensiveComputation(delay) } val two = async(CommonPool, CoroutineStart.LAZY) { someExpensiveComputation(delay) } // when runBlocking { one.await() two.await() } } // then assertTrue(time > delay * 2) } }

The laziness of the execution in this particular example causes our code to run synchronously. That happens because when we call await(), the main thread is blocked and only after task one finishes task two will be triggered.

We need to be aware of performing asynchronous actions in a lazy way as they may run in a blocking way.

8. Conclusion

In this article, we looked at basics of Kotlin coroutines.

We saw that buildSequence is the main building block of every coroutine. We described how the flow of execution in this Continuation-passing programming style looks.

Finally, we looked at the kotlinx-coroutines library that ships a lot of very useful constructs for creating asynchronous programs.

Die Implementierung all dieser Beispiele und Codefragmente finden Sie im GitHub-Projekt.