Java EE 7 Stapelverarbeitung

1. Einleitung

Stellen Sie sich vor, wir müssten Aufgaben wie die Verarbeitung von Gehaltsabrechnungen, die Berechnung von Zinsen und die Erstellung von Rechnungen manuell erledigen. Es würde ziemlich langweilig, fehleranfällig und eine endlose Liste manueller Aufgaben werden!

In diesem Tutorial werfen wir einen Blick auf Java Batch Processing (JSR 352), einen Teil der Jakarta EE-Plattform, und eine hervorragende Spezifikation für die Automatisierung solcher Aufgaben. Es bietet Anwendungsentwicklern ein Modell für die Entwicklung robuster Stapelverarbeitungssysteme, damit sie sich auf die Geschäftslogik konzentrieren können.

2. Maven-Abhängigkeiten

Da JSR 352 nur eine Spezifikation ist, müssen wir seine API und Implementierung wie jberet einbeziehen :

 javax.batch javax.batch-api 1.0.1   org.jberet jberet-core 1.0.2.Final   org.jberet jberet-support 1.0.2.Final   org.jberet jberet-se 1.0.2.Final 

Wir werden auch eine In-Memory-Datenbank hinzufügen, damit wir einige realistischere Szenarien betrachten können.

3. Schlüsselkonzepte

JSR 352 führt einige Konzepte ein, die wir folgendermaßen betrachten können:

Definieren wir zunächst jedes Stück:

  • Links beginnend haben wir den JobOperator . Es verwaltet alle Aspekte der Jobverarbeitung wie Starten, Stoppen und Neustarten
  • Als nächstes haben wir den Job . Ein Job ist eine logische Sammlung von Schritten. Es kapselt einen gesamten Stapelprozess
  • Ein Job enthält zwischen 1 und n Schritte . Jeder Schritt ist eine unabhängige, sequentielle Arbeitseinheit. Ein Schritt besteht aus dem Lesen der Eingabe, dem Verarbeiten dieser Eingabe und dem Schreiben der Ausgabe
  • Und zu guter Letzt haben wir das JobRepository, in dem die laufenden Informationen der Jobs gespeichert sind . Es hilft, die Jobs, ihren Status und ihre Abschlussergebnisse im Auge zu behalten

Die Schritte sind etwas detaillierter als diese. Schauen wir uns das als nächstes an. Zuerst schauen wir uns die Chunk- Schritte und dann die Batchlets an .

4. Erstellen eines Chunks

Wie bereits erwähnt, ist ein Stück eine Art Schritt . Wir werden oft einen Block verwenden, um eine Operation auszudrücken, die immer wieder ausgeführt wird, beispielsweise über eine Reihe von Elementen. Es ist eine Art Zwischenoperation von Java Streams.

Wenn wir einen Block beschreiben, müssen wir angeben, woher die Artikel stammen, wie sie verarbeitet werden und wohin sie anschließend gesendet werden sollen.

4.1. Artikel lesen

Um Elemente zu lesen, müssen Sie ItemReader implementieren .

In diesem Fall erstellen wir einen Reader, der einfach die Zahlen 1 bis 10 ausgibt:

@Named public class SimpleChunkItemReader extends AbstractItemReader { private Integer[] tokens; private Integer count; @Inject JobContext jobContext; @Override public Integer readItem() throws Exception { if (count >= tokens.length) { return null; } jobContext.setTransientUserData(count); return tokens[count++]; } @Override public void open(Serializable checkpoint) throws Exception { tokens = new Integer[] { 1,2,3,4,5,6,7,8,9,10 }; count = 0; } }

Jetzt lesen wir hier nur aus dem internen Zustand der Klasse. Aber readItem kann natürlich aus einer Datenbank , aus dem Dateisystem oder einer anderen externen Quelle abgerufen werden .

Beachten Sie, dass wir einen Teil dieses internen Status mit JobContext # setTransientUserData () speichern, was später nützlich sein wird.

Beachten Sie auch den Checkpoint- Parameter . Wir werden das auch wieder aufgreifen.

