Anleitung zu CountDownLatch in Java

1. Einleitung

In diesem Artikel geben wir eine Anleitung zur CountDownLatch- Klasse und zeigen anhand einiger praktischer Beispiele, wie sie verwendet werden kann.

Im Wesentlichen können wir mithilfe eines CountDownLatch einen Thread blockieren lassen, bis andere Threads eine bestimmte Aufgabe abgeschlossen haben.

2. Verwendung in der gleichzeitigen Programmierung

Einfach ausgedrückt, ein CountDownLatch hat ein Gegenfeld, das Sie verringern können , wie wir benötigen. Wir können es dann verwenden, um einen aufrufenden Thread zu blockieren, bis er auf Null heruntergezählt wurde.

Wenn wir eine parallele Verarbeitung durchführen, können wir den CountDownLatch mit demselben Wert für den Zähler instanziieren wie eine Reihe von Threads, über die wir arbeiten möchten. Dann könnten wir einfach countdown () aufrufen, nachdem jeder Thread beendet wurde, um sicherzustellen, dass ein abhängiger Thread, der await () aufruft , blockiert wird, bis die Worker-Threads beendet sind.

3. Warten auf den Abschluss eines Thread-Pools

Probieren Sie dieses Muster aus, indem Sie einen Worker erstellen und mithilfe eines CountDownLatch- Felds signalisieren, wann es abgeschlossen ist:

public class Worker implements Runnable { private List outputScraper; private CountDownLatch countDownLatch; public Worker(List outputScraper, CountDownLatch countDownLatch) { this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override public void run() { doSomeWork(); outputScraper.add("Counted down"); countDownLatch.countDown(); } }

Erstellen wir dann einen Test, um zu beweisen, dass ein CountDownLatch auf den Abschluss der Worker- Instanzen warten kann :

@Test public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new Worker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); outputScraper.add("Latch released"); assertThat(outputScraper) .containsExactly( "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released" ); }

Natürlich ist "Latch freigegeben" immer die letzte Ausgabe - da dies von der CountDownLatch- Freigabe abhängt .

Beachten Sie, dass wir die Reihenfolge der Ausführung der Threads nicht garantieren können , wenn wir await () nicht aufrufen , sodass der Test zufällig fehlschlägt.

4. Ein Pool von Themen, die darauf warten, zu beginnen

Wenn wir das vorherige Beispiel genommen haben, aber diesmal Tausende von Threads anstelle von fünf gestartet haben, ist es wahrscheinlich, dass viele der früheren Threads die Verarbeitung abgeschlossen haben, bevor wir bei den späteren überhaupt start () aufgerufen haben. Dies könnte es schwierig machen, ein Parallelitätsproblem zu reproduzieren, da wir nicht in der Lage wären, alle unsere Threads parallel auszuführen.

Um dies zu umgehen, lassen Sie den CountdownLatch anders funktionieren als im vorherigen Beispiel. Anstatt einen übergeordneten Thread zu blockieren, bis einige untergeordnete Threads abgeschlossen sind, können wir jeden untergeordneten Thread blockieren, bis alle anderen gestartet wurden.

Ändern wir unsere run () -Methode so, dass sie vor der Verarbeitung blockiert wird:

public class WaitingWorker implements Runnable { private List outputScraper; private CountDownLatch readyThreadCounter; private CountDownLatch callingThreadBlocker; private CountDownLatch completedThreadCounter; public WaitingWorker( List outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) { this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completedThreadCounter; } @Override public void run() { readyThreadCounter.countDown(); try { callingThreadBlocker.await(); doSomeWork(); outputScraper.add("Counted down"); } catch (InterruptedException e) { e.printStackTrace(); } finally { completedThreadCounter.countDown(); } } }

Lassen Sie uns nun unseren Test so ändern, dass er blockiert, bis alle Arbeiter gestartet sind, die Arbeiter entsperrt und dann blockiert, bis die Arbeiter fertig sind:

@Test public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch readyThreadCounter = new CountDownLatch(5); CountDownLatch callingThreadBlocker = new CountDownLatch(1); CountDownLatch completedThreadCounter = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new WaitingWorker( outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))) .limit(5) .collect(toList()); workers.forEach(Thread::start); readyThreadCounter.await(); outputScraper.add("Workers ready"); callingThreadBlocker.countDown(); completedThreadCounter.await(); outputScraper.add("Workers complete"); assertThat(outputScraper) .containsExactly( "Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete" ); }

Dieses Muster ist sehr nützlich, um Parallelitätsfehler zu reproduzieren, da Tausende von Threads gezwungen werden können, eine Logik parallel auszuführen.

5. CountdownLatch vorzeitig beenden

Manchmal kann es vorkommen, dass die Worker fälschlicherweise beendet werden, bevor der CountDownLatch heruntergezählt wird . Dies könnte dazu führen, dass es niemals Null erreicht und warte () niemals endet:

@Override public void run() { if (true) { throw new RuntimeException("Oh dear, I'm a BrokenWorker"); } countDownLatch.countDown(); outputScraper.add("Counted down"); }

Lassen Sie uns unseren früheren Test so ändern, dass er einen BrokenWorker verwendet, um zu zeigen, wie await () für immer blockiert:

@Test public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); }

Dies ist eindeutig nicht das gewünschte Verhalten - es wäre viel besser, wenn die Anwendung fortgesetzt würde, als sie unendlich zu blockieren.

Um dies zu umgehen, fügen wir unserem Aufruf zum Warten () ein Timeout-Argument hinzu .

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS); assertThat(completed).isFalse();

Wie wir sehen können, wird der Test irgendwann eine Zeitüberschreitung aufweisen und wait () wird false zurückgeben .

6. Fazit

In dieser Kurzanleitung haben wir gezeigt, wie wir einen CountDownLatch verwenden können, um einen Thread zu blockieren, bis andere Threads die Verarbeitung abgeschlossen haben.

Wir haben auch gezeigt, wie es zum Debuggen von Parallelitätsproblemen verwendet werden kann, indem sichergestellt wird, dass Threads parallel ausgeführt werden.

Die Implementierung dieser Beispiele finden Sie auf GitHub. Dies ist ein Maven-basiertes Projekt und sollte daher so wie es ist einfach auszuführen sein.