Übersicht über den java.util.concurrent

1. Übersicht

Das Paket java.util.concurrent bietet Tools zum Erstellen gleichzeitiger Anwendungen.

In diesem Artikel geben wir einen Überblick über das gesamte Paket.

2. Hauptkomponenten

Die java.util.concurrent enthält viel zu viele Funktionen, um sie in einem einzigen Artikel zu diskutieren. In diesem Artikel konzentrieren wir uns hauptsächlich auf einige der nützlichsten Dienstprogramme aus diesem Paket wie:

  • Testamentsvollstrecker
  • ExecutorService
  • ScheduledExecutorService
  • Zukunft
  • CountDownLatch
  • CyclicBarrier
  • Semaphor
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Schlösser
  • Phaser

Hier finden Sie auch viele Artikel zu einzelnen Klassen.

2.1. Testamentsvollstrecker

Executor ist eine Schnittstelle, die ein Objekt darstellt, das bereitgestellte Aufgaben ausführt.

Es hängt von der jeweiligen Implementierung ab (von wo aus der Aufruf initiiert wird), ob die Aufgabe in einem neuen oder aktuellen Thread ausgeführt werden soll. Daher können wir mithilfe dieser Schnittstelle den Taskausführungsfluss vom eigentlichen Taskausführungsmechanismus entkoppeln.

Hierbei ist zu beachten, dass Executor nicht unbedingt eine asynchrone Taskausführung benötigt. Im einfachsten Fall kann ein Executor die übermittelte Aufgabe sofort im aufrufenden Thread aufrufen.

Wir müssen einen Aufrufer erstellen, um die Executor-Instanz zu erstellen:

public class Invoker implements Executor { @Override public void execute(Runnable r) { r.run(); } }

Jetzt können wir diesen Aufrufer verwenden, um die Aufgabe auszuführen.