4.2. Artikel bearbeiten

Der Grund, warum wir chunking, ist natürlich, dass wir eine Art Operation an unseren Gegenständen durchführen wollen!

Jedes Mal, wenn wir von einem Artikelverarbeiter null zurückgeben , wird dieser Artikel aus dem Stapel entfernt.

Nehmen wir hier also an, wir wollen nur die geraden Zahlen behalten. Wir können einen ItemProcessor verwenden , der die ungeraden zurückweist, indem er null zurückgibt :

@Named public class SimpleChunkItemProcessor implements ItemProcessor { @Override public Integer processItem(Object t) { Integer item = (Integer) t; return item % 2 == 0 ? item : null; } }

processItem wird einmal für jeden Artikel aufgerufen, den unser ItemReader ausgibt .

4.3. Artikel schreiben

Schließlich ruft der Job den ItemWriter auf, damit wir unsere transformierten Elemente schreiben können:

@Named public class SimpleChunkWriter extends AbstractItemWriter { List processed = new ArrayList(); @Override public void writeItems(List items) throws Exception { items.stream().map(Integer.class::cast).forEach(processed::add); } } 

Wie lang sind Artikel ? In einem Moment definieren wir die Größe eines Blocks , die die Größe der Liste bestimmt, die an writeItems gesendet wird .

4.4. Definieren eines Chunks in einem Job

Jetzt fügen wir all dies in einer XML-Datei mit JSL oder Job Specification Language zusammen. Beachten Sie, dass wir unseren Reader, Prozessor, Chunker und auch eine Chunk-Größe auflisten:

Die Blockgröße gibt an, wie oft der Fortschritt im Block in das Job-Repository übernommen wird. Dies ist wichtig, um die Fertigstellung zu gewährleisten, falls ein Teil des Systems ausfällt.

Wir müssen diese Datei in META-INF / Batch-Jobs für platzieren. JAR- Dateien und in WEB-INF / classes / META-INF / Batch-Jobs für .war- Dateien .

Wir haben unserem Job die ID "simpleChunk" gegeben, also versuchen wir das in einem Unit-Test.

Jetzt werden Jobs asynchron ausgeführt, was das Testen schwierig macht. Schauen Sie sich im Beispiel unseren BatchTestHelper an, der abfragt und wartet, bis der Auftrag abgeschlossen ist:

@Test public void givenChunk_thenBatch_completesWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleChunk", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); } 

Das sind also Brocken. Schauen wir uns nun die Batchlets an.

5. Erstellen eines Batchlets

Nicht alles passt gut in ein iteratives Modell. Beispielsweise haben wir möglicherweise eine Aufgabe, die wir nur einmal aufrufen , vollständig ausführen und einen Beendigungsstatus zurückgeben müssen.

Der Vertrag für ein Batchlet ist ganz einfach:

@Named public class SimpleBatchLet extends AbstractBatchlet { @Override public String process() throws Exception { return BatchStatus.COMPLETED.toString(); } }

Wie ist die JSL:

Und wir können es mit dem gleichen Ansatz wie zuvor testen:

@Test public void givenBatchlet_thenBatch_completeWithSuccess() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobExecution = BatchTestHelper.keepTestAlive(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

Wir haben uns verschiedene Möglichkeiten angesehen, um Schritte zu implementieren.

Betrachten wir nun Mechanismen zur Kennzeichnung und Garantie des Fortschritts.

6. Benutzerdefinierter Prüfpunkt

Fehler treten zwangsläufig mitten im Job auf. Sollen wir einfach von vorne anfangen oder können wir irgendwie dort anfangen, wo wir aufgehört haben?

Wie der Name schon sagt, helfen uns Checkpoints , im Fehlerfall regelmäßig ein Lesezeichen zu setzen.

Standardmäßig ist das Ende der Blockverarbeitung ein natürlicher Prüfpunkt .

Wir können es jedoch mit unserem eigenen CheckpointAlgorithm anpassen :

@Named public class CustomCheckPoint extends AbstractCheckpointAlgorithm { @Inject JobContext jobContext; @Override public boolean isReadyToCheckpoint() throws Exception { int counterRead = (Integer) jobContext.getTransientUserData(); return counterRead % 5 == 0; } }

Erinnern Sie sich an die Anzahl, die wir früher in vorübergehende Daten eingegeben haben? Hier können wir es mit JobContext # getTransientUserData herausziehenzu erklären, dass wir uns auf jede fünfte verarbeitete Nummer festlegen wollen.

Ohne dies würde ein Commit am Ende jedes Blocks oder in unserem Fall jeder dritten Nummer stattfinden.

And then, we match that up with the checkout-algorithm directive in our XML underneath our chunk:

Let's test the code, again noting that some of the boilerplate steps are hidden away in BatchTestHelper:

@Test public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId) .stream() .map(BatchTestHelper::getCommitCount) .forEach(count -> assertEquals(3L, count.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

So, we might be expecting a commit count of 2 since we have ten items and configured the commits to be every 5th item. But, the framework does one more final read commit at the end to ensure everything has been processed, which is what brings us up to 3.

Next, let's look at how to handle errors.

7. Exception Handling

By default, the job operator will mark our job as FAILED in case of an exception.

Let's change our item reader to make sure that it fails:

@Override public Integer readItem() throws Exception { if (tokens.hasMoreTokens()) { String tempTokenize = tokens.nextToken(); throw new RuntimeException(); } return null; }

And then test:

@Test public void whenChunkError_thenBatch_CompletesWithFailed() throws Exception { // ... start job and wait for completion assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED); }

But, we can override this default behavior in a number of ways:

  • skip-limit specifies the number of exceptions this step will ignore before failing
  • retry-limit specifies the number of times the job operator should retry the step before failing
  • skippable-exception-class specifies a set of exceptions that chunk processing will ignore

So, we can edit our job so that it ignores RuntimeException, as well as a few others, just for illustration:

And now our code will pass:

@Test public void givenChunkError_thenErrorSkipped_CompletesWithSuccess() throws Exception { // ... start job and wait for completion jobOperator.getStepExecutions(executionId).stream() .map(BatchTestHelper::getProcessSkipCount) .forEach(skipCount -> assertEquals(1L, skipCount.longValue())); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8. Executing Multiple Steps

We mentioned earlier that a job can have any number of steps, so let's see that now.

8.1. Firing the Next Step

By default, each step is the last step in the job.

In order to execute the next step within a batch job, we'll have to explicitly specify by using the next attribute within the step definition:

If we forget this attribute, then the next step in sequence will not get executed.

And we can see what this looks like in the API:

@Test public void givenTwoSteps_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(2 , jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.2. Flows

A sequence of steps can also be encapsulated into a flow. When the flow is finished, it is the entire flow that transitions to the execution element. Also, elements inside the flow can't transition to elements outside the flow.

We can, say, execute two steps inside a flow, and then have that flow transition to an isolated step:

And we can still see each step execution independently:

@Test public void givenFlow_thenBatch_CompleteWithSuccess() throws Exception { // ... start job and wait for completion assertEquals(3, jobOperator.getStepExecutions(executionId).size()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

8.3. Decisions

We also have if/else support in the form of decisions. Decisions provide a customized way of determining a sequence among steps, flows, and splits.

Like steps, it works on transition elements such as next which can direct or terminate job execution.

Let's see how the job can be configured:

Any decision element needs to be configured with a class that implements Decider. Its job is to return a decision as a String.

Each next inside decision is like a case in a switch statement.

8.4. Splits

Splits are handy since they allow us to execute flows concurrently:

Of course, this means that the order isn't guaranteed.

Let's confirm that they still all get run. The flow steps will be performed in an arbitrary order, but the isolated step will always be last:

@Test public void givenSplit_thenBatch_CompletesWithSuccess() throws Exception { // ... start job and wait for completion List stepExecutions = jobOperator.getStepExecutions(executionId); assertEquals(3, stepExecutions.size()); assertEquals("splitJobSequenceStep3", stepExecutions.get(2).getStepName()); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

9. Partitioning a Job

We can also consume the batch properties within our Java code which have been defined in our job.

They can be scoped at three levels – the job, the step, and the batch-artifact.

Let's see some examples of how they consumed.

When we want to consume the properties at job level:

@Inject JobContext jobContext; ... jobProperties = jobContext.getProperties(); ...

This can be consumed at a step level as well:

@Inject StepContext stepContext; ... stepProperties = stepContext.getProperties(); ...

When we want to consume the properties at batch-artifact level:

@Inject @BatchProperty(name = "name") private String nameString;

This comes in handy with partitions.

See, with splits, we can run flows concurrently. But we can also partition a step into n sets of items or set separate inputs, allowing us another way to split up the work across multiple threads.

To comprehend the segment of work each partition should do, we can combine properties with partitions:

10. Stop and Restart

Now, that's it for defining jobs. Now let's talk for a minute about managing them.

We've already seen in our unit tests that we can get an instance of JobOperator from BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator();

And then, we can start the job:

Long executionId = jobOperator.start("simpleBatchlet", new Properties());

However, we can also stop the job:

jobOperator.stop(executionId);

And lastly, we can restart the job:

executionId = jobOperator.restart(executionId, new Properties());

Let's see how we can stop a running job:

@Test public void givenBatchLetStarted_whenStopped_thenBatchStopped() throws Exception { JobOperator jobOperator = BatchRuntime.getJobOperator(); Long executionId = jobOperator.start("simpleBatchLet", new Properties()); JobExecution jobExecution = jobOperator.getJobExecution(executionId); jobOperator.stop(executionId); jobExecution = BatchTestHelper.keepTestStopped(jobExecution); assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); }

And if a batch is STOPPED, then we can restart it:

@Test public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess() { // ... start and stop the job assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED); executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties()); jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId)); assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED); }

11. Fetching Jobs

When a batch job is submitted then the batch runtime creates an instance of JobExecution to track it.

To obtain the JobExecution for an execution id, we can use the JobOperator#getJobExecution(executionId) method.

And, StepExecution provides helpful information for tracking a step's execution.

To obtain the StepExecution for an execution id, we can use the JobOperator#getStepExecutions(executionId) method.

And from that, we can get several metrics about the step via StepExecution#getMetrics:

@Test public void givenChunk_whenJobStarts_thenStepsHaveMetrics() throws Exception { // ... start job and wait for completion assertTrue(jobOperator.getJobNames().contains("simpleChunk")); assertTrue(jobOperator.getParameters(executionId).isEmpty()); StepExecution stepExecution = jobOperator.getStepExecutions(executionId).get(0); Map metricTest = BatchTestHelper.getMetricsMap(stepExecution.getMetrics()); assertEquals(10L, metricTest.get(Metric.MetricType.READ_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.FILTER_COUNT).longValue()); assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT_COUNT).longValue()); assertEquals(5L, metricTest.get(Metric.MetricType.WRITE_COUNT).longValue()); // ... and many more! }

12. Disadvantages

JSR 352 is powerful, though it is lacking in a number of areas:

  • Es scheint an Lesern und Autoren zu mangeln, die andere Formate wie JSON verarbeiten können
  • Generika werden nicht unterstützt
  • Die Partitionierung unterstützt nur einen einzelnen Schritt
  • Die API bietet nichts zur Unterstützung der Zeitplanung (obwohl J2EE über ein separates Planungsmodul verfügt).
  • Aufgrund seiner asynchronen Natur kann das Testen eine Herausforderung sein
  • Die API ist ziemlich ausführlich

13. Schlussfolgerung

In diesem Artikel haben wir uns JSR 352 angesehen und mehr über Chunks, Batchlets, Splits, Flows und vieles mehr erfahren. Wir haben die Oberfläche jedoch kaum zerkratzt.

Wie immer ist der Demo-Code auf GitHub zu finden.