Einführung in Thread-Pools in Java

1. Einleitung

Dieser Artikel befasst sich mit Thread-Pools in Java - angefangen bei den verschiedenen Implementierungen in der Standard-Java-Bibliothek bis hin zur Guava-Bibliothek von Google.

2. Der Thread-Pool

In Java werden Threads Threads auf Systemebene zugeordnet, die die Ressourcen des Betriebssystems darstellen. Wenn Sie Threads unkontrolliert erstellen, gehen Ihnen diese Ressourcen möglicherweise schnell aus.

Die Kontextumschaltung zwischen Threads erfolgt ebenfalls durch das Betriebssystem - um Parallelität zu emulieren. Eine vereinfachende Ansicht ist: Je mehr Threads Sie erzeugen, desto weniger Zeit verbringt jeder Thread mit der eigentlichen Arbeit.

Das Thread-Pool-Muster hilft, Ressourcen in einer Multithread-Anwendung zu sparen und die Parallelität in bestimmten vordefinierten Grenzen einzudämmen.

Wenn Sie einen Thread-Pool verwenden, schreiben Sie Ihren gleichzeitigen Code in Form paralleler Tasks und senden ihn zur Ausführung an eine Instanz eines Thread-Pools . Diese Instanz steuert mehrere wiederverwendete Threads zum Ausführen dieser Aufgaben.

Mit dem Muster können Sie die Anzahl der von der Anwendung erstellten Threads und ihren Lebenszyklus steuern sowie die Ausführung von Aufgaben planen und eingehende Aufgaben in einer Warteschlange halten.

3. Thread-Pools in Java

3.1. Executors , Executor und ExecutorService

Die Executors- Hilfsklasse enthält verschiedene Methoden zum Erstellen vorkonfigurierter Thread-Pool-Instanzen für Sie. Diese Klassen sind ein guter Anfang - verwenden Sie sie, wenn Sie keine benutzerdefinierte Feinabstimmung vornehmen müssen.

Die Executor- und ExecutorService- Schnittstellen werden verwendet, um mit verschiedenen Thread-Pool-Implementierungen in Java zu arbeiten. Normalerweise sollten Sie Ihren Code von der tatsächlichen Implementierung des Thread-Pools entkoppelt halten und diese Schnittstellen in Ihrer gesamten Anwendung verwenden.

Die Executor- Schnittstelle verfügt über eine einzige Ausführungsmethode , mit der ausführbare Instanzen zur Ausführung gesendet werden können.

Hier ist ein kurzes Beispiel dafür, wie Sie mithilfe der Executors- API eine Executor- Instanz abrufen können, die von einem einzelnen Thread-Pool und einer unbegrenzten Warteschlange für die sequentielle Ausführung von Aufgaben unterstützt wird. Hier führen wir eine einzelne Aufgabe aus, bei der einfach „ Hallo Welt “ auf dem Bildschirm gedruckt wird . Die Aufgabe wird als Lambda (eine Java 8-Funktion) übergeben, von der angenommen wird, dass sie ausführbar ist .

Executor executor = Executors.newSingleThreadExecutor(); executor.execute(() -> System.out.println("Hello World"));

Die ExecutorService- Schnittstelle enthält eine Vielzahl von Methoden zum Steuern des Fortschritts der Aufgaben und zum Verwalten der Beendigung des Dienstes . Über diese Schnittstelle können Sie die Aufgaben zur Ausführung senden und ihre Ausführung mithilfe der zurückgegebenen Future- Instanz steuern .

Im folgenden Beispiel erstellen wir einen ExecutorService , senden eine Aufgabe und warten dann mit der get- Methode der zurückgegebenen Future , bis die übermittelte Aufgabe abgeschlossen und der Wert zurückgegeben wird:

ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> "Hello World"); // some operations String result = future.get();

Natürlich möchten Sie in einem realen Szenario normalerweise nicht sofort future.get () aufrufen, sondern den Aufruf verschieben, bis Sie tatsächlich den Wert der Berechnung benötigen.

Das Eintragen Verfahren überlastet ist entweder für Runnable oder aufrufbare die beide funktionelle Schnittstellen und kann als Lambda geben wird (beginnend mit Java 8).

