Anleitung zum Fork / Join Framework in Java

1. Übersicht

Das Fork / Join-Framework wurde in Java 7 vorgestellt. Es bietet Tools zur Beschleunigung der Parallelverarbeitung, indem versucht wird, alle verfügbaren Prozessorkerne zu verwenden. Dies wird durch einen Divide-and-Conquer-Ansatz erreicht .

In der Praxis bedeutet dies, dass sich das Framework zunächst „teilt“ und die Aufgabe rekursiv in kleinere unabhängige Unteraufgaben aufteilt, bis sie einfach genug sind, um asynchron ausgeführt zu werden.

Danach beginnt der Teil „Join“ , in dem die Ergebnisse aller Unteraufgaben rekursiv zu einem einzigen Ergebnis zusammengefügt werden. Bei einer Aufgabe, die void zurückgibt, wartet das Programm einfach, bis jede Unteraufgabe ausgeführt wird.

Um eine effektive parallele Ausführung zu gewährleisten, verwendet das Fork / Join-Framework einen Thread-Pool namens ForkJoinPool , der Worker-Threads vom Typ ForkJoinWorkerThread verwaltet .

2. ForkJoinPool

Der ForkJoinPool ist das Herzstück des Frameworks. Es handelt sich um eine Implementierung des ExecutorService , der Arbeitsthreads verwaltet und uns Tools zur Verfügung stellt, mit denen Informationen zum Status und zur Leistung des Thread-Pools abgerufen werden können.

Worker-Threads können jeweils nur eine Aufgabe ausführen, aber der ForkJoinPool erstellt nicht für jede einzelne Unteraufgabe einen eigenen Thread. Stattdessen hat jeder Thread im Pool eine eigene Warteschlange mit zwei Enden (oder deque, ausgesprochenes Deck ), in der Aufgaben gespeichert werden.

Diese Architektur ist wichtig, um die Arbeitslast des Threads mithilfe des Algorithmus zum Stehlen von Arbeit auszugleichen.

2.1. Algorithmus zum Stehlen von Arbeit

Einfach ausgedrückt - freie Threads versuchen, Arbeit von Deques beschäftigter Threads zu „stehlen“.

Standardmäßig erhält ein Worker-Thread Aufgaben vom Kopf seiner eigenen Deque. Wenn es leer ist, übernimmt der Thread eine Aufgabe vom Ende der Deque eines anderen ausgelasteten Threads oder aus der globalen Eingabewarteschlange, da sich hier wahrscheinlich die größten Arbeiten befinden.

Dieser Ansatz minimiert die Möglichkeit, dass Threads um Aufgaben konkurrieren. Es reduziert auch die Häufigkeit, mit der der Thread nach Arbeit suchen muss, da er zuerst mit den größten verfügbaren Arbeitsblöcken arbeitet.

2.2. ForkJoinPool Instanziierung

In Java 8 können Sie am bequemsten auf die Instanz des ForkJoinPool zugreifen, indem Sie die statische Methode commonPool () verwenden. Wie der Name schon sagt, wird hiermit auf den allgemeinen Pool verwiesen, der ein Standard-Thread-Pool für jede ForkJoinTask ist .

Laut der Dokumentation von Oracle reduziert die Verwendung des vordefinierten gemeinsamen Pools den Ressourcenverbrauch, da dies die Erstellung eines separaten Thread-Pools pro Aufgabe erschwert.

ForkJoinPool commonPool = ForkJoinPool.commonPool();

Das gleiche Verhalten kann in Java 7 erreicht werden, indem ein ForkJoinPool erstellt und einem öffentlichen statischen Feld einer Dienstprogrammklasse zugewiesen wird :

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

Jetzt ist es leicht zugänglich:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

Mit den Konstruktoren von ForkJoinPool ist es möglich, einen benutzerdefinierten Thread-Pool mit einem bestimmten Grad an Parallelität, Thread-Factory und Ausnahmebehandlungsroutine zu erstellen. Im obigen Beispiel hat der Pool eine Parallelitätsstufe von 2. Dies bedeutet, dass der Pool 2 Prozessorkerne verwendet.

3. ForkJoinTask

ForkJoinTask ist der Basistyp für Aufgaben, die in ForkJoinPool ausgeführt werden. In der Praxis sollte eine der beiden Unterklassen erweitert werden: die RecursiveAction für ungültige Aufgaben und die RecursiveTask für Aufgaben, die einen Wert zurückgeben.Beide haben eine abstrakte Methode compute (), in der die Logik der Aufgabe definiert ist.

3.1. RecursiveAction - ein Beispiel

Im folgenden Beispiel wird die zu verarbeitende Arbeitseinheit durch eine Zeichenfolge dargestellt, die als Workload bezeichnet wird . Zu Demonstrationszwecken ist die Aufgabe unsinnig: Sie schreibt einfach ihre Eingaben in Großbuchstaben und protokolliert sie.

Um das Forking-Verhalten des Frameworks zu demonstrieren, teilt das Beispiel die Aufgabe auf, wenn die Workload .length () größer als ein angegebener Schwellenwert istVerwenden der Methode createSubtask () .

Der String wird rekursiv in Teilzeichenfolgen unterteilt, wodurch CustomRecursiveTask- Instanzen erstellt werden, die auf diesen Teilzeichenfolgen basieren.

Infolgedessen gibt die Methode eine Liste zurück.

Die Liste wird mit der invokeAll () -Methode an den ForkJoinPool gesendet :

