Erste Schritte mit der Stream-Verarbeitung mit Spring Cloud Data Flow

1. Einleitung

Spring Cloud Data Flow ist ein Cloud-natives Programmier- und Betriebsmodell für zusammensetzbare Datenmikroservices.

Mit Spring Cloud Data Flow können Entwickler Datenpipelines für häufig verwendete Anwendungsfälle wie Datenerfassung, Echtzeitanalyse und Datenimport / -export erstellen und orchestrieren.

Diese Daten-Pipelines gibt es in zwei Varianten: Streaming- und Batch-Daten-Pipelines.

Im ersten Fall wird eine unbegrenzte Datenmenge über Messaging-Middleware verbraucht oder produziert. Während im zweiten Fall die kurzlebige Aufgabe einen endlichen Datensatz verarbeitet und dann beendet.

Dieser Artikel konzentriert sich auf die Streaming-Verarbeitung.

2. Architekturübersicht

Die Schlüsselkomponenten dieser Art von Architektur sind Anwendungen , der Datenflussserver und die Ziellaufzeit.

Zusätzlich zu diesen Schlüsselkomponenten verfügen wir normalerweise über eine Datenfluss-Shell und einen Nachrichtenbroker innerhalb der Architektur.

Sehen wir uns all diese Komponenten genauer an.

2.1. Anwendungen

In der Regel umfasst eine Streaming-Datenpipeline das Konsumieren von Ereignissen aus externen Systemen, die Datenverarbeitung und die Polyglot-Persistenz. Diese Phasen werden in der Spring Cloud- Terminologie häufig als Quelle , Prozessor und Senke bezeichnet :

  • Quelle: ist die Anwendung, die Ereignisse verwendet
  • Prozessor: Verwendet Daten aus der Quelle , verarbeitet sie und sendet die verarbeiteten Daten an die nächste Anwendung in der Pipeline
  • Senke: Wird entweder von einer Quelle oder einem Prozessor verbraucht und schreibt die Daten in die gewünschte Persistenzschicht

Diese Anwendungen können auf zwei Arten gepackt werden:

  • Spring Boot uber-jar, das in einem Maven-Repository, einer Datei, http oder einer anderen Spring-Ressourcenimplementierung gehostet wird (diese Methode wird in diesem Artikel verwendet).
  • Docker

Viele Quellen-, Prozessor- und Senkenanwendungen für gängige Anwendungsfälle (z. B. jdbc, hdfs, http, router) werden bereits vom Spring Cloud Data Flow- Team bereitgestellt und können verwendet werden .

2.2. Laufzeit

Außerdem wird eine Laufzeit benötigt, damit diese Anwendungen ausgeführt werden können. Die unterstützten Laufzeiten sind:

  • Wolkengießerei
  • Apache YARN
  • Kubernetes
  • Apache Mesos
  • Lokaler Server für die Entwicklung (der in diesem Artikel verwendet wird)

2.3. Datenflussserver

Die Komponente, die für die Bereitstellung von Anwendungen zur Laufzeit verantwortlich ist, ist der Datenflussserver . Für jede der Ziellaufzeiten wird eine ausführbare JAR-Datei von Data Flow Server bereitgestellt.

Der Datenflussserver ist für die Interpretation verantwortlich:

  • Ein Stream-DSL, der den logischen Datenfluss durch mehrere Anwendungen beschreibt.
  • Ein Bereitstellungsmanifest, das die Zuordnung von Anwendungen zur Laufzeit beschreibt.

2.4. Datenfluss-Shell

Die Datenfluss-Shell ist ein Client für den Datenflussserver. Mit der Shell können wir den DSL-Befehl ausführen, der für die Interaktion mit dem Server erforderlich ist.

Als Beispiel würde das DSL zur Beschreibung des Datenflusses von einer http-Quelle zu einer JDBC-Senke als "http |" geschrieben jdbc ”. Diese Namen im DSL mit den registrierten Data Flow Server und Karte auf Anwendungsartefakte , die in Maven oder Docker Repositories gehostet werden können.

Spring bietet auch eine grafische Oberfläche mit dem Namen Flo zum Erstellen und Überwachen von Streaming-Daten-Pipelines. Seine Verwendung liegt jedoch außerhalb der Diskussion dieses Artikels.

2.5. Nachrichtenbroker

Wie wir im Beispiel des vorherigen Abschnitts gesehen haben, haben wir das Pipe-Symbol für die Definition des Datenflusses verwendet. Das Pipe-Symbol repräsentiert die Kommunikation zwischen den beiden Anwendungen über Messaging-Middleware.

Dies bedeutet, dass wir einen Nachrichtenbroker benötigen, der in der Zielumgebung ausgeführt wird.

Die beiden unterstützten Messaging-Middleware-Broker sind:

  • Apache Kafka
  • RabbitMQ

Jetzt, da wir einen Überblick über die Architekturkomponenten haben, ist es Zeit, unsere erste Stream-Verarbeitungs-Pipeline zu erstellen.

3. Installieren Sie einen Message Broker

Wie wir gesehen haben, benötigen die Anwendungen in der Pipeline eine Messaging-Middleware für die Kommunikation. Für den Zweck dieses Artikels werden wir mit RabbitMQ gehen .

Ausführliche Informationen zur Installation finden Sie in den Anweisungen auf der offiziellen Website.

4. Der lokale Datenflussserver