Die einzelne Methode von Runnable löst keine Ausnahme aus und gibt keinen Wert zurück. Die Callable- Schnittstelle ist möglicherweise praktischer, da wir eine Ausnahme auslösen und einen Wert zurückgeben können.

Um den Compiler auf den Callable- Typ schließen zu lassen , geben Sie einfach einen Wert aus dem Lambda zurück.

Weitere Beispiele für die Verwendung der ExecutorService- Oberfläche und der Zukunft finden Sie unter „Eine Anleitung zum Java ExecutorService“.

3.2. ThreadPoolExecutor

Der ThreadPoolExecutor ist eine erweiterbare Thread-Pool-Implementierung mit vielen Parametern und Hooks zur Feinabstimmung.

Die wichtigsten Konfigurationsparameter, die wir hier diskutieren werden, sind: corePoolSize , maximumPoolSize und keepAliveTime .

Der Pool besteht aus einer festen Anzahl von Kernthreads, die ständig im Inneren gespeichert sind, und einigen übermäßigen Threads, die möglicherweise erzeugt und dann beendet werden, wenn sie nicht mehr benötigt werden. Der Parameter corePoolSize gibt die Anzahl der Kernthreads an , die instanziiert und im Pool gespeichert werden. Wenn eine neue Aufgabe eingeht , wenn alle Kernthreads ausgelastet sind und die interne Warteschlange voll ist, kann der Pool auf MaximumPoolSize anwachsen .

Der Parameter keepAliveTime ist das Zeitintervall, für das die übermäßigen Threads (die über die corePoolSize hinaus instanziiert werden ) im Ruhezustand existieren dürfen. Standardmäßig berücksichtigt der ThreadPoolExecutor nur Nicht-Core-Threads zum Entfernen. Um dieselbe Entfernungsrichtlinie auf Kernthreads anzuwenden, können wir die Methode allowCoreThreadTimeOut (true) verwenden.

Diese Parameter decken eine Vielzahl von Anwendungsfällen ab, die typischsten Konfigurationen sind jedoch in den statischen Executors- Methoden vordefiniert .

Zum Beispiel , newFixedThreadPool schafft ein Verfahren ThreadPoolExecutor mit gleichen corePoolSize und maximumPoolSize Parameterwerte und einen Nullkeepalivetime. Dies bedeutet, dass die Anzahl der Threads in diesem Thread-Pool immer gleich ist:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(2, executor.getPoolSize()); assertEquals(1, executor.getQueue().size());

Im obigen Beispiel instanziieren wir einen ThreadPoolExecutor mit einer festen Threadanzahl von 2. Dies bedeutet, dass wenn die Anzahl der gleichzeitig ausgeführten Aufgaben immer kleiner oder gleich zwei ist, sie sofort ausgeführt werden. Andernfalls werden einige dieser Aufgaben möglicherweise in eine Warteschlange gestellt, um auf ihren Einsatz zu warten .

Wir haben drei Callable- Aufgaben erstellt, die schwere Arbeit imitieren, indem sie 1000 Millisekunden lang schlafen. Die ersten beiden Aufgaben werden sofort ausgeführt, und die dritte Aufgabe muss in der Warteschlange warten. Wir können dies überprüfen, indem wir die Methoden getPoolSize () und getQueue (). Size () unmittelbar nach dem Senden der Aufgaben aufrufen .

Ein weiterer vorkonfigurierter ThreadPoolExecutor kann mit der Executors.newCachedThreadPool () -Methode erstellt werden. Diese Methode empfängt überhaupt keine Anzahl von Threads. Die corePoolSize wird tatsächlich auf 0 gesetzt, und die maximalePoolSize wird für diese Instanz auf Integer.MAX_VALUE gesetzt . Die keepAliveTime beträgt für diese 60 Sekunden.

These parameter values mean that the cached thread pool may grow without bounds to accommodate any number of submitted tasks. But when the threads are not needed anymore, they will be disposed of after 60 seconds of inactivity. A typical use case is when you have a lot of short-living tasks in your application.

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(3, executor.getPoolSize()); assertEquals(0, executor.getQueue().size());

