Implementieren eines Ringpuffers in Java

1. Übersicht

In diesem Tutorial erfahren Sie, wie Sie einen Ringpuffer in Java implementieren.

2. Ringpuffer

Ring Buffer (oder Circular Buffer) ist eine begrenzte zirkuläre Datenstruktur, die zum Puffern von Daten zwischen zwei oder mehr Threads verwendet wird . Während wir weiter in einen Ringpuffer schreiben, wird dieser am Ende umlaufen.

2.1. Wie es funktioniert

Ein Ringpuffer wird mithilfe eines Arrays fester Größe implementiert, das sich an den Grenzen umschließt .

Abgesehen von dem Array verfolgt es drei Dinge:

  • der nächste verfügbare Steckplatz im Puffer zum Einfügen eines Elements,
  • das nächste ungelesene Element im Puffer,
  • und das Ende des Arrays - der Punkt, an dem sich der Puffer bis zum Anfang des Arrays dreht

Die Mechanik, wie ein Ringpuffer mit diesen Anforderungen umgeht, hängt von der Implementierung ab. Zum Beispiel zeigt der Wikipedia-Eintrag zu diesem Thema eine Methode mit vier Zeigern.

Wir werden den Ansatz von Disruptors Implementierung des Ringpuffers unter Verwendung von Sequenzen ausleihen.

Das erste, was wir wissen müssen, ist die Kapazität - die feste maximale Größe des Puffers. Als nächstes verwenden wir zwei monoton ansteigende Sequenzen :

  • Schreibsequenz: Beginnend bei -1, inkrementiert um 1, wenn wir ein Element einfügen
  • Lesesequenz: Beginnend bei 0, inkrementiert um 1, wenn wir ein Element verbrauchen

Wir können eine Sequenz einem Index im Array zuordnen, indem wir eine Mod-Operation verwenden:

arrayIndex = sequence % capacity 

Die Mod-Operation umschließt die Sequenz um die Grenzen, um einen Slot im Puffer abzuleiten :

Mal sehen, wie wir ein Element einfügen würden:

buffer[++writeSequence % capacity] = element 

Wir erhöhen die Sequenz vor dem Einfügen eines Elements.

Um ein Element zu konsumieren, führen wir ein Post-Inkrement durch:

element = buffer[readSequence++ % capacity] 

In diesem Fall führen wir ein Post-Inkrement für die Sequenz durch. Wenn Sie ein Element verbrauchen, wird es nicht aus dem Puffer entfernt - es bleibt nur im Array, bis es überschrieben wird .

2.2. Leere und volle Puffer

Während wir uns um das Array wickeln, beginnen wir, die Daten im Puffer zu überschreiben. Wenn der Puffer voll ist, können wir entweder die ältesten Daten überschreiben, unabhängig davon, ob der Leser sie verbraucht hat, oder verhindern, dass die nicht gelesenen Daten überschrieben werden .

Wenn es sich der Leser leisten kann, die Zwischen- oder alten Werte (z. B. einen Börsenticker) zu übersehen, können wir die Daten überschreiben, ohne darauf zu warten, dass sie verbraucht werden. Wenn der Leser hingegen alle Werte verbrauchen muss (wie bei E-Commerce-Transaktionen), sollten wir warten (Blockieren / Besetzt-Warten), bis im Puffer ein Steckplatz verfügbar ist.

Der Puffer ist voll, wenn die Größe des Puffers seiner Kapazität entspricht , wobei seine Größe der Anzahl der ungelesenen Elemente entspricht:

size = (writeSequence - readSequence) + 1 isFull = (size == capacity) 

Wenn die Schreibsequenz hinter der Lesesequenz zurückbleibt, ist der Puffer leer :

isEmpty = writeSequence < readSequence 

Der Puffer gibt einen Nullwert zurück , wenn er leer ist.

2.2. Vorteile und Nachteile

Ein Ringpuffer ist ein effizienter FIFO-Puffer. Es verwendet ein Array mit fester Größe, das im Voraus vorab zugewiesen werden kann und ein effizientes Speicherzugriffsmuster ermöglicht. Alle Pufferoperationen sind konstante Zeit O (1) , einschließlich des Verbrauchs eines Elements, da keine Verschiebung von Elementen erforderlich ist.

Auf der anderen Seite ist die Bestimmung der richtigen Größe des Ringpuffers entscheidend. Beispielsweise können die Schreibvorgänge für eine lange Zeit blockieren, wenn der Puffer zu klein ist und die Lesevorgänge langsam sind. Wir können dynamische Größen verwenden, aber es würde das Verschieben von Daten erfordern, und wir werden die meisten der oben diskutierten Vorteile verpassen.

3. Implementierung in Java

Nachdem wir nun verstanden haben, wie ein Ringpuffer funktioniert, wollen wir ihn in Java implementieren.

3.1. Initialisierung

Definieren wir zunächst einen Konstruktor, der den Puffer mit einer vordefinierten Kapazität initialisiert:

public CircularBuffer(int capacity) { this.capacity = capacity; this.data = (E[]) new Object[capacity]; this.readSequence = 0; this.writeSequence = -1; } 

