Eine Anleitung zu Java SynchronousQueue

1. Übersicht

In diesem Artikel betrachten wir die SynchronousQueue aus dem Paket java.util.concurrent .

Einfach ausgedrückt, ermöglicht diese Implementierung den Thread-sicheren Austausch von Informationen zwischen Threads.

2. API-Übersicht

Die SynchronousQueue hat nur zwei unterstützte Vorgänge: take () und put (), und beide blockieren .

Wenn wir beispielsweise der Warteschlange ein Element hinzufügen möchten, müssen wir die put () -Methode aufrufen . Diese Methode wird blockiert, bis ein anderer Thread die Methode take () aufruft und signalisiert, dass sie bereit ist, ein Element aufzunehmen.

Obwohl die SynchronousQueue eine Schnittstelle einer Warteschlange hat, sollten wir sie als Austauschpunkt für ein einzelnes Element zwischen zwei Threads betrachten, in dem ein Thread ein Element übergibt und ein anderer Thread dieses Element übernimmt.

3. Implementieren von Übergaben mithilfe einer gemeinsam genutzten Variablen

Um zu sehen, warum die SynchronousQueue so nützlich sein kann, implementieren wir eine Logik unter Verwendung einer gemeinsam genutzten Variablen zwischen zwei Threads. Als Nächstes schreiben wir diese Logik mithilfe von SynchronousQueue neu, wodurch unser Code viel einfacher und lesbarer wird.

Angenommen, wir haben zwei Threads - einen Produzenten und einen Konsumenten - und wenn der Produzent einen Wert für eine gemeinsam genutzte Variable festlegt, möchten wir diese Tatsache dem Konsumententhread signalisieren. Als Nächstes ruft der Consumer-Thread einen Wert aus einer gemeinsam genutzten Variablen ab.

Wir werden den CountDownLatch verwenden , um diese beiden Threads zu koordinieren, um zu verhindern, dass der Verbraucher auf einen Wert einer gemeinsam genutzten Variablen zugreift, der noch nicht festgelegt wurde.

Wir werden eine sharedState- Variable und einen CountDownLatch definieren , die zur Koordinierung der Verarbeitung verwendet werden:

ExecutorService executor = Executors.newFixedThreadPool(2); AtomicInteger sharedState = new AtomicInteger(); CountDownLatch countDownLatch = new CountDownLatch(1);

Der Produzent speichert eine zufällige Ganzzahl in der Variable sharedState und führt die Methode countDown () für countDownLatch aus, um dem Verbraucher zu signalisieren, dass er einen Wert aus dem sharedState abrufen kann:

Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); sharedState.set(producedElement); countDownLatch.countDown(); };

Der Verbraucher wartet mit der Methode await () auf den countDownLatch . Wenn der Produzent signalisiert, dass die Variable gesetzt wurde, ruft der Konsument sie aus dem sharedState ab:

Runnable consumer = () -> { try { countDownLatch.await(); Integer consumedElement = sharedState.get(); } catch (InterruptedException ex) { ex.printStackTrace(); } };

Zu guter Letzt starten wir unser Programm:

executor.execute(producer); executor.execute(consumer); executor.awaitTermination(500, TimeUnit.MILLISECONDS); executor.shutdown(); assertEquals(countDownLatch.getCount(), 0);

Es wird die folgende Ausgabe erzeugt:

Saving an element: -1507375353 to the exchange point consumed an element: -1507375353 from the exchange point

Wir können sehen, dass dies eine Menge Code ist, um eine so einfache Funktionalität wie den Austausch eines Elements zwischen zwei Threads zu implementieren. Im nächsten Abschnitt werden wir versuchen, es besser zu machen.

4. Implementieren von Handoffs mithilfe der SynchronousQueue

Lassen Sie uns nun die gleiche Funktionalität wie im vorherigen Abschnitt implementieren, jedoch mit einer SynchronousQueue. Es hat einen doppelten Effekt, da wir es zum Austauschen des Status zwischen Threads und zum Koordinieren dieser Aktion verwenden können, sodass wir außer SynchronousQueue nichts anderes verwenden müssen .

Zunächst definieren wir eine Warteschlange:

ExecutorService executor = Executors.newFixedThreadPool(2); SynchronousQueue queue = new SynchronousQueue();

Der Produzent ruft eine put () -Methode auf, die blockiert, bis ein anderer Thread ein Element aus der Warteschlange nimmt:

Runnable producer = () -> { Integer producedElement = ThreadLocalRandom .current() .nextInt(); try { queue.put(producedElement); } catch (InterruptedException ex) { ex.printStackTrace(); } };

Der Verbraucher ruft dieses Element einfach mit der Methode take () ab :

Runnable consumer = () -> { try { Integer consumedElement = queue.take(); } catch (InterruptedException ex) { ex.printStackTrace(); } };

Als nächstes starten wir unser Programm:

executor.execute(producer); executor.execute(consumer); executor.awaitTermination(500, TimeUnit.MILLISECONDS); executor.shutdown(); assertEquals(queue.size(), 0);

Es wird die folgende Ausgabe erzeugt:

Saving an element: 339626897 to the exchange point consumed an element: 339626897 from the exchange point

Wir können sehen, dass eine SynchronousQueue als Austauschpunkt zwischen den Threads verwendet wird. Dies ist viel besser und verständlicher als das vorherige Beispiel, in dem der gemeinsam genutzte Status zusammen mit einem CountDownLatch verwendet wurde.

5. Schlussfolgerung

In diesem kurzen Tutorial haben wir uns das SynchronousQueue- Konstrukt angesehen. Wir haben ein Programm erstellt, das Daten zwischen zwei Threads unter Verwendung des gemeinsam genutzten Status austauscht, und dieses Programm dann neu geschrieben, um das SynchronousQueue- Konstrukt zu nutzen. Dies dient als Austauschpunkt, der den Produzenten- und den Konsumententhread koordiniert.

Die Implementierung all dieser Beispiele und Codefragmente finden Sie im GitHub-Projekt - dies ist ein Maven-Projekt, daher sollte es einfach zu importieren und auszuführen sein, wie es ist.