The queue size in the example above will always be zero because internally a SynchronousQueue instance is used. In a SynchronousQueue, pairs of insert and remove operations always occur simultaneously, so the queue never actually contains anything.

The Executors.newSingleThreadExecutor() API creates another typical form of ThreadPoolExecutor containing a single thread. The single thread executor is ideal for creating an event loop. The corePoolSize and maximumPoolSize parameters are equal to 1, and the keepAliveTime is zero.

Tasks in the above example will be executed sequentially, so the flag value will be 2 after the task's completion:

AtomicInteger counter = new AtomicInteger(); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { counter.set(1); }); executor.submit(() -> { counter.compareAndSet(1, 2); });

Additionally, this ThreadPoolExecutor is decorated with an immutable wrapper, so it cannot be reconfigured after creation. Note that also this is the reason we cannot cast it to a ThreadPoolExecutor.

3.3. ScheduledThreadPoolExecutor

The ScheduledThreadPoolExecutor extends the ThreadPoolExecutor class and also implements the ScheduledExecutorService interface with several additional methods:

  • schedule method allows to execute a task once after a specified delay;
  • scheduleAtFixedRate method allows to execute a task after a specified initial delay and then execute it repeatedly with a certain period; the period argument is the time measured between the starting times of the tasks, so the execution rate is fixed;
  • scheduleWithFixedDelay method is similar to scheduleAtFixedRate in that it repeatedly executes the given task, but the specified delay is measured between the end of the previous task and the start of the next; the execution rate may vary depending on the time it takes to execute any given task.

The Executors.newScheduledThreadPool() method is typically used to create a ScheduledThreadPoolExecutor with a given corePoolSize, unbounded maximumPoolSize and zero keepAliveTime. Here's how to schedule a task for execution in 500 milliseconds:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); executor.schedule(() -> { System.out.println("Hello World"); }, 500, TimeUnit.MILLISECONDS);

The following code shows how to execute a task after 500 milliseconds delay and then repeat it every 100 milliseconds. After scheduling the task, we wait until it fires three times using the CountDownLatch lock, then cancel it using the Future.cancel() method.

CountDownLatch lock = new CountDownLatch(3); ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); ScheduledFuture future = executor.scheduleAtFixedRate(() -> { System.out.println("Hello World"); lock.countDown(); }, 500, 100, TimeUnit.MILLISECONDS); lock.await(1000, TimeUnit.MILLISECONDS); future.cancel(true);

3.4. ForkJoinPool

ForkJoinPool is the central part of the fork/join framework introduced in Java 7. It solves a common problem of spawning multiple tasks in recursive algorithms. Using a simple ThreadPoolExecutor, you will run out of threads quickly, as every task or subtask requires its own thread to run.

In a fork/join framework, any task can spawn (fork) a number of subtasks and wait for their completion using the join method. The benefit of the fork/join framework is that it does not create a new thread for each task or subtask, implementing the Work Stealing algorithm instead. This framework is thoroughly described in the article “Guide to the Fork/Join Framework in Java”

Let’s look at a simple example of using ForkJoinPool to traverse a tree of nodes and calculate the sum of all leaf values. Here’s a simple implementation of a tree consisting of a node, an int value and a set of child nodes:

static class TreeNode { int value; Set children; TreeNode(int value, TreeNode... children) { this.value = value; this.children = Sets.newHashSet(children); } }

Now if we want to sum all values in a tree in parallel, we need to implement a RecursiveTask interface. Each task receives its own node and adds its value to the sum of values of its children. To calculate the sum of children values, task implementation does the following:

  • streams the children set,
  • maps over this stream, creating a new CountingTask for each element,
  • executes each subtask by forking it,
  • collects the results by calling the join method on each forked task,
  • sums the results using the Collectors.summingInt collector.
public static class CountingTask extends RecursiveTask { private final TreeNode node; public CountingTask(TreeNode node) { this.node = node; } @Override protected Integer compute() { return node.value + node.children.stream() .map(childNode -> new CountingTask(childNode).fork()) .collect(Collectors.summingInt(ForkJoinTask::join)); } }