public void execute() { Executor executor = new Invoker(); executor.execute( () -> { // task to be performed }); }

Hierbei ist zu beachten, dass der Executor eine RejectedExecutionException auslöst , wenn er die Task nicht zur Ausführung annehmen kann .

2.2. ExecutorService

ExecutorService ist eine Komplettlösung für die asynchrone Verarbeitung. Es verwaltet eine speicherinterne Warteschlange und plant übermittelte Aufgaben basierend auf der Thread-Verfügbarkeit.

Um ExecutorService verwenden zu können, müssen wir eine ausführbare Klasse erstellen .

public class Task implements Runnable { @Override public void run() { // task details } }

Jetzt können wir die ExecutorService- Instanz erstellen und diese Aufgabe zuweisen. Zum Zeitpunkt der Erstellung müssen wir die Größe des Thread-Pools angeben.

ExecutorService executor = Executors.newFixedThreadPool(10);

Wenn wir eine Single-Threaded- ExecutorService- Instanz erstellen möchten , können wir newSingleThreadExecutor (ThreadFactory threadFactory) verwenden , um die Instanz zu erstellen.

Sobald der Executor erstellt ist, können wir ihn zum Senden der Aufgabe verwenden.

public void execute() { executor.submit(new Task()); }

Wir können auch die ausführbare Instanz erstellen, während wir die Aufgabe senden .

executor.submit(() -> { new Task(); });

Es enthält außerdem zwei sofort einsatzbereite Ausführungsbeendigungsmethoden. Der erste ist shutdown () ; Es wartet, bis alle übermittelten Aufgaben ausgeführt wurden. Die andere Methode ist shutdownNow () whic h sofort alle anstehenden / Ausführung von Aufgaben beendet.

Es gibt auch eine andere Methode awaitTermination (lange Zeitüberschreitung, TimeUnit-Einheit), die zwangsweise blockiert, bis alle Aufgaben ausgeführt wurden, nachdem ein Abschaltereignis ausgelöst wurde oder eine Ausführungszeitüberschreitung aufgetreten ist oder der Ausführungsthread selbst unterbrochen wurde.

try { executor.awaitTermination( 20l, TimeUnit.NANOSECONDS ); } catch (InterruptedException e) { e.printStackTrace(); }

2.3. ScheduledExecutorService

ScheduledExecutorService ist eine ähnliche Schnittstelle wie ExecutorService, kann jedoch regelmäßig Aufgaben ausführen.

Die Methoden von Executor und ExecutorService werden vor Ort ohne künstliche Verzögerung geplant. Null oder ein negativer Wert bedeutet, dass die Anforderung sofort ausgeführt werden muss.

Wir können sowohl die Runnable- als auch die Callable- Schnittstelle verwenden, um die Aufgabe zu definieren.

public void execute() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); Future future = executorService.schedule(() -> { // ... return "Hello world"; }, 1, TimeUnit.SECONDS); ScheduledFuture scheduledFuture = executorService.schedule(() -> { // ... }, 1, TimeUnit.SECONDS); executorService.shutdown(); }

ScheduledExecutorService kann die Aufgabe auch nach einer bestimmten festgelegten Verzögerung planen :

executorService.scheduleAtFixedRate(() -> { // ... }, 1, 10, TimeUnit.SECONDS); executorService.scheduleWithFixedDelay(() -> { // ... }, 1, 10, TimeUnit.SECONDS);

Hier erstellt und führt die Methode schedAtFixedRate (Befehl Runnable, long initialDelay, long period, TimeUnit unit) eine periodische Aktion aus und führt sie aus, die zuerst nach der angegebenen anfänglichen Verzögerung und anschließend mit dem angegebenen Zeitraum bis zum Herunterfahren der Dienstinstanz aufgerufen wird.

The scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit ) method creates and executes a periodic action that is invoked firstly after the provided initial delay, and repeatedly with the given delay between the termination of the executing one and the invocation of the next one.

2.4. Future

Future is used to represent the result of an asynchronous operation. It comes with methods for checking if the asynchronous operation is completed or not, getting the computed result, etc.

What's more, the cancel(boolean mayInterruptIfRunning) API cancels the operation and releases the executing thread. If the value of mayInterruptIfRunning is true, the thread executing the task will be terminated instantly.

Otherwise, in-progress tasks will be allowed to complete.

We can use below code snippet to create a future instance:

public void invoke() { ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> { // ... Thread.sleep(10000l); return "Hello world"; }); }

We can use following code snippet to check if the future result is ready and fetch the data if the computation is done:

if (future.isDone() && !future.isCancelled()) { try { str = future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }

We can also specify a timeout for a given operation. If the task takes more than this time, a TimeoutException is thrown:

try { future.get(10, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); }

2.5. CountDownLatch

CountDownLatch (introduced in JDK 5) is a utility class which blocks a set of threads until some operation completes.

A CountDownLatch is initialized with a counter(Integer type); this counter decrements as the dependent threads complete execution. But once the counter reaches zero, other threads get released.

You can learn more about CountDownLatch here.

2.6. CyclicBarrier

CyclicBarrier works almost the same as CountDownLatch except that we can reuse it. Unlike CountDownLatch, it allows multiple threads to wait for each other using await() method(known as barrier condition) before invoking the final task.

We need to create a Runnable task instance to initiate the barrier condition:

public class Task implements Runnable { private CyclicBarrier barrier; public Task(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { LOG.info(Thread.currentThread().getName() + " is waiting"); barrier.await(); LOG.info(Thread.currentThread().getName() + " is released"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }

Now we can invoke some threads to race for the barrier condition:

public void start() { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { // ... LOG.info("All previous tasks are completed"); }); Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); if (!cyclicBarrier.isBroken()) { t1.start(); t2.start(); t3.start(); } }

Here, the isBroken() method checks if any of the threads got interrupted during the execution time. We should always perform this check before performing the actual process.

2.7. Semaphore

The Semaphore is used for blocking thread level access to some part of the physical or logical resource. A semaphore contains a set of permits; whenever a thread tries to enter the critical section, it needs to check the semaphore if a permit is available or not.

If a permit is not available (via tryAcquire()), the thread is not allowed to jump into the critical section; however, if the permit is available the access is granted, and the permit counter decreases.

Once the executing thread releases the critical section, again the permit counter increases (done by release() method).

We can specify a timeout for acquiring access by using the tryAcquire(long timeout, TimeUnit unit) method.

We can also check the number of available permits or the number of threads waiting to acquire the semaphore.

Following code snippet can be used to implement a semaphore:

static Semaphore semaphore = new Semaphore(10); public void execute() throws InterruptedException { LOG.info("Available permit : " + semaphore.availablePermits()); LOG.info("Number of threads waiting to acquire: " + semaphore.getQueueLength()); if (semaphore.tryAcquire()) { try { // ... } finally { semaphore.release(); } } }

We can implement a Mutex like data-structure using Semaphore. More details on this can be found here.

2.8. ThreadFactory

As the name suggests, ThreadFactory acts as a thread (non-existing) pool which creates a new thread on demand. It eliminates the need of a lot of boilerplate coding for implementing efficient thread creation mechanisms.

We can define a ThreadFactory:

public class BaeldungThreadFactory implements ThreadFactory { private int threadId; private String name; public BaeldungThreadFactory(String name) { threadId = 1; this.name = name; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, name + "-Thread_" + threadId); LOG.info("created new thread with id : " + threadId + " and name : " + t.getName()); threadId++; return t; } }

We can use this newThread(Runnable r) method to create a new thread at runtime:

BaeldungThreadFactory factory = new BaeldungThreadFactory( "BaeldungThreadFactory"); for (int i = 0; i < 10; i++) { Thread t = factory.newThread(new Task()); t.start(); }

2.9. BlockingQueue

In asynchronous programming, one of the most common integration patterns is the producer-consumer pattern. The java.util.concurrent package comes with a data-structure know as BlockingQueue – which can be very useful in these async scenarios.

More information and a working example on this is available here.

2.10. DelayQueue

DelayQueue is an infinite-size blocking queue of elements where an element can only be pulled if it's expiration time (known as user defined delay) is completed. Hence, the topmost element (head) will have the most amount delay and it will be polled last.

More information and a working example on this is available here.

2.11. Locks

Not surprisingly, Lock is a utility for blocking other threads from accessing a certain segment of code, apart from the thread that's executing it currently.

The main difference between a Lock and a Synchronized block is that synchronized block is fully contained in a method; however, we can have Lock API’s lock() and unlock() operation in separate methods.

More information and a working example on this is available here.

2.12. Phaser

Phaser ist eine flexiblere Lösung als CyclicBarrier und CountDownLatch. Sie dient als wiederverwendbare Barriere, auf die die dynamische Anzahl von Threads warten muss, bevor die Ausführung fortgesetzt wird. Wir können mehrere Ausführungsphasen koordinieren und für jede Programmphase eine Phaser- Instanz wiederverwenden .

Weitere Informationen und ein funktionierendes Beispiel hierzu finden Sie hier.

3. Fazit

In diesem allgemeinen Übersichtsartikel haben wir uns auf die verschiedenen Dienstprogramme konzentriert, die im Paket java.util.concurrent verfügbar sind .

Wie immer ist der vollständige Quellcode auf GitHub verfügbar.