CyclicBarrier in Java

1. Einleitung

CyclicBarriers sind Synchronisationskonstrukte, die mit Java 5 als Teil des Pakets java.util.concurrent eingeführt wurden .

In diesem Artikel werden wir diese Implementierung in einem Parallelitätsszenario untersuchen.

2. Java Concurrency - Synchronizer

Das Paket java.util.concurrent enthält mehrere Klassen, mit deren Hilfe eine Reihe von Threads verwaltet werden können, die miteinander zusammenarbeiten. Einige davon sind:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Tauscher
  • Semaphor
  • SynchronousQueue

Diese Klassen bieten sofort einsatzbereite Funktionen für allgemeine Interaktionsmuster zwischen Threads.

Wenn wir eine Reihe von Threads haben, die miteinander kommunizieren und einem der gängigen Muster ähneln, können wir einfach die entsprechenden Bibliotheksklassen (auch als Synchronisierer bezeichnet ) wiederverwenden, anstatt zu versuchen, mithilfe einer Reihe von Sperren und Bedingungen ein benutzerdefiniertes Schema zu erstellen Objekte und das synchronisierte Schlüsselwort.

Konzentrieren wir uns in Zukunft auf den CyclicBarrier .

3. CyclicBarrier

Ein CyclicBarrier ist ein Synchronisierer, mit dem eine Reihe von Threads darauf warten können, dass sie einen gemeinsamen Ausführungspunkt erreichen, der auch als Barriere bezeichnet wird .

CyclicBarriers werden in Programmen verwendet, in denen eine feste Anzahl von Threads vorhanden ist, die darauf warten müssen, dass sie einen gemeinsamen Punkt erreichen, bevor sie mit der Ausführung fortfahren.

Die Barriere wird als zyklisch bezeichnet, da sie nach dem Lösen der wartenden Threads wiederverwendet werden kann.

4. Verwendung

Der Konstruktor für einen CyclicBarrier ist einfach. Es wird eine einzelne Ganzzahl benötigt, die die Anzahl der Threads angibt, die die await () -Methode für die Barriereinstanz aufrufen müssen, um das Erreichen des gemeinsamen Ausführungspunkts anzuzeigen:

public CyclicBarrier(int parties)

Die Threads, die ihre Ausführung synchronisieren müssen, werden auch als Parteien bezeichnet. Durch Aufrufen der Methode await () können wir registrieren, dass ein bestimmter Thread den Barrierepunkt erreicht hat.

Dieser Aufruf ist synchron und der Thread, der diese Methode aufruft, unterbricht die Ausführung, bis eine bestimmte Anzahl von Threads dieselbe Methode auf der Barriere aufgerufen hat. Diese Situation, in der die erforderliche Anzahl von Threads await () aufgerufen hat, wird als Auslösen der Barriere bezeichnet .

Optional können wir das zweite Argument an den Konstruktor übergeben, bei dem es sich um eine ausführbare Instanz handelt. Dies hat eine Logik, die vom letzten Thread ausgeführt wird, der die Barriere auslöst:

public CyclicBarrier(int parties, Runnable barrierAction)

5. Implementierung

Um CyclicBarrier in Aktion zu sehen, betrachten wir das folgende Szenario:

Es gibt eine Operation, bei der eine feste Anzahl von Threads die entsprechenden Ergebnisse ausführt und in einer Liste speichert. Wenn alle Threads ihre Aktion beendet haben, beginnt einer von ihnen (normalerweise der letzte, der die Barriere auslöst) mit der Verarbeitung der Daten, die von jedem dieser Threads abgerufen wurden.

Implementieren wir die Hauptklasse, in der die gesamte Aktion ausgeführt wird:

public class CyclicBarrierDemo { private CyclicBarrier cyclicBarrier; private List
    