The code to run the calculation on an actual tree is very simple:

TreeNode tree = new TreeNode(5, new TreeNode(3), new TreeNode(2, new TreeNode(2), new TreeNode(8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); int sum = forkJoinPool.invoke(new CountingTask(tree));

4. Thread Pool's Implementation in Guava

Guava is a popular Google library of utilities. It has many useful concurrency classes, including several handy implementations of ExecutorService. The implementing classes are not accessible for direct instantiation or subclassing, so the only entry point for creating their instances is the MoreExecutors helper class.

4.1. Adding Guava as a Maven Dependency

Add the following dependency to your Maven pom file to include the Guava library to your project. You can find the latest version of Guava library in the Maven Central repository:

 com.google.guava guava 19.0 

4.2. Direct Executor and Direct Executor Service

Sometimes you want to execute the task either in the current thread or in a thread pool, depending on some conditions. You would prefer to use a single Executor interface and just switch the implementation. Although it is not so hard to come up with an implementation of Executor or ExecutorService that executes the tasks in the current thread, it still requires writing some boilerplate code.

Gladly, Guava provides predefined instances for us.

Here's an example that demonstrates the execution of a task in the same thread. Although the provided task sleeps for 500 milliseconds, it blocks the current thread, and the result is available immediately after the execute call is finished:

Executor executor = MoreExecutors.directExecutor(); AtomicBoolean executed = new AtomicBoolean(); executor.execute(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } executed.set(true); }); assertTrue(executed.get());

The instance returned by the directExecutor() method is actually a static singleton, so using this method does not provide any overhead on object creation at all.

You should prefer this method to the MoreExecutors.newDirectExecutorService() because that API creates a full-fledged executor service implementation on every call.

4.3. Exiting Executor Services

Another common problem is shutting down the virtual machine while a thread pool is still running its tasks. Even with a cancellation mechanism in place, there is no guarantee that the tasks will behave nicely and stop their work when the executor service shuts down. This may cause JVM to hang indefinitely while the tasks keep doing their work.

To solve this problem, Guava introduces a family of exiting executor services. They are based on daemon threads that terminate together with the JVM.

These services also add a shutdown hook with the Runtime.getRuntime().addShutdownHook() method and prevent the VM from terminating for a configured amount of time before giving up on hung tasks.

In the following example, we're submitting the task that contains an infinite loop, but we use an exiting executor service with a configured time of 100 milliseconds to wait for the tasks upon VM termination. Without the exitingExecutorService in place, this task would cause the VM to hang indefinitely:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); ExecutorService executorService = MoreExecutors.getExitingExecutorService(executor, 100, TimeUnit.MILLISECONDS); executorService.submit(() -> { while (true) { } });

4.4. Listening Decorators

Listening decorators allow you to wrap the ExecutorService and receive ListenableFuture instances upon task submission instead of simple Future instances. The ListenableFuture interface extends Future and has a single additional method addListener. This method allows adding a listener that is called upon future completion.

Sie möchten die ListenableFuture.addListener () -Methode selten direkt verwenden, sie ist jedoch für die meisten Hilfsmethoden in der Futures- Dienstprogrammklasse von wesentlicher Bedeutung . Mit der Futures.allAsList () -Methode können Sie beispielsweise mehrere ListenableFuture- Instanzen in einer einzigen ListenableFuture- Instanz kombinieren , die nach erfolgreichem Abschluss aller kombinierten Futures abgeschlossen wird:

ExecutorService executorService = Executors.newCachedThreadPool(); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService); ListenableFuture future1 = listeningExecutorService.submit(() -> "Hello"); ListenableFuture future2 = listeningExecutorService.submit(() -> "World"); String greeting = Futures.allAsList(future1, future2).get() .stream() .collect(Collectors.joining(" ")); assertEquals("Hello World", greeting);

5. Schlussfolgerung

In diesem Artikel haben wir das Thread-Pool-Muster und seine Implementierungen in der Standard-Java-Bibliothek und in der Guava-Bibliothek von Google erläutert.

Der Quellcode für den Artikel ist auf GitHub verfügbar.