Einführung in Exchanger in Java

1. Übersicht

In diesem Tutorial werden wir uns mit java.util.concurrent.Exchanger befassen. Dies ist ein gemeinsamer Punkt für zwei Threads in Java, um Objekte zwischen ihnen auszutauschen.

2. Einführung in den Tauscher

Die Exchanger- Klasse in Java kann verwendet werden, um Objekte zwischen zwei Threads vom Typ T zu teilen . Die Klasse bietet nur einen einzigen überladenen Methodenaustausch (T t) .

Beim Aufrufen wartet Exchange darauf, dass der andere Thread im Paar ihn ebenfalls aufruft. Zu diesem Zeitpunkt stellt der zweite Thread fest, dass der erste Thread mit seinem Objekt wartet. Der Thread tauscht die Objekte aus, die er hält, und signalisiert den Austausch. Jetzt können sie zurückkehren.

Schauen wir uns ein Beispiel an, um den Nachrichtenaustausch zwischen zwei Threads mit Exchanger zu verstehen :

@Test public void givenThreads_whenMessageExchanged_thenCorrect() { Exchanger exchanger = new Exchanger(); Runnable taskA = () -> { try { String message = exchanger.exchange("from A"); assertEquals("from B", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; Runnable taskB = () -> { try { String message = exchanger.exchange("from B"); assertEquals("from A", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; CompletableFuture.allOf( runAsync(taskA), runAsync(taskB)).join(); }

Hier haben wir die zwei Threads, die über den gemeinsamen Austauscher Nachrichten untereinander austauschen. Sehen wir uns ein Beispiel an, in dem wir ein Objekt aus dem Hauptthread gegen einen neuen Thread austauschen:

@Test public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException { Exchanger exchanger = new Exchanger(); Runnable runner = () -> { try { String message = exchanger.exchange("from runner"); assertEquals("to runner", message); } catch (InterruptedException e) { Thread.currentThread.interrupt(); throw new RuntimeException(e); } }; CompletableFuture result = CompletableFuture.runAsync(runner); String msg = exchanger.exchange("to runner"); assertEquals("from runner", msg); result.join(); }

Beachten Sie, dass wir zuerst den Runner- Thread starten und später exchange () im Haupt-Thread aufrufen müssen .

Beachten Sie außerdem, dass der Aufruf des ersten Threads möglicherweise eine Zeitüberschreitung aufweist, wenn der zweite Thread den Austauschzeitpunkt nicht erreicht. Wie lange der erste Thread warten soll, kann über den überlasteten Austausch gesteuert werden (T t, langes Timeout, TimeUnit timeUnit).

3. Kein GC-Datenaustausch

Exchanger kann verwendet werden, um Pipeline-Muster zu erstellen, bei denen Daten von einem Thread zum anderen übertragen werden. In diesem Abschnitt erstellen wir einen einfachen Stapel von Threads, die kontinuierlich Daten als Pipeline untereinander übertragen.

@Test public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException { Exchanger
    
      readerExchanger = new Exchanger(); Exchanger
     
       writerExchanger = new Exchanger(); Runnable reader = () -> { Queue readerBuffer = new ConcurrentLinkedQueue(); while (true) { readerBuffer.add(UUID.randomUUID().toString()); if (readerBuffer.size() >= BUFFER_SIZE) { readerBuffer = readerExchanger.exchange(readerBuffer); } } }; Runnable processor = () -> { Queue processorBuffer = new ConcurrentLinkedQueue(); Queue writerBuffer = new ConcurrentLinkedQueue(); processorBuffer = readerExchanger.exchange(processorBuffer); while (true) { writerBuffer.add(processorBuffer.poll()); if (processorBuffer.isEmpty()) { processorBuffer = readerExchanger.exchange(processorBuffer); writerBuffer = writerExchanger.exchange(writerBuffer); } } }; Runnable writer = () -> { Queue writerBuffer = new ConcurrentLinkedQueue(); writerBuffer = writerExchanger.exchange(writerBuffer); while (true) { System.out.println(writerBuffer.poll()); if (writerBuffer.isEmpty()) { writerBuffer = writerExchanger.exchange(writerBuffer); } } }; CompletableFuture.allOf( runAsync(reader), runAsync(processor), runAsync(writer)).join(); }
     
    

Hier haben wir drei Themen: Leser , Prozessor und Schreiber . Zusammen arbeiten sie als eine einzige Pipeline, die Daten zwischen ihnen austauscht.

Der readerExchanger wird zwischen dem Reader und dem Prozessorthread geteilt , während der writerExchanger zwischen dem Prozessor und dem Writer- Thread geteilt wird.

Beachten Sie, dass das Beispiel hier nur zur Demonstration dient. Wir müssen vorsichtig sein, wenn wir mit while (true) Endlosschleifen erstellen . Um den Code lesbar zu halten, haben wir einige Ausnahmen weggelassen.

Dieses Muster des Datenaustauschs während der Wiederverwendung des Puffers ermöglicht eine geringere Speicherbereinigung. Die Exchange-Methode gibt dieselben Warteschlangeninstanzen zurück, sodass für diese Objekte kein GC vorhanden ist. Im Gegensatz zu einer blockierenden Warteschlange erstellt der Austauscher keine Knoten oder Objekte zum Speichern und Freigeben von Daten.

Das Erstellen einer solchen Pipeline ähnelt dem Disrupter-Muster, mit einem wesentlichen Unterschied, dass das Disrupter-Muster mehrere Produzenten und Konsumenten unterstützt, während ein Austauscher zwischen einem Paar von Konsumenten und Produzenten verwendet werden könnte.

4. Fazit

Wir haben also gelernt, was Exchanger in Java ist, wie es funktioniert, und wir haben gesehen, wie die Exchanger- Klasse verwendet wird. Außerdem haben wir eine Pipeline erstellt und den GC-freien Datenaustausch zwischen Threads demonstriert.

Wie immer ist der Code auf GitHub verfügbar.