      partialResults = Collections.synchronizedList(new ArrayList()); private Random random = new Random(); private int NUM_PARTIAL_RESULTS; private int NUM_WORKERS; // ... }
    

Diese Klasse ist ziemlich einfach - NUM_WORKERS ist die Anzahl der Threads, die ausgeführt werden sollen, und NUM_PARTIAL_RESULTS ist die Anzahl der Ergebnisse, die jeder der Worker-Threads erzeugen wird.

Schließlich haben wir partielle Ergebnisse , die eine Liste sind, in der die Ergebnisse jedes dieser Arbeitsthreads gespeichert werden. Beachten Sie, dass es sich bei dieser Liste um eine SynchronizedList handelt, da mehrere Threads gleichzeitig darauf schreiben und die add () -Methode in einer einfachen ArrayList nicht threadsicher ist .

Lassen Sie uns nun die Logik jedes Worker-Threads implementieren:

public class CyclicBarrierDemo { // ... class NumberCruncherThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); List partialResult = new ArrayList(); // Crunch some numbers and store the partial result for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) { Integer num = random.nextInt(10); System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num); partialResult.add(num); } partialResults.add(partialResult); try { System.out.println(thisThreadName + " waiting for others to reach barrier."); cyclicBarrier.await(); } catch (InterruptedException e) { // ... } catch (BrokenBarrierException e) { // ... } } } }

Wir werden jetzt die Logik implementieren, die ausgeführt wird, wenn die Barriere ausgelöst wurde.

Um die Dinge einfach zu halten, fügen wir einfach alle Zahlen in die Teilergebnisliste ein:

public class CyclicBarrierDemo { // ... class AggregatorThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); System.out.println( thisThreadName + ": Computing sum of " + NUM_WORKERS + " workers, having " + NUM_PARTIAL_RESULTS + " results each."); int sum = 0; for (List threadResult : partialResults) { System.out.print("Adding "); for (Integer partialResult : threadResult) { System.out.print(partialResult+" "); sum += partialResult; } System.out.println(); } System.out.println(thisThreadName + ": Final result = " + sum); } } }

Der letzte Schritt wäre, den CyclicBarrier zu konstruieren und die Dinge mit einer main () -Methode zu starten :

public class CyclicBarrierDemo { // Previous code public void runSimulation(int numWorkers, int numberOfPartialResults) { NUM_PARTIAL_RESULTS = numberOfPartialResults; NUM_WORKERS = numWorkers; cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread()); System.out.println("Spawning " + NUM_WORKERS + " worker threads to compute " + NUM_PARTIAL_RESULTS + " partial results each"); for (int i = 0; i < NUM_WORKERS; i++) { Thread worker = new Thread(new NumberCruncherThread()); worker.setName("Thread " + i); worker.start(); } } public static void main(String[] args) { CyclicBarrierDemo demo = new CyclicBarrierDemo(); demo.runSimulation(5, 3); } } 

Im obigen Code haben wir die zyklische Barriere mit 5 Threads initialisiert, die jeweils 3 Ganzzahlen als Teil ihrer Berechnung erzeugen und diese in der resultierenden Liste speichern.

Sobald die Barriere ausgelöst wurde, führt der letzte Thread, der die Barriere ausgelöst hat, die im AggregatorThread angegebene Logik aus: Fügen Sie alle von den Threads erzeugten Zahlen hinzu.

6. Ergebnisse

Hier ist die Ausgabe einer Ausführung des obigen Programms - jede Ausführung kann zu unterschiedlichen Ergebnissen führen, da die Threads in einer anderen Reihenfolge erzeugt werden können:

Spawning 5 worker threads to compute 3 partial results each Thread 0: Crunching some numbers! Final result - 6 Thread 0: Crunching some numbers! Final result - 2 Thread 0: Crunching some numbers! Final result - 2 Thread 0 waiting for others to reach barrier. Thread 1: Crunching some numbers! Final result - 2 Thread 1: Crunching some numbers! Final result - 0 Thread 1: Crunching some numbers! Final result - 5 Thread 1 waiting for others to reach barrier. Thread 3: Crunching some numbers! Final result - 6 Thread 3: Crunching some numbers! Final result - 4 Thread 3: Crunching some numbers! Final result - 0 Thread 3 waiting for others to reach barrier. Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 0 Thread 2 waiting for others to reach barrier. Thread 4: Crunching some numbers! Final result - 9 Thread 4: Crunching some numbers! Final result - 3 Thread 4: Crunching some numbers! Final result - 5 Thread 4 waiting for others to reach barrier. Thread 4: Computing final sum of 5 workers, having 3 results each. Adding 6 2 2 Adding 2 0 5 Adding 6 4 0 Adding 1 1 0 Adding 9 3 5 Thread 4: Final result = 46 

Wie die obige Ausgabe zeigt, ist Thread 4 derjenige, der die Barriere auslöst und auch die endgültige Aggregationslogik ausführt. Es ist auch nicht erforderlich, dass Threads tatsächlich in der Reihenfolge ausgeführt werden, in der sie gestartet wurden, wie das obige Beispiel zeigt.

7. Fazit

In diesem Artikel haben wir gesehen, was ein CyclicBarrier ist und in welchen Situationen er hilfreich ist.

Wir haben auch ein Szenario implementiert, in dem wir eine feste Anzahl von Threads benötigten, um einen festen Ausführungspunkt zu erreichen, bevor wir mit anderer Programmlogik fortfahren konnten.

Wie immer finden Sie den Code für das Tutorial auf GitHub.