Spring Integration Java DSL

1. Einleitung

In diesem Tutorial lernen wir die Spring Integration Java DSL zum Erstellen von Anwendungsintegrationen kennen.

Wir nehmen die in Einführung in Spring Integration erstellte Integration zum Verschieben von Dateien und verwenden stattdessen DSL.

2. Abhängigkeiten

Das Java DSL von Spring Integration ist Teil von Spring Integration Core.

Also können wir diese Abhängigkeit hinzufügen:

 org.springframework.integration spring-integration-core 5.0.6.RELEASE 

Um an unserer Anwendung zum Verschieben von Dateien arbeiten zu können, benötigen wir außerdem die Spring-Integrationsdatei:

 org.springframework.integration spring-integration-file 5.0.6.RELEASE 

3. Spring Integration Java DSL

Vor dem Java DSL konfigurierten Benutzer Spring Integration-Komponenten in XML.

Mit DSL werden einige fließende Builder vorgestellt, mit denen wir problemlos eine vollständige Spring Integration-Pipeline nur in Java erstellen können.

Nehmen wir also an, wir wollten einen Kanal erstellen, der alle Daten, die durch die Pipe kommen, in Großbuchstaben schreibt.

In der Vergangenheit hätten wir vielleicht getan:

Und jetzt können wir stattdessen tun:

@Bean public IntegrationFlow upcaseFlow() { return IntegrationFlows.from("input") .transform(String::toUpperCase) .get(); }

4. Die App zum Verschieben von Dateien

Um mit der Integration von Dateien zu beginnen, benötigen wir einige einfache Bausteine.

4.1. Integrationsablauf

Der erste Baustein, den wir benötigen, ist ein Integrationsfluss, den wir vom IntegrationFlows- Builder abrufen können :

IntegrationFlows.from(...)

von kann verschiedene Typen annehmen, aber in diesem Tutorial werden wir uns nur drei ansehen:

  • MessageSource s
  • MessageChannel s und
  • String s

Wir werden in Kürze über alle drei sprechen.

Nachdem wir genannt haben aus sind einige Anpassungsmethoden uns jetzt zur Verfügung:

IntegrationFlow flow = IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .handle(targetDirectory()) // add more components .get();

Letztendlich erzeugt IntegrationFlows immer eine Instanz von IntegrationFlow, dem Endprodukt jeder Spring Integration-App.

Dieses Muster der Eingabe, Durchführung der entsprechenden Transformationen und Ausgabe der Ergebnisse ist für alle Spring Integration-Apps von grundlegender Bedeutung .

4.2. Beschreiben einer Eingabequelle

Um Dateien zu verschieben, müssen wir zunächst unserem Integrationsablauf angeben, wo sie gesucht werden sollen, und dafür benötigen wir eine MessageSource:

@Bean public MessageSource sourceDirectory() { // .. create a message source }

Einfach ausgedrückt ist eine MessageSource ein Ort, von dem Nachrichten stammen können, die außerhalb der Anwendung liegen.

Insbesondere benötigen wir etwas, das diese externe Quelle an die Spring-Messaging-Darstellung anpassen kann . Und da sich diese Anpassung auf die Eingabe konzentriert , werden diese häufig als Eingangskanaladapter bezeichnet.

Die Abhängigkeit von Spring-Integration-Dateien bietet uns einen Eingangskanaladapter, der sich hervorragend für unseren Anwendungsfall eignet: FileReadingMessageSource:

@Bean public MessageSource sourceDirectory() { FileReadingMessageSource messageSource = new FileReadingMessageSource(); messageSource.setDirectory(new File(INPUT_DIR)); return messageSource; }

Hier unsere FileReadingMessageSource wird ein Verzeichnis angegeben werden durch das Lesen INPUT_DIR und einen erstellen Message davon.

Geben Sie dies als Quelle in einem Aufruf von IntegrationFlows.from an :

IntegrationFlows.from(sourceDirectory());

4.3. Konfigurieren einer Eingabequelle

Wenn wir dies als langlebige Anwendung betrachten, möchten wir wahrscheinlich in der Lage sein, Dateien beim Eingang zu bemerken und nicht nur die Dateien zu verschieben, die bereits beim Start vorhanden sind.

Um dies zu erleichtern, aus kann auch zusätzliche nehmen Projektierer wie die weitere Anpassung der Eingangsquelle:

IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));

In diesem Fall können wir unsere Eingabequelle widerstandsfähiger machen, indem wir Spring Integration anweisen, diese Quelle - in diesem Fall unser Dateisystem - alle 10 Sekunden abzufragen.

Und natürlich gilt dies nicht nur für unsere Datei Eingangsquelle, könnten wir diese poller jedem hinzufügen Message .