Dadurch wird ein leerer Puffer erstellt und die Sequenzfelder wie im vorherigen Abschnitt beschrieben initialisiert.

3.3. Angebot

Als Nächstes implementieren wir die Angebotsoperation , die am nächsten verfügbaren Slot ein Element in den Puffer einfügt und bei Erfolg true zurückgibt . Es wird false zurückgegeben, wenn der Puffer keinen leeren Steckplatz finden kann, dh ungelesene Werte nicht überschrieben werden können .

Lassen Sie uns die Angebotsmethode in Java implementieren :

public boolean offer(E element) { boolean isFull = (writeSequence - readSequence) + 1 == capacity; if (!isFull) { int nextWriteSeq = writeSequence + 1; data[nextWriteSeq % capacity] = element; writeSequence++; return true; } return false; } 

Wir erhöhen also die Schreibsequenz und berechnen den Index im Array für den nächsten verfügbaren Slot. Dann schreiben wir die Daten in den Puffer und speichern die aktualisierte Schreibsequenz.

Probieren wir es aus:

@Test public void givenCircularBuffer_whenAnElementIsEnqueued_thenSizeIsOne() { CircularBuffer buffer = new CircularBuffer(defaultCapacity); assertTrue(buffer.offer("Square")); assertEquals(1, buffer.size()); } 

3.4. Poll

Finally, we'll implement the poll operation that retrieves and removes the next unread element. The poll operation doesn't remove the element but increments the read sequence.

Let's implement it:

public E poll() { boolean isEmpty = writeSequence < readSequence; if (!isEmpty) { E nextValue = data[readSequence % capacity]; readSequence++; return nextValue; } return null; } 

Here, we're reading the data at the current read sequence by computing the index in the array. Then, we're incrementing the sequence and returning the value, if the buffer is not empty.

Let's test it out:

@Test public void givenCircularBuffer_whenAnElementIsDequeued_thenElementMatchesEnqueuedElement() { CircularBuffer buffer = new CircularBuffer(defaultCapacity); buffer.offer("Triangle"); String shape = buffer.poll(); assertEquals("Triangle", shape); } 

4. Producer-Consumer Problem

We've talked about the use of a ring buffer for exchanging data between two or more threads, which is an example of a synchronization problem called the Producer-Consumer problem. In Java, we can solve the producer-consumer problem in various ways using semaphores, bounded queues, ring buffers, etc.

Let's implement a solution based on a ring buffer.

4.1. volatile Sequence Fields

Our implementation of the ring buffer is not thread-safe. Let's make it thread-safe for the simple single-producer and single-consumer case.

The producer writes data to the buffer and increments the writeSequence, while the consumer only reads from the buffer and increments the readSequence. So, the backing array is contention-free and we can get away without any synchronization.

But we still need to ensure that the consumer can see the latest value of the writeSequence field (visibility) and that the writeSequence is not updated before the data is actually available in the buffer (ordering).

We can make the ring buffer concurrent and lock-free in this case by making the sequence fields volatile:

private volatile int writeSequence = -1, readSequence = 0; 

In the offer method, a write to the volatile field writeSequence guarantees that the writes to the buffer happen before updating the sequence. At the same time, the volatile visibility guarantee ensures that the consumer will always see the latest value of writeSequence.

4.2. Producer

Let's implement a simple producer Runnable that writes to the ring buffer:

public void run() { for (int i = 0; i < items.length;) { if (buffer.offer(items[i])) { System.out.println("Produced: " + items[i]); i++; } } } 

The producer thread would wait for an empty slot in a loop (busy-waiting).

4.3. Consumer

We'll implement a consumer Callable that reads from the buffer:

public T[] call() { T[] items = (T[]) new Object[expectedCount]; for (int i = 0; i < items.length;) { T item = buffer.poll(); if (item != null) { items[i++] = item; System.out.println("Consumed: " + item); } } return items; } 

Der Consumer-Thread wird ohne Druck fortgesetzt, wenn er einen Nullwert aus dem Puffer empfängt .

Schreiben wir unseren Treibercode:

executorService.submit(new Thread(new Producer(buffer))); executorService.submit(new Thread(new Consumer(buffer))); 

Die Ausführung unseres Producer-Consumer-Programms führt zu folgenden Ergebnissen:

Produced: Circle Produced: Triangle Consumed: Circle Produced: Rectangle Consumed: Triangle Consumed: Rectangle Produced: Square Produced: Rhombus Consumed: Square Produced: Trapezoid Consumed: Rhombus Consumed: Trapezoid Produced: Pentagon Produced: Pentagram Produced: Hexagon Consumed: Pentagon Consumed: Pentagram Produced: Hexagram Consumed: Hexagon Consumed: Hexagram 

5. Fazit

In diesem Tutorial haben wir gelernt, wie ein Ringpuffer implementiert wird, und untersucht, wie er zur Lösung des Producer-Consumer-Problems verwendet werden kann.

Wie üblich ist der Quellcode für alle Beispiele auf GitHub verfügbar.