Anleitung zur ConcurrentSkipListMap

1. Übersicht

In diesem kurzen Artikel werden wir uns die ConcurrentSkipListMap- Klasse aus dem Paket java.util.concurrent ansehen.

Mit diesem Konstrukt können wir threadsichere Logik ohne Sperren erstellen. Es ist ideal für Probleme, wenn wir einen unveränderlichen Schnappschuss der Daten erstellen möchten, während andere Threads noch Daten in die Karte einfügen.

Wir werden das Problem lösen, einen Strom von Ereignissen zu sortieren und eine Momentaufnahme der Ereignisse zu erhalten, die in den letzten 60 Sekunden mit diesem Konstrukt eingetroffen sind .

2. Stream-Sortierlogik

Nehmen wir an, wir haben einen Strom von Ereignissen, die ständig von mehreren Threads ausgehen. Wir müssen in der Lage sein, Ereignisse der letzten 60 Sekunden sowie Ereignisse, die älter als 60 Sekunden sind, aufzunehmen.

Definieren wir zunächst die Struktur unserer Ereignisdaten:

public class Event { private ZonedDateTime eventTime; private String content; // standard constructors/getters }

Wir möchten unsere Ereignisse mithilfe des eventTime- Felds sortieren . Um dies mithilfe der ConcurrentSkipListMap zu erreichen, müssen wir beim Erstellen einer Instanz einen Komparator an seinen Konstruktor übergeben:

ConcurrentSkipListMap events = new ConcurrentSkipListMap( Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));

Wir werden alle angekommenen Ereignisse anhand ihrer Zeitstempel vergleichen. Wir verwenden die compareLong () -Methode und übergeben die Extraktionsfunktion, die einen langen Zeitstempel von der ZonedDateTime benötigen kann.

Wenn unsere Ereignisse eintreffen, müssen wir sie nur mit der put () -Methode zur Karte hinzufügen . Beachten Sie, dass für diese Methode keine explizite Synchronisierung erforderlich ist:

public void acceptEvent(Event event) { events.put(event.getEventTime(), event.getContent()); }

Die ConcurrentSkipListMap übernimmt die Sortierung der darunter liegenden Ereignisse mithilfe des Komparators , der im Konstruktor an sie übergeben wurde.

Die bemerkenswertesten Vorteile der ConcurrentSkipListMap sind die Methoden, mit denen eine unveränderliche Momentaufnahme der Daten auf sperrenfreie Weise erstellt werden kann. Um alle Ereignisse abzurufen , die innerhalb der letzten Minute eingetroffen sind, können wir die tailMap () -Methode verwenden und die Zeit übergeben, ab der wir Elemente abrufen möchten:

public ConcurrentNavigableMap getEventsFromLastMinute() { return events.tailMap(ZonedDateTime.now().minusMinutes(1)); } 

Es werden alle Ereignisse der letzten Minute zurückgegeben. Es wird ein unveränderlicher Schnappschuss sein, und das Wichtigste ist, dass andere Schreibthreads der ConcurrentSkipListMap neue Ereignisse hinzufügen können, ohne dass eine explizite Sperrung erforderlich ist.

Wir können jetzt alle Ereignisse abrufen , die in einer Minute später eingetroffen sind - mithilfe der headMap () -Methode:

public ConcurrentNavigableMap getEventsOlderThatOneMinute() { return events.headMap(ZonedDateTime.now().minusMinutes(1)); }

Dies gibt eine unveränderliche Momentaufnahme aller Ereignisse zurück, die älter als eine Minute sind. Alle oben genannten Methoden gehören zur EventWindowSort- Klasse, die wir im nächsten Abschnitt verwenden werden.

3. Testen der Sortierstromlogik

Nachdem wir unsere Sortierlogik mithilfe der ConcurrentSkipListMap implementiert haben , können wir sie jetzt testen, indem wir zwei Writer-Threads erstellen , die jeweils einhundert Ereignisse senden:

ExecutorService executorService = Executors.newFixedThreadPool(3); EventWindowSort eventWindowSort = new EventWindowSort(); int numberOfThreads = 2; Runnable producer = () -> IntStream .rangeClosed(0, 100) .forEach(index -> eventWindowSort.acceptEvent( new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString())) ); for (int i = 0; i < numberOfThreads; i++) { executorService.execute(producer); } 

Jeder Thread ruft die acceptEvent () -Methode auf und sendet die Ereignisse mit eventTime von jetzt an "jetzt minus hundert Sekunden".

In der Zwischenzeit können wir die Methode getEventsFromLastMinute () aufrufen, die den Snapshot von Ereignissen zurückgibt , die sich innerhalb des Ein-Minuten-Fensters befinden:

ConcurrentNavigableMap eventsFromLastMinute = eventWindowSort.getEventsFromLastMinute();

Die Anzahl der Ereignisse in eventsFromLastMinute variiert in jedem Testlauf in Abhängigkeit von der Geschwindigkeit, mit der die Produzenten-Threads die Ereignisse an EventWindowSort senden. Wir können behaupten, dass der zurückgegebene Snapshot kein einziges Ereignis enthält, das älter als eine Minute ist:

long eventsOlderThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1))) .count(); assertEquals(eventsOlderThanOneMinute, 0);

Und dass der Schnappschuss mehr als null Ereignisse enthält, die sich innerhalb des Ein-Minuten-Fensters befinden:

long eventYoungerThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1))) .count(); assertTrue(eventYoungerThanOneMinute > 0);

Unser getEventsFromLastMinute () verwendet die darunter liegende tailMap () .

Testen wir jetzt getEventsOlderThatOneMinute () , das die headMap () -Methode aus der ConcurrentSkipListMap verwendet:

ConcurrentNavigableMap eventsFromLastMinute = eventWindowSort.getEventsOlderThatOneMinute();

Dieses Mal erhalten wir eine Momentaufnahme von Ereignissen, die älter als eine Minute sind. Wir können behaupten, dass es mehr als null solcher Ereignisse gibt:

long eventsOlderThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1))) .count(); assertTrue(eventsOlderThanOneMinute > 0);

Und als nächstes, dass es kein einziges Ereignis gibt, das innerhalb der letzten Minute stattfindet:

long eventYoungerThanOneMinute = eventsFromLastMinute .entrySet() .stream() .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1))) .count(); assertEquals(eventYoungerThanOneMinute, 0);

Das Wichtigste ist, dass wir den Snapshot von Daten erstellen können, während andere Threads der ConcurrentSkipListMap noch neue Werte hinzufügen .

4. Fazit

In diesem kurzen Tutorial haben wir uns die Grundlagen der ConcurrentSkipListMap sowie einige praktische Beispiele angesehen .

Wir haben die hohe Leistung der ConcurrentSkipListMap genutzt , um einen nicht blockierenden Algorithmus zu implementieren, der uns eine unveränderliche Momentaufnahme von Daten liefert, selbst wenn mehrere Threads gleichzeitig die Karte aktualisieren.

Die Implementierung all dieser Beispiele und Codefragmente finden Sie im GitHub-Projekt. Da es sich um ein Maven-Projekt handelt, sollte es einfach zu importieren und auszuführen sein.