Anleitung zum Java Phaser

1. Übersicht

In diesem Artikel betrachten wir das Phaser- Konstrukt aus dem Paket java.util.concurrent . Es ist ein sehr ähnliches Konstrukt wie CountDownLatch , mit dem wir die Ausführung von Threads koordinieren können. Im Vergleich zum CountDownLatch verfügt es über einige zusätzliche Funktionen.

Der Phaser ist eine Barriere, auf der die dynamische Anzahl von Threads warten muss, bevor die Ausführung fortgesetzt wird. Im CountDownLatch kann diese Nummer nicht dynamisch konfiguriert werden und muss beim Erstellen der Instanz angegeben werden.

2. Phaser API

Mit dem Phaser können wir eine Logik erstellen, in der Threads auf der Barriere warten müssen, bevor sie mit dem nächsten Ausführungsschritt fortfahren können .

Wir können mehrere Ausführungsphasen koordinieren und für jede Programmphase eine Phaser- Instanz wiederverwenden . Jede Phase kann eine andere Anzahl von Threads haben, die darauf warten, zu einer anderen Phase überzugehen. Wir werden uns später ein Beispiel für die Verwendung von Phasen ansehen.

Um an der Koordination teilnehmen zu können, muss sich der Thread bei der Phaser- Instanz registrieren () . Beachten Sie, dass dies nur die Anzahl der registrierten Parteien erhöht und wir nicht überprüfen können, ob der aktuelle Thread registriert ist. Wir müssten die Implementierung in Unterklassen unterteilen, um dies zu unterstützen.

Der Thread signalisiert, dass er an der Barriere angekommen ist, indem er arretAndAwaitAdvance () aufruft , eine blockierende Methode. Wenn die Anzahl der angekommenen Parteien gleich der Anzahl der registrierten Parteien ist, wird die Ausführung des Programms fortgesetzt und die Phasennummer erhöht. Wir können die aktuelle Phasennummer durch Aufrufen der Methode getPhase () abrufen .

Wenn der Thread seine Arbeit beendet hat, sollten wir die Methode arretAndDeregister () aufrufen , um zu signalisieren, dass der aktuelle Thread in dieser bestimmten Phase nicht mehr berücksichtigt werden soll.

3. Implementieren der Logik mithilfe der Phaser- API

Angenommen, wir möchten mehrere Aktionsphasen koordinieren. Drei Threads verarbeiten die erste Phase und zwei Threads verarbeiten die zweite Phase.

Wir werden eine LongRunningAction- Klasse erstellen , die die Runnable- Schnittstelle implementiert :

class LongRunningAction implements Runnable { private String threadName; private Phaser ph; LongRunningAction(String threadName, Phaser ph) { this.threadName = threadName; this.ph = ph; ph.register(); } @Override public void run() { ph.arriveAndAwaitAdvance(); try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } ph.arriveAndDeregister(); } }

Wenn unsere Aktionsklasse instanziiert wird, registrieren wir uns mit der Methode register () bei der Phaser- Instanz . Dadurch wird die Anzahl der Threads mit diesem bestimmten Phaser erhöht .

Der Aufruf von arretAndAwaitAdvance () bewirkt, dass der aktuelle Thread auf der Barriere wartet. Wie bereits erwähnt, wird die Ausführung fortgesetzt, wenn die Anzahl der angekommenen Parteien der Anzahl der registrierten Parteien entspricht.

Nach Abschluss der Verarbeitung hebt sich der aktuelle Thread durch Aufrufen der Methode arretAndDeregister () ab .

Erstellen wir einen Testfall, in dem wir drei LongRunningAction- Threads starten und die Barriere blockieren. Als nächstes erstellen wir nach Abschluss der Aktion zwei zusätzliche LongRunningAction- Threads, die die Verarbeitung der nächsten Phase durchführen.

Beim Erstellen einer Phaser- Instanz aus dem Hauptthread übergeben wir 1 als Argument. Dies entspricht dem Aufrufen der Methode register () aus dem aktuellen Thread. Wir tun dies, weil beim Erstellen von drei Arbeitsthreads der Hauptthread ein Koordinator ist und daher für den Phaser vier Threads registriert sein müssen:

ExecutorService executorService = Executors.newCachedThreadPool(); Phaser ph = new Phaser(1); assertEquals(0, ph.getPhase());

Die Phase nach der Initialisierung ist gleich Null.

Die Phaser- Klasse hat einen Konstruktor, in dem wir eine übergeordnete Instanz an sie übergeben können. Dies ist in Fällen nützlich, in denen eine große Anzahl von Parteien massive Kosten für Synchronisierungskonflikte verursachen würde. In solchen Situationen können Instanzen von Phasern so eingerichtet werden, dass Gruppen von Unterphasern ein gemeinsames übergeordnetes Element haben .

Als nächstes starten wir drei LongRunningAction- Aktionsthreads, die auf der Barriere warten, bis wir die Methode arretAndAwaitAdvance () vom Hauptthread aus aufrufen .

Denken Sie daran, dass wir unseren Phaser mit 1 initialisiert und register () noch dreimal aufgerufen haben. Jetzt haben drei Aktionsthreads angekündigt, dass sie an der Barriere angekommen sind. Daher ist ein weiterer Aufruf von arretAndAwaitAdvance () erforderlich - der aus dem Hauptthread:

executorService.submit(new LongRunningAction("thread-1", ph)); executorService.submit(new LongRunningAction("thread-2", ph)); executorService.submit(new LongRunningAction("thread-3", ph)); ph.arriveAndAwaitAdvance(); assertEquals(1, ph.getPhase());

Nach Abschluss dieser Phase gibt die Methode getPhase () eine zurück, da das Programm den ersten Ausführungsschritt verarbeitet hat.

Angenommen, zwei Threads sollten die nächste Verarbeitungsphase durchführen. Wir können Phaser nutzen , um dies zu erreichen, da wir so die Anzahl der Threads, die auf die Barriere warten sollen, dynamisch konfigurieren können. Wir starten zwei neue Threads, aber diese werden erst ausgeführt, wenn der Hauptthread (wie im vorherigen Fall) den Befehl arrAndAwaitAdvance () aufruft :

executorService.submit(new LongRunningAction("thread-4", ph)); executorService.submit(new LongRunningAction("thread-5", ph)); ph.arriveAndAwaitAdvance(); assertEquals(2, ph.getPhase()); ph.arriveAndDeregister();

Danach gibt die Methode getPhase () eine Phasennummer von zwei zurück. Wenn wir unser Programm beenden möchten, müssen wir die Methode arretAndDeregister () aufrufen , da der Hauptthread noch im Phaser registriert ist . Wenn die Deregistrierung die Zahl der registrierten Parteien verursacht Null zu werden, der Phaser ist beendet. Alle Aufrufe von Synchronisationsmethoden werden nicht mehr blockiert und kehren sofort zurück.

Durch Ausführen des Programms wird die folgende Ausgabe erzeugt (der vollständige Quellcode mit den Druckzeilenanweisungen befindet sich im Code-Repository):

This is phase 0 This is phase 0 This is phase 0 Thread thread-2 before long running action Thread thread-1 before long running action Thread thread-3 before long running action This is phase 1 This is phase 1 Thread thread-4 before long running action Thread thread-5 before long running action

Wir sehen, dass alle Threads auf die Ausführung warten, bis sich die Barriere öffnet. Die nächste Ausführungsphase wird nur ausgeführt, wenn die vorherige erfolgreich abgeschlossen wurde.

4. Fazit

In diesem Tutorial haben wir uns das Phaser- Konstrukt aus java.util.concurrent angesehen und die Koordinationslogik mit mehreren Phasen mithilfe der Phaser- Klasse implementiert .

Die Implementierung all dieser Beispiele und Codefragmente finden Sie im GitHub-Projekt - dies ist ein Maven-Projekt, daher sollte es einfach zu importieren und auszuführen sein, wie es ist.