Benutzerdefinierte Thread-Pools in parallelen Java 8-Streams

1. Übersicht

Java 8 führte das Konzept der S treams als effiziente Methode zur Durchführung von Massenoperationen mit Daten ein. Parallele Streams können in Umgebungen abgerufen werden, die Parallelität unterstützen.

Diese Streams können eine verbesserte Leistung aufweisen - auf Kosten des Multithreading-Overheads.

In diesem kurzen Tutorial werden wir uns eine der größten Einschränkungen der Stream- API ansehen und sehen, wie ein paralleler Stream alternativ mit einer benutzerdefinierten ThreadPool- Instanz funktioniert - es gibt eine Bibliothek, die dies behandelt.

2. Paralleler Stream

Beginnen wir mit einem einfachen Beispiel - dem Aufrufen der parallelStream- Methode für einen der Collection- Typen -, das einen möglicherweise parallelen Stream zurückgibt :

@Test public void givenList_whenCallingParallelStream_shouldBeParallelStream(){ List aList = new ArrayList(); Stream parallelStream = aList.parallelStream(); assertTrue(parallelStream.isParallel()); }

Die Standardverarbeitung, die in einem solchen Stream stattfindet, verwendet ForkJoinPool.commonPool (), einen Thread-Pool, der von der gesamten Anwendung gemeinsam genutzt wird.

3. Benutzerdefinierter Thread-Pool

Wir können tatsächlich einen benutzerdefinierten ThreadPool übergeben, wenn wir den Stream verarbeiten .

Im folgenden Beispiel kann ein paralleler Stream einen benutzerdefinierten Thread-Pool verwenden , um die Summe der langen Werte von 1 bis einschließlich 1.000.000 zu berechnen:

@Test public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal() throws InterruptedException, ExecutionException { long firstNum = 1; long lastNum = 1_000_000; List aList = LongStream.rangeClosed(firstNum, lastNum).boxed() .collect(Collectors.toList()); ForkJoinPool customThreadPool = new ForkJoinPool(4); long actualTotal = customThreadPool.submit( () -> aList.parallelStream().reduce(0L, Long::sum)).get(); assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal); }

Wir haben den ForkJoinPool- Konstruktor mit einer Parallelitätsstufe von 4 verwendet. Einige Experimente sind erforderlich, um den optimalen Wert für verschiedene Umgebungen zu ermitteln. Eine gute Faustregel ist jedoch, einfach die Anzahl basierend auf der Anzahl der Kerne Ihrer CPU auszuwählen.

Als Nächstes haben wir den Inhalt des parallelen Streams verarbeitet und ihn im Aufruf zum Reduzieren zusammengefasst .

Dieses einfache Beispiel zeigt möglicherweise nicht die volle Nützlichkeit der Verwendung eines benutzerdefinierten Thread-Pools , aber die Vorteile werden in Situationen offensichtlich, in denen der gemeinsame Thread-Pool nicht mit lang laufenden Aufgaben verknüpft werden soll (z. B. Verarbeitung von Daten aus einer Netzwerkquelle). oder der gemeinsame Thread-Pool wird von anderen Komponenten in der Anwendung verwendet.

4. Fazit

Wir haben uns kurz angesehen, wie ein paralleler Stream mit einem benutzerdefinierten Thread-Pool ausgeführt wird . In der richtigen Umgebung und bei richtiger Verwendung des Parallelitätsniveaus können in bestimmten Situationen Leistungssteigerungen erzielt werden.

Die vollständigen Codebeispiele, auf die in diesem Artikel verwiesen wird, finden Sie auf Github.