Spring Batch - Tasklets gegen Chunks

1. Einleitung

Spring Batch bietet zwei verschiedene Möglichkeiten zum Implementieren eines Jobs: Verwenden von Tasklets und Chunks .

In diesem Artikel erfahren Sie anhand eines einfachen Beispiels aus der Praxis, wie Sie beide Methoden konfigurieren und implementieren.

2. Abhängigkeiten

Beginnen wir mit dem Hinzufügen der erforderlichen Abhängigkeiten :

 org.springframework.batch spring-batch-core 4.2.0.RELEASE   org.springframework.batch spring-batch-test 4.2.0.RELEASE test 

Informationen zur neuesten Version des Spring-Batch-Core- und Spring-Batch-Tests finden Sie unter Maven Central.

3. Unser Anwendungsfall

Betrachten wir eine CSV-Datei mit folgendem Inhalt:

Mae Hodges,10/22/1972 Gary Potter,02/22/1953 Betty Wise,02/17/1968 Wayne Rose,04/06/1977 Adam Caldwell,09/27/1995 Lucille Phillips,05/14/1992

Die erste Position jeder Zeile repräsentiert den Namen einer Person und die zweite Position repräsentiert ihr Geburtsdatum .

Unser Anwendungsfall besteht darin, eine weitere CSV-Datei zu generieren, die den Namen und das Alter jeder Person enthält :

Mae Hodges,45 Gary Potter,64 Betty Wise,49 Wayne Rose,40 Adam Caldwell,22 Lucille Phillips,25

Nachdem unsere Domäne klar ist, können wir mit beiden Ansätzen eine Lösung entwickeln. Wir werden mit Tasklets beginnen.

4. Tasklets-Ansatz

4.1. Einführung und Design

Tasklets sollen eine einzelne Aufgabe innerhalb eines Schritts ausführen. Unsere Aufgabe besteht aus mehreren Schritten, die nacheinander ausgeführt werden. Jeder Schritt sollte nur eine definierte Aufgabe ausführen .

Unsere Aufgabe besteht aus drei Schritten:

  1. Lesen Sie Zeilen aus der CSV-Eingabedatei.
  2. Berechnen Sie das Alter für jede Person in der CSV-Eingabedatei.
  3. Schreiben Sie den Namen und das Alter jeder Person in eine neue Ausgabe-CSV-Datei.

Nachdem das Gesamtbild fertig ist, erstellen wir eine Klasse pro Schritt.

LinesReader ist für das Lesen der Daten aus der Eingabedatei verantwortlich:

public class LinesReader implements Tasklet { // ... }

LinesProcessor berechnet das Alter für jede Person in der Datei:

public class LinesProcessor implements Tasklet { // ... }

Schließlich hat LinesWriter die Verantwortung, Namen und Alter in eine Ausgabedatei zu schreiben:

public class LinesWriter implements Tasklet { // ... }

Zu diesem Zeitpunkt implementieren alle unsere Schritte die Tasklet- Schnittstelle . Das wird uns zwingen, seine Ausführungsmethode zu implementieren :

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { // ... }

Bei dieser Methode fügen wir die Logik für jeden Schritt hinzu. Bevor wir mit diesem Code beginnen, konfigurieren wir unseren Job.

4.2. Aufbau

Wir müssen dem Anwendungskontext von Spring eine Konfiguration hinzufügen . Nachdem Sie die Standard-Bean-Deklaration für die im vorherigen Abschnitt erstellten Klassen hinzugefügt haben, können Sie unsere Jobdefinition erstellen:

@Configuration @EnableBatchProcessing public class TaskletsConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean protected Step readLines() { return steps .get("readLines") .tasklet(linesReader()) .build(); } @Bean protected Step processLines() { return steps .get("processLines") .tasklet(linesProcessor()) .build(); } @Bean protected Step writeLines() { return steps .get("writeLines") .tasklet(linesWriter()) .build(); } @Bean public Job job() { return jobs .get("taskletsJob") .start(readLines()) .next(processLines()) .next(writeLines()) .build(); } // ... }

Dies bedeutet, dass unser „TaskletsJob“ aus drei Schritten besteht. Die erste ( readLines ) führt das im Bean linesReader definierte Tasklet aus und fährt mit dem nächsten Schritt fort: processLines. ProcessLines führt das im Bean LinesProcessor definierte Tasklet aus und fährt mit dem letzten Schritt fort: writeLines .

Unser Jobfluss ist definiert und wir sind bereit, eine Logik hinzuzufügen!

4.3. Modell und Utils

Da wir Zeilen in einer CSV-Datei bearbeiten, erstellen wir eine Klassenzeile :

public class Line implements Serializable { private String name; private LocalDate dob; private Long age; // standard constructor, getters, setters and toString implementation }

Bitte beachten Sie, dass Line Serializable implementiert . Dies liegt daran, dass Line als DTO fungiert, um Daten zwischen Schritten zu übertragen. Gemäß Spring Batch müssen Objekte, die zwischen Schritten übertragen werden, serialisierbar sein .

Auf der anderen Seite können wir anfangen, über das Lesen und Schreiben von Zeilen nachzudenken.

Dafür verwenden wir OpenCSV:

 com.opencsv opencsv 4.1 

Suchen Sie in Maven Central nach der neuesten OpenCSV-Version.

Sobald OpenCSV enthalten ist, erstellen wir auch eine FileUtils- Klasse . Es werden Methoden zum Lesen und Schreiben von CSV-Zeilen bereitgestellt:

public class FileUtils { public Line readLine() throws Exception { if (CSVReader == null) initReader(); String[] line = CSVReader.readNext(); if (line == null) return null; return new Line( line[0], LocalDate.parse( line[1], DateTimeFormatter.ofPattern("MM/dd/yyyy"))); } public void writeLine(Line line) throws Exception { if (CSVWriter == null) initWriter(); String[] lineStr = new String[2]; lineStr[0] = line.getName(); lineStr[1] = line .getAge() .toString(); CSVWriter.writeNext(lineStr); } // ... }

Beachten Sie, dass readLine als Wrapper über die readNext- Methode von OpenCSV fungiert und ein Line- Objekt zurückgibt .

Auf die gleiche Weise umschließt writeLine den writeNext von OpenCSV , der ein Line- Objekt empfängt . Die vollständige Implementierung dieser Klasse finden Sie im GitHub-Projekt.

An diesem Punkt beginnen wir alle mit jeder Schrittimplementierung.

4.4. LinesReader

Lassen Sie uns fortfahren und unsere LinesReader- Klasse abschließen :

public class LinesReader implements Tasklet, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesReader.class); private List lines; private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { lines = new ArrayList(); fu = new FileUtils( "taskletsvschunks/input/tasklets-vs-chunks.csv"); logger.debug("Lines Reader initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { Line line = fu.readLine(); while (line != null) { lines.add(line); logger.debug("Read line: " + line.toString()); line = fu.readLine(); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeReader(); stepExecution .getJobExecution() .getExecutionContext() .put("lines", this.lines); logger.debug("Lines Reader ended."); return ExitStatus.COMPLETED; } }

Die Ausführungsmethode von LinesReader erstellt eine FileUtils- Instanz über den Pfad der Eingabedatei. Dann fügt Zeilen zu einer Liste , bis es nicht mehr Linien sind zu lesen .

Unsere Klasse implementiert auch StepExecutionListener , der zwei zusätzliche Methoden bereitstellt: beforeStep und afterStep . Wir werden diese Methoden verwenden, um Dinge vor und nach Ausführungsläufen zu initialisieren und zu schließen .

Wenn wir uns den afterStep- Code ansehen , werden wir die Zeile bemerken, in der die Ergebnisliste ( Zeilen) in den Kontext des Jobs gestellt wird, um sie für den nächsten Schritt verfügbar zu machen:

stepExecution .getJobExecution() .getExecutionContext() .put("lines", this.lines);

Zu diesem Zeitpunkt hat unser erster Schritt bereits seine Verantwortung erfüllt: Laden Sie CSV-Zeilen in eine Liste im Speicher. Fahren wir mit dem zweiten Schritt fort und verarbeiten sie.

4.5. LinesProcessor

LinesProcessor implementiert auch StepExecutionListener und natürlich Tasklet . Das bedeutetdass es implementieren beforeStep , ausführen und Afterstep Methoden auch:

public class LinesProcessor implements Tasklet, StepExecutionListener { private Logger logger = LoggerFactory.getLogger( LinesProcessor.class); private List lines; @Override public void beforeStep(StepExecution stepExecution) { ExecutionContext executionContext = stepExecution .getJobExecution() .getExecutionContext(); this.lines = (List) executionContext.get("lines"); logger.debug("Lines Processor initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { for (Line line : lines) { long age = ChronoUnit.YEARS.between( line.getDob(), LocalDate.now()); logger.debug("Calculated age " + age + " for line " + line.toString()); line.setAge(age); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { logger.debug("Lines Processor ended."); return ExitStatus.COMPLETED; } }

Es ist leicht zu verstehen , dass es lädt Linien Liste aus dem Kontext des Jobs und berechnet das Alter jeder Person .

Es ist nicht erforderlich, eine weitere Ergebnisliste in den Kontext einzufügen, da Änderungen an demselben Objekt vorgenommen werden, das aus dem vorherigen Schritt stammt.

Und wir sind bereit für unseren letzten Schritt.

4.6. LinesWriter

LinesWriter ‚s Aufgabe ist es vorbei zu gehen Linien Liste und Schreib Namen und das Alter in die Ausgabedatei :

public class LinesWriter implements Tasklet, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesWriter.class); private List lines; private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { ExecutionContext executionContext = stepExecution .getJobExecution() .getExecutionContext(); this.lines = (List) executionContext.get("lines"); fu = new FileUtils("output.csv"); logger.debug("Lines Writer initialized."); } @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { for (Line line : lines) { fu.writeLine(line); logger.debug("Wrote line " + line.toString()); } return RepeatStatus.FINISHED; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeWriter(); logger.debug("Lines Writer ended."); return ExitStatus.COMPLETED; } }

We're done with our job's implementation! Let's create a test to run it and see the results.

4.7. Running the Job

To run the job, we'll create a test:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = TaskletsConfig.class) public class TaskletsTest { @Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Test public void givenTaskletsJob_whenJobEnds_thenStatusCompleted() throws Exception { JobExecution jobExecution = jobLauncherTestUtils.launchJob(); assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); } }

ContextConfiguration annotation is pointing to the Spring context configuration class, that has our job definition.

We'll need to add a couple of extra beans before running the test:

@Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } @Bean public JobRepository jobRepository() throws Exception { MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(); factory.setTransactionManager(transactionManager()); return (JobRepository) factory.getObject(); } @Bean public PlatformTransactionManager transactionManager() { return new ResourcelessTransactionManager(); } @Bean public JobLauncher jobLauncher() throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository()); return jobLauncher; }

Everything is ready! Go ahead and run the test!

After the job has finished, output.csv has the expected content and logs show the execution flow:

[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized. [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended. [main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized. [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended. [main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized. [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22] [main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25] [main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.

That's it for Tasklets. Now we can move on to the Chunks approach.

5. Chunks Approach

5.1. Introduction and Design

As the name suggests, this approach performs actions over chunks of data. That is, instead of reading, processing and writing all the lines at once, it'll read, process and write a fixed amount of records (chunk) at a time.

Then, it'll repeat the cycle until there's no more data in the file.

As a result, the flow will be slightly different:

  1. While there're lines:
    • Do for X amount of lines:
      • Read one line
      • Process one line
    • Write X amount of lines.

So, we also need to create three beans for chunk oriented approach:

public class LineReader { // ... }
public class LineProcessor { // ... }
public class LinesWriter { // ... }

Before moving to implementation, let's configure our job.

5.2. Configuration

The job definition will also look different:

@Configuration @EnableBatchProcessing public class ChunksConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean public ItemReader itemReader() { return new LineReader(); } @Bean public ItemProcessor itemProcessor() { return new LineProcessor(); } @Bean public ItemWriter itemWriter() { return new LinesWriter(); } @Bean protected Step processLines(ItemReader reader, ItemProcessor processor, ItemWriter writer) { return steps.get("processLines"). chunk(2) .reader(reader) .processor(processor) .writer(writer) .build(); } @Bean public Job job() { return jobs .get("chunksJob") .start(processLines(itemReader(), itemProcessor(), itemWriter())) .build(); } }

In this case, there's only one step performing only one tasklet.

However, that tasklet defines a reader, a writer and a processor that will act over chunks of data.

Note that the commit interval indicates the amount of data to be processed in one chunk. Our job will read, process and write two lines at a time.

Now we're ready to add our chunk logic!

5.3. LineReader

LineReader will be in charge of reading one record and returning a Line instance with its content.

To become a reader, our class has to implement ItemReader interface:

public class LineReader implements ItemReader { @Override public Line read() throws Exception { Line line = fu.readLine(); if (line != null) logger.debug("Read line: " + line.toString()); return line; } }

The code is straightforward, it just reads one line and returns it. We'll also implement StepExecutionListener for the final version of this class:

public class LineReader implements ItemReader, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LineReader.class); private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv"); logger.debug("Line Reader initialized."); } @Override public Line read() throws Exception { Line line = fu.readLine(); if (line != null) logger.debug("Read line: " + line.toString()); return line; } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeReader(); logger.debug("Line Reader ended."); return ExitStatus.COMPLETED; } }

It should be noticed that beforeStep and afterStep execute before and after the whole step respectively.

5.4. LineProcessor

LineProcessor follows pretty much the same logic than LineReader.

However, in this case, we'll implement ItemProcessor and its method process():

public class LineProcessor implements ItemProcessor { private Logger logger = LoggerFactory.getLogger(LineProcessor.class); @Override public Line process(Line line) throws Exception { long age = ChronoUnit.YEARS .between(line.getDob(), LocalDate.now()); logger.debug("Calculated age " + age + " for line " + line.toString()); line.setAge(age); return line; } }

The process() method takes an input line, processes it and returns an output line. Again, we'll also implement StepExecutionListener:

public class LineProcessor implements ItemProcessor, StepExecutionListener { private Logger logger = LoggerFactory.getLogger(LineProcessor.class); @Override public void beforeStep(StepExecution stepExecution) { logger.debug("Line Processor initialized."); } @Override public Line process(Line line) throws Exception { long age = ChronoUnit.YEARS .between(line.getDob(), LocalDate.now()); logger.debug( "Calculated age " + age + " for line " + line.toString()); line.setAge(age); return line; } @Override public ExitStatus afterStep(StepExecution stepExecution) { logger.debug("Line Processor ended."); return ExitStatus.COMPLETED; } }

5.5. LinesWriter

Unlike reader and processor, LinesWriter will write an entire chunk of lines so that it receives a List of Lines:

public class LinesWriter implements ItemWriter, StepExecutionListener { private final Logger logger = LoggerFactory .getLogger(LinesWriter.class); private FileUtils fu; @Override public void beforeStep(StepExecution stepExecution) { fu = new FileUtils("output.csv"); logger.debug("Line Writer initialized."); } @Override public void write(List lines) throws Exception { for (Line line : lines) { fu.writeLine(line); logger.debug("Wrote line " + line.toString()); } } @Override public ExitStatus afterStep(StepExecution stepExecution) { fu.closeWriter(); logger.debug("Line Writer ended."); return ExitStatus.COMPLETED; } }

LinesWriter code speaks for itself. And again, we're ready to test our job.

5.6. Running the Job

We'll create a new test, same as the one we created for the tasklets approach:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = ChunksConfig.class) public class ChunksTest { @Autowired private JobLauncherTestUtils jobLauncherTestUtils; @Test public void givenChunksJob_whenJobEnds_thenStatusCompleted() throws Exception { JobExecution jobExecution = jobLauncherTestUtils.launchJob(); assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus()); } }

After configuring ChunksConfig as explained above for TaskletsConfig, we're all set to run the test!

Once the job is done, we can see that output.csv contains the expected result again, and the logs describe the flow:

[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized. [main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized. [main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized. [main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995] [main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22] [main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25] [main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended. [main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended. [main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.

We have the same result and a different flow. Logs make evident how the job executes following this approach.

6. Conclusion

Unterschiedliche Kontexte zeigen die Notwendigkeit des einen oder anderen Ansatzes. Während sich Tasklets für "eine Aufgabe nach der anderen" -Szenarien natürlicher anfühlen, bieten Chunks eine einfache Lösung für paginierte Lesevorgänge oder Situationen, in denen wir keine signifikante Datenmenge im Speicher behalten möchten.

Die vollständige Implementierung dieses Beispiels finden Sie im GitHub-Projekt .