Um die Erstellung unserer Anwendungen zu beschleunigen, verwenden wir Spring Initializr. Mit seiner Hilfe können wir unsere Spring Boot- Anwendungen in wenigen Minuten erhalten.

Nachdem Sie die Website erreicht haben, wählen Sie einfach eine Gruppe und einen Artefaktnamen .

Klicken Sie anschließend auf die Schaltfläche Projekt generieren , um den Download des Maven-Artefakts zu starten.

Entpacken Sie das Projekt nach Abschluss des Downloads und importieren Sie es als Maven-Projekt in die IDE Ihrer Wahl.

Fügen wir dem Projekt eine Maven-Abhängigkeit hinzu. Da wir lokale Dataflow-Serverbibliotheken benötigen , fügen wir die lokale Abhängigkeit von Spring-Cloud-Starter-Dataflow-Server-Local hinzu:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Now we need to annotate the Spring Boot main class with @EnableDataFlowServer annotation:

@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } } 

That's all. Our Local Data Flow Server is ready to be executed:

mvn spring-boot:run

The application will boot up on port 9393.

5. The Data Flow Shell

Again, go to the Spring Initializr and choose a Group and Artifact name.

Once we've downloaded and imported the project, let's add a spring-cloud-dataflow-shell dependency:

 org.springframework.cloud spring-cloud-dataflow-shell 

Now we need to add the @EnableDataFlowShell annotation to the Spring Boot main class:

@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } } 

We can now run the shell:

mvn spring-boot:run

After the shell is running, we can type the help command in the prompt to see a complete list of command that we can perform.

6. The Source Application

Similarly, on Initializr, we'll now create a simple application and add a Stream Rabbit dependency called spring-cloud-starter-stream-rabbit:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

We'll then add the @EnableBinding(Source.class) annotation to the Spring Boot main class:

@EnableBinding(Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeSourceApplication.class, args); } }

Now we need to define the source of the data that must be processed. This source could be any potentially endless workload (internet-of-things sensor data, 24/7 event processing, online transaction data ingest).

In our sample application, we produce one event (for simplicity a new timestamp) every 10 seconds with a Poller.

The @InboundChannelAdapter annotation sends a message to the source’s output channel, using the return value as the payload of the message:

@Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSource timeMessageSource() { return () -> MessageBuilder.withPayload(new Date().getTime()).build(); } 

Our data source is ready.

7. The Processor Application

Next- we'll create an application and add a Stream Rabbit dependency.

We'll then add the @EnableBinding(Processor.class) annotation to the Spring Boot main class:

@EnableBinding(Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeProcessorApplication.class, args); } }

Next, we need to define a method to process the data that coming from the source application.

To define a transformer, we need to annotate this method with @Transformer annotation:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(Long timestamp) { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy"); String date = dateFormat.format(timestamp); return date; }

It converts a timestamp from the ‘input' channel to a formatted date which will be sent to the ‘output' channel.

8. The Sink Application

The last application to create is the Sink application.

Again, go to the Spring Initializr and choose a Group, an Artifact name. After downloading the project let's add a Stream Rabbit dependency.

Then add the @EnableBinding(Sink.class) annotation to the Spring Boot main class:

@EnableBinding(Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowLoggingSinkApplication.class, args); } }

Now we need a method to intercept the messages coming from the processor application.

To do this, we need to add the @StreamListener(Sink.INPUT) annotation to our method:

@StreamListener(Sink.INPUT) public void loggerSink(String date) { logger.info("Received: " + date); }

The method simply prints the timestamp transformed in a formatted date to a log file.

9. Register a Stream App

The Spring Cloud Data Flow Shell allow us to Register a Stream App with the App Registry using the app register command.

We must provide a unique name, application type, and a URI that can be resolved to the app artifact. For the type, specify “source“, “processor“, or “sink“.

When providing a URI with the maven scheme, the format should conform to the following:

maven://:[:[:]]:

To register the Source, Processor and Sink applications previously created , go to the Spring Cloud Data Flow Shell and issue the following commands from the prompt:

app register --name time-source --type source --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT app register --name time-processor --type processor --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT app register --name logging-sink --type sink --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT 

10. Create and Deploy the Stream

To create a new stream definition go to the Spring Cloud Data Flow Shell and execute the following shell command:

stream create --name time-to-log --definition 'time-source | time-processor | logging-sink'

This defines a stream named time-to-log based on the DSL expression ‘time-source | time-processor | logging-sink'.

Then to deploy the stream execute the following shell command:

stream deploy --name time-to-log

The Data Flow Server resolves time-source, time-processor, and logging-sink to maven coordinates and uses those to launch the time-source, time-processor and logging-sink applications of the stream.

If the stream is correctly deployed you’ll see in the Data Flow Server logs that the modules have been started and tied together:

2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink 2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor 2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. Reviewing the Result

In this example, the source simply sends the current timestamp as a message each second, the processor format it and the log sink outputs the formatted timestamp using the logging framework.

The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above. To see the result, we can tail the log:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log 2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01 2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11 2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. Conclusion

In this article, we have seen how to build a data pipeline for stream processing through the use of Spring Cloud Data Flow.

Außerdem haben wir die Rolle von Quell- , Prozessor- und Sink- Anwendungen im Stream und das Einstecken und Binden dieses Moduls in einen Datenflussserver mithilfe der Datenfluss-Shell erkannt .

Der Beispielcode befindet sich im GitHub-Projekt.