Anleitung zu java.util.concurrent.BlockingQueue

1. Übersicht

In diesem Artikel werden wir uns eines der nützlichsten Konstrukte java.util.concurrent ansehen , um das gleichzeitige Producer-Consumer-Problem zu lösen. Wir werden uns eine API der BlockingQueue- Schnittstelle ansehen und wie Methoden dieser Schnittstelle das Schreiben gleichzeitiger Programme erleichtern.

Später in diesem Artikel zeigen wir ein Beispiel für ein einfaches Programm mit mehreren Producer-Threads und mehreren Consumer-Threads.

2. BlockingQueue- Typen

Wir können zwei Arten von BlockingQueue unterscheiden :

  • unbegrenzte Warteschlange - kann fast unbegrenzt wachsen
  • Begrenzte Warteschlange - mit maximaler Kapazität definiert

2.1. Ungebundene Warteschlange

Das Erstellen unbegrenzter Warteschlangen ist einfach:

BlockingQueue blockingQueue = new LinkedBlockingDeque();

Die Kapazität von blockingQueue wird auf Integer.MAX_VALUE festgelegt. Alle Vorgänge, die der unbegrenzten Warteschlange ein Element hinzufügen, werden niemals blockiert, sodass sie sehr groß werden können.

Das Wichtigste beim Entwerfen eines Produzenten-Konsumenten-Programms mit unbegrenzter BlockingQueue ist, dass Konsumenten Nachrichten so schnell konsumieren können sollten, wie Produzenten Nachrichten zur Warteschlange hinzufügen. Andernfalls könnte sich der Speicher füllen und wir würden eine OutOfMemory- Ausnahme erhalten.

2.2. Begrenzte Warteschlange

Der zweite Typ von Warteschlangen ist die begrenzte Warteschlange. Wir können solche Warteschlangen erstellen, indem wir die Kapazität als Argument an einen Konstruktor übergeben:

BlockingQueue blockingQueue = new LinkedBlockingDeque(10);

Hier haben wir eine blockierende Warteschlange mit einer Kapazität von 10. Dies bedeutet, dass ein Produzent versucht, ein Element zu einer bereits vollen Warteschlange hinzuzufügen, abhängig von einer Methode, mit der es hinzugefügt wurde ( quote () , add () oder put) () ) wird blockiert, bis Platz zum Einfügen eines Objekts verfügbar wird. Andernfalls schlagen die Vorgänge fehl.

Die Verwendung einer begrenzten Warteschlange ist eine gute Möglichkeit, um gleichzeitige Programme zu entwerfen, da diese Vorgänge beim Einfügen eines Elements in eine bereits vollständige Warteschlange warten müssen, bis die Verbraucher aufholen und Speicherplatz in der Warteschlange verfügbar machen. Es gibt uns Drosselung ohne Anstrengung von unserer Seite.

3. BlockingQueue API

Es gibt zwei Arten von Methoden in der BlockingQueue- Schnittstelle : Methoden, die für das Hinzufügen von Elementen zu einer Warteschlange verantwortlich sind, und Methoden, die diese Elemente abrufen. Jede Methode aus diesen beiden Gruppen verhält sich unterschiedlich, wenn die Warteschlange voll / leer ist.

3.1. Elemente hinzufügen

  • add () - gibt true zurück , wenn das Einfügen erfolgreich war, andernfalls wird eine IllegalStateException ausgelöst
  • put () - fügt das angegebene Element in eine Warteschlange ein und wartet bei Bedarf auf einen freien Steckplatz
  • quote () - gibt true zurück , wenn das Einfügen erfolgreich war, andernfalls false
  • Angebot (E e, lange Zeitüberschreitung, TimeUnit-Einheit) - versucht, ein Element in eine Warteschlange einzufügen, und wartet innerhalb eines bestimmten Zeitlimits auf einen verfügbaren Steckplatz

3.2. Elemente abrufen

  • take () - wartet auf ein head-Element einer Warteschlange und entfernt es. Wenn die Warteschlange leer ist, blockiert sie und wartet darauf, dass ein Element verfügbar wird
  • poll (lange Zeitüberschreitung, TimeUnit-Einheit) - Ruft den Kopf der Warteschlange ab und entfernt ihn. Warten Sie gegebenenfalls bis zur angegebenen Wartezeit, bis ein Element verfügbar ist. Gibt nach einer Zeitüberschreitung null zurück

Diese Methoden sind die wichtigsten Bausteine ​​der BlockingQueue- Schnittstelle beim Erstellen von Producer-Consumer-Programmen.

4. Multithread-Producer-Consumer-Beispiel

Lassen Sie uns ein Programm erstellen, das aus zwei Teilen besteht - einem Produzenten und einem Konsumenten.

Der Produzent erzeugt eine Zufallszahl von 0 bis 100 und fügt diese Zahl in eine BlockingQueue ein . Wir haben 4 Producer-Threads und blockieren mit der put () -Methode, bis in der Warteschlange Speicherplatz verfügbar ist.

