Handbuch zur Java Parallel Collectors Library

1. Einleitung

Parallel-Collectors ist eine kleine Bibliothek, die eine Reihe von Java Stream-API-Collectors bereitstellt, die die parallele Verarbeitung ermöglichen und gleichzeitig die Hauptmängel von Standard-Parallel-Streams umgehen.

2. Maven-Abhängigkeiten

Wenn wir die Bibliothek verwenden möchten, müssen wir einen einzelnen Eintrag in Mavens Datei pom.xml hinzufügen :

 com.pivovarit parallel-collectors 1.1.0 

Oder eine einzelne Zeile in Gradles Build-Datei:

compile 'com.pivovarit:parallel-collectors:1.1.0'

Die neueste Version finden Sie auf Maven Central.

3. Vorsichtsmaßnahmen für parallele Streams

Parallele Streams waren eines der Highlights von Java 8, erwiesen sich jedoch ausschließlich für die Verarbeitung schwerer CPUs.

Der Grund dafür war die Tatsache, dass parallele Streams intern von einem JVM-weiten gemeinsam genutzten ForkJoinPool unterstützt wurden , der eine begrenzte Parallelität bot und von allen parallelen Streams verwendet wurde, die auf einer einzelnen JVM-Instanz ausgeführt wurden.

Stellen Sie sich zum Beispiel vor, wir haben eine Liste von IDs und möchten sie verwenden, um eine Liste von Benutzern abzurufen, und dass dieser Vorgang teuer ist.

Wir könnten dafür Parallel Streams verwenden:

List ids = Arrays.asList(1, 2, 3); List results = ids.parallelStream() .map(i -> fetchById(i)) // each operation takes one second .collect(Collectors.toList()); System.out.println(results); // [user-1, user-2, user-3]

Und tatsächlich können wir sehen, dass es eine spürbare Beschleunigung gibt. Es wird jedoch problematisch, wenn wir mehrere parallele Blockierungsvorgänge gleichzeitig ausführen. Dies kann den Pool schnell überlasten und zu potenziell großen Latenzen führen. Aus diesem Grund ist es wichtig, Schottwände durch Erstellen separater Thread-Pools zu erstellen, um zu verhindern, dass nicht verwandte Aufgaben die Ausführung des anderen beeinflussen.

Um eine benutzerdefinierte ForkJoinPool- Instanz bereitzustellen , konnten wir den hier beschriebenen Trick nutzen. Dieser Ansatz beruhte jedoch auf einem undokumentierten Hack und war bis JDK10 fehlerhaft. Wir können mehr in der Ausgabe selbst lesen - [JDK8190974].

4. Parallele Kollektoren in Aktion

Parallele Kollektoren sind, wie der Name schon sagt, nur Standard-Stream-API-Kollektoren, mit denen zusätzliche Operationen in der collect () -Phase parallel ausgeführt werden können.

Die ParallelCollectors- Klasse (die die Collectors- Klasse widerspiegelt ) ist eine Fassade, die Zugriff auf die gesamte Funktionalität der Bibliothek bietet.

Wenn wir das obige Beispiel wiederholen wollten, könnten wir einfach schreiben:

ExecutorService executor = Executors.newFixedThreadPool(10); List ids = Arrays.asList(1, 2, 3); CompletableFuture
    
      results = ids.stream() .collect(ParallelCollectors.parallelToList(i -> fetchById(i), executor, 4)); System.out.println(results.join()); // [user-1, user-2, user-3]
    

Das Ergebnis ist das gleiche, wir konnten jedoch unseren benutzerdefinierten Thread-Pool bereitstellen, unsere benutzerdefinierte Parallelitätsstufe angeben und das Ergebnis in einer CompletableFuture- Instanz verpackt ankommen, ohne den aktuellen Thread zu blockieren.

Standardparallele Streams hingegen konnten keine davon erreichen.

4.1. ParallelCollectors.parallelToList / ToSet ()

So intuitiv es auch sein mag, wenn wir einen Stream parallel verarbeiten und Ergebnisse in einer Liste oder einem Set sammeln möchten , können wir einfach ParallelCollectors.parallelToList oder parallelToSet verwenden :

List ids = Arrays.asList(1, 2, 3); List results = ids.stream() .collect(parallelToList(i -> fetchById(i), executor, 4)) .join();

4.2. ParallelCollectors.parallelToMap ()

Wenn wir Stream- Elemente wie bei der Stream-API in einer Map- Instanz sammeln möchten , müssen wir zwei Mapper bereitstellen:

List ids = Arrays.asList(1, 2, 3); Map results = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4)) .join(); // {1=user-1, 2=user-2, 3=user-3}

Wir können auch eine benutzerdefinierte bieten Map Instanz Lieferant :

Map results = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4)) .join(); 

Und eine benutzerdefinierte Konfliktlösungsstrategie:

List ids = Arrays.asList(1, 2, 3); Map results = ids.stream() .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4)) .join();

4.3. ParallelCollectors.parallelToCollection ()

Ähnlich wie oben können wir unseren benutzerdefinierten Abholungslieferanten übergeben, wenn wir Ergebnisse erhalten möchten, die in unserem benutzerdefinierten Container verpackt sind:

List results = ids.stream() .collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4)) .join();

4.4. ParallelCollectors.parallelToStream ()

Wenn das oben Genannte nicht ausreicht, können wir tatsächlich eine Stream- Instanz erhalten und dort die benutzerdefinierte Verarbeitung fortsetzen:

Map
    
      results = ids.stream() .collect(parallelToStream(i -> fetchById(i), executor, 4)) .thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length()))) .join();
    

4.5. ParallelCollectors.parallel ()

Mit diesem können wir die Ergebnisse in der Reihenfolge ihrer Fertigstellung streamen:

ids.stream() .collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4)) .forEach(System.out::println); // user-1 // user-3 // user-2 

In diesem Fall können wir erwarten, dass der Kollektor jedes Mal andere Ergebnisse zurückgibt, da wir eine zufällige Verarbeitungsverzögerung eingeführt haben.

4.6. ParallelCollectors.parallelOrdered ()

Diese Funktion ermöglicht das Streaming von Ergebnissen wie oben, behält jedoch die ursprüngliche Reihenfolge bei:

ids.stream() .collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4)) .forEach(System.out::println); // user-1 // user-2 // user-3 

In diesem Fall behält der Kollektor die Reihenfolge immer bei, ist jedoch möglicherweise langsamer als oben beschrieben.

5. Limitations

At the point of writing, parallel-collectors don't work with infinite streams even if short-circuiting operations are used – it's a design limitation imposed by Stream API internals. Simply put, Streams treat collectors as non-short-circuiting operations so the stream needs to process all upstream elements before getting terminated.

The other limitation is that short-circuiting operations don't interrupt the remaining tasks after short-circuiting.

6. Conclusion

Wir haben gesehen, wie die Parallel-Collectors-Bibliothek die parallele Verarbeitung mithilfe von benutzerdefinierten Java Stream API- Collectors und CompletableFutures ermöglicht , um benutzerdefinierte Thread-Pools, Parallelität und den nicht blockierenden Stil von CompletableFutures zu verwenden.

Wie immer sind Code-Schnipsel auf GitHub verfügbar.

Weitere Informationen finden Sie in der Parallel-Collectors-Bibliothek auf GitHub, im Blog des Autors und im Twitter-Konto des Autors.