4.4. Filtern von Nachrichten aus einer Eingabequelle

Next, let's suppose we want our file-moving application to move specific files only, say image files having jpg extension.

For this, we can use GenericSelector:

@Bean public GenericSelector onlyJpgs() { return new GenericSelector() { @Override public boolean accept(File source) { return source.getName().endsWith(".jpg"); } }; }

So, let's update our integration flow again:

IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs());

Or, because this filter is so simple, we could have instead defined it using a lambda:

IntegrationFlows.from(sourceDirectory()) .filter(source -> ((File) source).getName().endsWith(".jpg"));

4.5. Handling Messages With Service Activators

Now that we have a filtered list of files, we need to write them to a new location.

Service Activators are what we turn to when we're thinking about outputs in Spring Integration.

Let's use the FileWritingMessageHandler service activator from spring-integration-file:

@Bean public MessageHandler targetDirectory() { FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR)); handler.setFileExistsMode(FileExistsMode.REPLACE); handler.setExpectReply(false); return handler; }

Here, our FileWritingMessageHandler will write each Message payload it receives to OUTPUT_DIR.

Again, let's update:

IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .handle(targetDirectory());

And notice, by the way, the usage of setExpectReply. Because integration flows can bebidirectional, this invocation indicates that this particular pipe is one way.

4.6. Activating Our Integration Flow

When we have added all our components we need to register our IntegrationFlow as a bean to activate it:

@Bean public IntegrationFlow fileMover() { return IntegrationFlows.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000))) .filter(onlyJpgs()) .handle(targetDirectory()) .get(); }

The get method extracts an IntegrationFlow instance that we need to register as a Spring Bean.

As soon as our application context loads, all our components contained in our IntegrationFlow gets activated.

And now, our application will start moving files from the source directory to target directory.

5. Additional Components

In our DSL-based file-moving application, we created an Inbound Channel Adapter, a Message Filter, and a Service Activator.

Let's look at a few other common Spring Integration components and see how we might use them.

5.1. Message Channels

As mentioned earlier, a Message Channel is another way to initialize a flow:

IntegrationFlows.from("anyChannel")

We can read this as “please find or create a channel bean called anyChannel. Then, read any data that is fed into anyChannel from other flows.”

But, really it is more general-purpose than that.

Simply put, a channel abstracts away producers from consumers, and we can think of it as a Java Queue. A channel can be inserted at any point in the flow.

Let's say, for example, that we want to prioritize the files as they get moved from one directory to the next:

@Bean public PriorityChannel alphabetically() { return new PriorityChannel(1000, (left, right) -> ((File)left.getPayload()).getName().compareTo( ((File)right.getPayload()).getName())); }

Then, we can insert an invocation to channel in between our flow:

@Bean public IntegrationFlow fileMover() { return IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .channel("alphabetically") .handle(targetDirectory()) .get(); }

There are dozens of channels to pick from, some of the more handy ones being for concurrency, auditing, or intermediate persistence (think Kafka or JMS buffers).

Also, channels can be powerful when combined with Bridges.

5.2. Bridge

When we want to combine two channels, we use a Bridge.

Let's imagine that instead of writing directly to an output directory, we instead had our file-moving app write to another channel:

@Bean public IntegrationFlow fileReader() { return IntegrationFlows.from(sourceDirectory()) .filter(onlyJpgs()) .channel("holdingTank") .get(); }

Now, because we've simply written it to a channel, we can bridge from there to other flows.

Let's create a bridge that polls our holding tank for messages and writes them to a destination:

@Bean public IntegrationFlow fileWriter() { return IntegrationFlows.from("holdingTank") .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20))) .handle(targetDirectory()) .get(); }

Again, because we wrote to an intermediate channel, now we can add another flow that takes these same files and writes them at a different rate:

@Bean public IntegrationFlow anotherFileWriter() { return IntegrationFlows.from("holdingTank") .bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10))) .handle(anotherTargetDirectory()) .get(); }

As we can see, individual bridges can control the polling configuration for different handlers.

As soon as our application context is loaded, we now have a more complex app in action that will start moving files from the source directory to two target directories.

6. Conclusion

In diesem Artikel haben wir verschiedene Möglichkeiten gesehen, wie Sie mit Spring Integration Java DSL verschiedene Integrationspipelines erstellen können.

Im Wesentlichen konnten wir die Anwendung zum Verschieben von Dateien aus einem früheren Lernprogramm neu erstellen, diesmal mit reinem Java.

Wir haben uns auch einige andere Komponenten wie Kanäle und Brücken angesehen.

Der vollständige Quellcode, der in diesem Tutorial verwendet wird, ist auf Github verfügbar.