Spring Batch mit Partitioner

1. Übersicht

In unserer vorherigen Einführung in Spring Batch haben wir das Framework als Stapelverarbeitungswerkzeug eingeführt. Wir haben auch die Konfigurationsdetails und die Implementierung für eine Single-Threaded-Single-Process-Jobausführung untersucht.

Um einen Job mit einer parallelen Verarbeitung zu implementieren, stehen verschiedene Optionen zur Verfügung. Auf einer höheren Ebene gibt es zwei Arten der Parallelverarbeitung:

  1. Single-Process, Multithread
  2. Multi-Prozess

In diesem kurzen Artikel wird die Partitionierung von Step erläutert , die sowohl für Einzelprozess- als auch für Mehrprozessjobs implementiert werden kann.

2. Partitionieren eines Schritts

Spring Batch mit Partitionierung bietet uns die Möglichkeit, die Ausführung eines Schritts zu teilen :

Partitionierungsübersicht

Das obige Bild zeigt eine Implementierung eines Jobs mit einem partitionierten Schritt .

Es gibt einen Schritt namens "Master", dessen Ausführung in einige "Slave" -Schritte unterteilt ist. Diese Slaves können den Platz eines Masters einnehmen, und das Ergebnis bleibt unverändert. Sowohl Master als auch Slave sind Instanzen von Step . Slaves können Remote-Dienste sein oder nur lokal Threads ausführen.

Bei Bedarf können wir Daten vom Master an den Slave übergeben. Die Metadaten (dh das JobRepository ) stellen sicher, dass jeder Slave in einer einzelnen Ausführung des Jobs nur einmal ausgeführt wird .

Hier ist das Sequenzdiagramm, das zeigt, wie alles funktioniert:

Partitionierungsschritt

Wie gezeigt, steuert der PartitionStep die Ausführung. Der PartitionHandler ist dafür verantwortlich, die Arbeit von „Master“ in die „Slaves“ aufzuteilen. Der Schritt ganz rechts ist der Sklave.

3. Der Maven POM

Die Maven-Abhängigkeiten sind die gleichen wie in unserem vorherigen Artikel erwähnt. Das heißt, Spring Core, Spring Batch und die Abhängigkeit für die Datenbank (in unserem Fall SQLite ).

4. Konfiguration

In unserem Einführungsartikel haben wir ein Beispiel für die Konvertierung einiger Finanzdaten von CSV in XML-Dateien gesehen. Lassen Sie uns das gleiche Beispiel erweitern.

Hier konvertieren wir die Finanzinformationen aus 5 CSV-Dateien mithilfe einer Multithread-Implementierung in entsprechende XML-Dateien.

Wir können dies mit einer einzigen Job- und Step- Partitionierung erreichen. Wir haben fünf Threads, einen für jede der CSV-Dateien.

Lassen Sie uns zunächst einen Job erstellen:

@Bean(name = "partitionerJob") public Job partitionerJob() throws UnexpectedInputException, MalformedURLException, ParseException { return jobs.get("partitioningJob") .start(partitionStep()) .build(); }

Wie wir sehen können, beginnt dieser Job mit dem PartitioningStep . Dies ist unser Hauptschritt, der in verschiedene Slave-Schritte unterteilt wird:

@Bean public Step partitionStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("partitionStep") .partitioner("slaveStep", partitioner()) .step(slaveStep()) .taskExecutor(taskExecutor()) .build(); }

Hier erstellen wir den PartitioningStep mit der StepBuilderFactory . Dazu müssen wir die Informationen über die SlaveSteps und den Partitioner geben .

Der Partitionierer ist eine Schnittstelle, die die Möglichkeit bietet, einen Satz von Eingabewerten für jeden der Slaves zu definieren. Mit anderen Worten, hier geht es um die Logik, Aufgaben in entsprechende Threads zu unterteilen.

Erstellen wir eine Implementierung namens CustomMultiResourcePartitioner , in der wir die Namen der Eingabe- und Ausgabedateien in den ExecutionContext einfügen , um sie an jeden Slave-Schritt weiterzuleiten:

public class CustomMultiResourcePartitioner implements Partitioner { @Override public Map partition(int gridSize) { Map map = new HashMap(gridSize); int i = 0, k = 1; for (Resource resource : resources) { ExecutionContext context = new ExecutionContext(); Assert.state(resource.exists(), "Resource does not exist: " + resource); context.putString(keyName, resource.getFilename()); context.putString("opFileName", "output"+k+++".xml"); map.put(PARTITION_KEY + i, context); i++; } return map; } }

Wir werden auch die Bean für diese Klasse erstellen, in der wir das Quellverzeichnis für Eingabedateien angeben:

@Bean public CustomMultiResourcePartitioner partitioner() { CustomMultiResourcePartitioner partitioner = new CustomMultiResourcePartitioner(); Resource[] resources; try { resources = resoursePatternResolver .getResources("file:src/main/resources/input/*.csv"); } catch (IOException e) { throw new RuntimeException("I/O problems when resolving" + " the input file pattern.", e); } partitioner.setResources(resources); return partitioner; }

Wir werden den Slave-Schritt definieren, genau wie jeden anderen Schritt mit dem Leser und dem Schreiber. Der Leser und der Schreiber sind dieselben wie in unserem Einführungsbeispiel, außer dass sie den Dateinamenparameter vom StepExecutionContext erhalten.

Beachten Sie, dass diese Beans schrittweise festgelegt werden müssen, damit sie bei jedem Schritt die Parameter stepExecutionContext empfangen können . Wenn sie keinen Schrittbereich haben, werden ihre Beans zunächst erstellt und akzeptieren die Dateinamen auf Stufenebene nicht:

@StepScope @Bean public FlatFileItemReader itemReader( @Value("#{stepExecutionContext[fileName]}") String filename) throws UnexpectedInputException, ParseException { FlatFileItemReader reader = new FlatFileItemReader(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); String[] tokens = {"username", "userid", "transactiondate", "amount"}; tokenizer.setNames(tokens); reader.setResource(new ClassPathResource("input/" + filename)); DefaultLineMapper lineMapper = new DefaultLineMapper(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new RecordFieldSetMapper()); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper); return reader; } 
@Bean @StepScope public ItemWriter itemWriter(Marshaller marshaller, @Value("#{stepExecutionContext[opFileName]}") String filename) throws MalformedURLException { StaxEventItemWriter itemWriter = new StaxEventItemWriter(); itemWriter.setMarshaller(marshaller); itemWriter.setRootTagName("transactionRecord"); itemWriter.setResource(new ClassPathResource("xml/" + filename)); return itemWriter; }

Während wir den Leser und Schreiber im Slave-Schritt erwähnen, können wir die Argumente als null übergeben, da diese Dateinamen nicht verwendet werden, da sie die Dateinamen von stepExecutionContext erhalten :

@Bean public Step slaveStep() throws UnexpectedInputException, MalformedURLException, ParseException { return steps.get("slaveStep").chunk(1) .reader(itemReader(null)) .writer(itemWriter(marshaller(), null)) .build(); }

5. Schlussfolgerung

In diesem Tutorial haben wir erläutert, wie Sie einen Job mit paralleler Verarbeitung mithilfe von Spring Batch implementieren.

Wie immer ist die vollständige Implementierung für dieses Beispiel auf GitHub verfügbar.