public class CustomRecursiveAction extends RecursiveAction { private String workload = ""; private static final int THRESHOLD = 4; private static Logger logger = Logger.getAnonymousLogger(); public CustomRecursiveAction(String workload) { this.workload = workload; } @Override protected void compute() { if (workload.length() > THRESHOLD) { ForkJoinTask.invokeAll(createSubtasks()); } else { processing(workload); } } private List createSubtasks() { List subtasks = new ArrayList(); String partOne = workload.substring(0, workload.length() / 2); String partTwo = workload.substring(workload.length() / 2, workload.length()); subtasks.add(new CustomRecursiveAction(partOne)); subtasks.add(new CustomRecursiveAction(partTwo)); return subtasks; } private void processing(String work) { String result = work.toUpperCase(); logger.info("This result - (" + result + ") - was processed by " + Thread.currentThread().getName()); } }

Mit diesem Muster können Sie Ihre eigenen RecursiveAction- Klassen entwickeln . Erstellen Sie dazu ein Objekt, das den Gesamtarbeitsaufwand darstellt, wählen Sie einen geeigneten Schwellenwert aus, definieren Sie eine Methode zum Teilen der Arbeit und definieren Sie eine Methode zum Ausführen der Arbeit.

3.2. RecursiveTask

Für Aufgaben, die einen Wert zurückgeben, ist die Logik hier ähnlich, außer dass das Ergebnis für jede Unteraufgabe in einem einzigen Ergebnis zusammengefasst ist:

public class CustomRecursiveTask extends RecursiveTask { private int[] arr; private static final int THRESHOLD = 20; public CustomRecursiveTask(int[] arr) { this.arr = arr; } @Override protected Integer compute() { if (arr.length > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks()) .stream() .mapToInt(ForkJoinTask::join) .sum(); } else { return processing(arr); } } private Collection createSubtasks() { List dividedTasks = new ArrayList(); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, 0, arr.length / 2))); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, arr.length / 2, arr.length))); return dividedTasks; } private Integer processing(int[] arr) { return Arrays.stream(arr) .filter(a -> a > 10 && a  a * 10) .sum(); } }

In diesem Beispiel wird die Arbeit durch ein Array dargestellt, das im arr- Feld der CustomRecursiveTask- Klasse gespeichert ist . Die Methode createSubtasks () unterteilt die Aufgabe rekursiv in kleinere Teile, bis jeder Teil kleiner als der Schwellenwert ist . Anschließend sendet die invokeAll () -Methode die Unteraufgaben an den gemeinsamen Pool und gibt eine Liste von Future zurück .

Um die Ausführung auszulösen, wird für jede Unteraufgabe die Methode join () aufgerufen.

In diesem Beispiel wird dies mithilfe der Stream-API von Java 8 erreicht . Die sum () -Methode wird als Darstellung der Kombination von Unterergebnissen zum Endergebnis verwendet.

4. Senden von Aufgaben an den ForkJoinPool

Um Aufgaben an den Thread-Pool zu senden, können nur wenige Ansätze verwendet werden.

Die Methode submit () oder execute () (ihre Anwendungsfälle sind dieselben):

forkJoinPool.execute(customRecursiveTask); int result = customRecursiveTask.join();

Die invoke () -Methode gabelt die Aufgabe und wartet auf das Ergebnis. Sie benötigt keine manuelle Verknüpfung:

int result = forkJoinPool.invoke(customRecursiveTask);

Die invokeAll () -Methode ist die bequemste Möglichkeit, eine Folge von ForkJoinTasks an den ForkJoinPool zu senden. Es nimmt Aufgaben als Parameter (zwei Aufgaben, var args oder eine Sammlung), gibt eine Sammlung zukünftiger Objekte in der Reihenfolge zurück, in der sie erstellt wurden.

Alternativ können Sie separate Methoden fork () und join () verwenden. Die fork () -Methode sendet eine Aufgabe an einen Pool, löst jedoch nicht deren Ausführung aus. Zu diesem Zweck muss die Methode join () verwendet werden. Im Fall von RecursiveAction gibt join () nur null zurück . Für RecursiveTask wird das Ergebnis der Ausführung der Aufgabe zurückgegeben:

customRecursiveTaskFirst.fork(); result = customRecursiveTaskLast.join();

In unserem RecursiveTask- Beispiel haben wir die invokeAll () -Methode verwendet, um eine Folge von Unteraufgaben an den Pool zu senden. Dieselbe Aufgabe kann mit fork () und join () ausgeführt werden , dies hat jedoch Konsequenzen für die Reihenfolge der Ergebnisse.

Um Verwirrung zu vermeiden, ist es im Allgemeinen eine gute Idee, die Methode invokeAll () zu verwenden, um mehr als eine Aufgabe an den ForkJoinPool zu senden.

5. Schlussfolgerungen

Die Verwendung des Fork / Join-Frameworks kann die Verarbeitung großer Aufgaben beschleunigen. Um dieses Ergebnis zu erzielen, sollten jedoch einige Richtlinien befolgt werden:

  • Verwenden Sie so wenig Thread-Pools wie möglich. In den meisten Fällen ist es am besten, einen Thread-Pool pro Anwendung oder System zu verwenden
  • Verwenden Sie den allgemeinen Standard-Thread-Pool, wenn keine spezielle Optimierung erforderlich ist
  • Verwenden Sie einen angemessenen Schwellenwert für die Aufteilung von ForkJoinTask in Unteraufgaben
  • Vermeiden Sie Blockierungen in Ihren ForkJoinTasks

Die in diesem Artikel verwendeten Beispiele sind im verknüpften GitHub-Repository verfügbar.