Wichtig ist, dass wir verhindern müssen, dass unsere Consumer-Threads auf unbestimmte Zeit darauf warten, dass ein Element in einer Warteschlange angezeigt wird.

Eine gute Technik, um vom Produzenten an den Verbraucher zu signalisieren, dass keine Nachrichten mehr zu verarbeiten sind, besteht darin, eine spezielle Nachricht zu senden, die als Giftpille bezeichnet wird. Wir müssen so viele Giftpillen verschicken, wie wir Verbraucher haben. Wenn ein Verbraucher diese spezielle Giftpillen-Nachricht aus einer Warteschlange nimmt, wird die Ausführung ordnungsgemäß beendet.

Schauen wir uns eine Produzentenklasse an:

public class NumbersProducer implements Runnable { private BlockingQueue numbersQueue; private final int poisonPill; private final int poisonPillPerProducer; public NumbersProducer(BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } public void run() { try { generateNumbers(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void generateNumbers() throws InterruptedException { for (int i = 0; i < 100; i++) { numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); } for (int j = 0; j < poisonPillPerProducer; j++) { numbersQueue.put(poisonPill); } } }

Unser Producer-Konstruktor verwendet als Argument die BlockingQueue , mit der die Verarbeitung zwischen Producer und Consumer koordiniert wird. Wir sehen, dass die Methode generateNumbers () 100 Elemente in eine Warteschlange stellt. Es braucht auch eine Giftpillen-Nachricht, um zu wissen, welche Art von Nachricht in eine Warteschlange gestellt werden muss, wenn die Ausführung abgeschlossen ist. Diese Nachricht muss giftPillPerProducer mal in eine Warteschlange gestellt werden.

Jeder Verbraucher nimmt ein Element mit der Methode take () aus einer BlockingQueue , sodass es blockiert, bis sich ein Element in einer Warteschlange befindet. Nachdem eine Ganzzahl aus einer Warteschlange entnommen wurde, wird geprüft, ob es sich bei der Nachricht um eine Giftpille handelt. Wenn ja, ist die Ausführung eines Threads abgeschlossen. Andernfalls wird das Ergebnis in der Standardausgabe zusammen mit dem Namen des aktuellen Threads ausgedruckt.

Dies gibt uns einen Einblick in das Innenleben unserer Verbraucher:

public class NumbersConsumer implements Runnable { private BlockingQueue queue; private final int poisonPill; public NumbersConsumer(BlockingQueue queue, int poisonPill) { this.queue = queue; this.poisonPill = poisonPill; } public void run() { try { while (true) { Integer number = queue.take(); if (number.equals(poisonPill)) { return; } System.out.println(Thread.currentThread().getName() + " result: " + number); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }

Wichtig ist die Verwendung einer Warteschlange. Wie im Produzenten-Konstruktor wird eine Warteschlange als Argument übergeben. Wir können dies tun, da BlockingQueue ohne explizite Synchronisierung von Threads gemeinsam genutzt werden kann.

Now that we have our producer and consumer, we can start our program. We need to define the queue's capacity, and we set it to 100 elements.

We want to have 4 producer threads and a number of consumers threads will be equal to the number of available processors:

int BOUND = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); int poisonPill = Integer.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS % N_PRODUCERS; BlockingQueue queue = new LinkedBlockingQueue(BOUND); for (int i = 1; i < N_PRODUCERS; i++) { new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); } for (int j = 0; j < N_CONSUMERS; j++) { new Thread(new NumbersConsumer(queue, poisonPill)).start(); } new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start(); 

BlockingQueue is created using construct with a capacity. We're creating 4 producers and N consumers. We specify our poison pill message to be an Integer.MAX_VALUE because such value will never be sent by our producer under normal working conditions. The most important thing to notice here is that BlockingQueue is used to coordinate work between them.

Wenn wir das Programm ausführen, fügen 4 Producer-Threads zufällige Ganzzahlen in eine BlockingQueue ein, und die Konsumenten nehmen diese Elemente aus der Warteschlange. Jeder Thread druckt in Standardausgabe den Namen des Threads zusammen mit einem Ergebnis.

5. Schlussfolgerung

Dieser Artikel zeigt eine praktische Verwendung von BlockingQueue und erläutert Methoden, die zum Hinzufügen und Abrufen von Elementen verwendet werden. Außerdem haben wir gezeigt, wie mit BlockingQueue ein Multithread-Programm für Produzenten und Konsumenten erstellt wird, um die Arbeit zwischen Produzenten und Konsumenten zu koordinieren.

Die Implementierung all dieser Beispiele und Codefragmente finden Sie im GitHub-Projekt - dies ist ein Maven-basiertes Projekt, daher sollte es einfach zu importieren und auszuführen sein, wie es ist.