Einführung in Spring Cloud Stream

1. Übersicht

Spring Cloud Stream ist ein Framework, das auf Spring Boot und Spring Integration basiert und bei der Erstellung ereignisgesteuerter oder nachrichtengesteuerter Mikrodienste hilft .

In diesem Artikel werden Konzepte und Konstrukte von Spring Cloud Stream anhand einiger einfacher Beispiele vorgestellt.

2. Maven-Abhängigkeiten

Um zu beginnen, müssen wir den Spring Cloud Starter Stream mit der Broker-RabbitMQ Maven-Abhängigkeit als Messaging-Middleware zu unserer pom.xml hinzufügen :

 org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.0.RELEASE 

Und wir werden die Modulabhängigkeit von Maven Central hinzufügen, um auch die JUnit-Unterstützung zu aktivieren:

 org.springframework.cloud spring-cloud-stream-test-support 1.3.0.RELEASE test 

3. Hauptkonzepte

Die Microservices-Architektur folgt dem Prinzip „Smart Endpoints and Dumb Pipes“. Die Kommunikation zwischen Endpunkten wird von Messaging-Middleware-Parteien wie RabbitMQ oder Apache Kafka gesteuert. Dienste kommunizieren, indem sie Domänenereignisse über diese Endpunkte oder Kanäle veröffentlichen .

Lassen Sie uns die Konzepte des Spring Cloud Stream-Frameworks sowie die wesentlichen Paradigmen durchgehen, die wir kennen müssen, um nachrichtengesteuerte Dienste zu erstellen.

3.1. Konstruiert

Schauen wir uns einen einfachen Dienst in Spring Cloud Stream an, der die Eingabebindung abhört und eine Antwort auf die Ausgabebindung sendet :

@SpringBootApplication @EnableBinding(Processor.class) public class MyLoggerServiceApplication { public static void main(String[] args) { SpringApplication.run(MyLoggerServiceApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public LogMessage enrichLogMessage(LogMessage log) { return new LogMessage(String.format("[1]: %s", log.getMessage())); } }

Die Anmerkung @EnableBinding konfiguriert die Anwendung der Kanäle zu binden INPUT und OUTPUT innerhalb der Schnittstelle definiert Prozessors . Beide Kanäle sind Bindungen, die für die Verwendung einer konkreten Messaging-Middleware oder eines Ordners konfiguriert werden können.

Werfen wir einen Blick auf die Definition all dieser Konzepte:

  • Bindungen - eine Sammlung von Schnittstellen, die die Eingangs- und Ausgangskanäle deklarativ identifizieren
  • Binder - Implementierung von Messaging-Middleware wie Kafka oder RabbitMQ
  • Kanal - repräsentiert die Kommunikationsleitung zwischen Messaging-Middleware und der Anwendung
  • StreamListeners - Nachrichtenbehandlungsmethoden in Beans, die automatisch für eine Nachricht aus dem Kanal aufgerufen werden, nachdem der MessageConverter die Serialisierung / Deserialisierung zwischen Middleware-spezifischen Ereignissen und Domänenobjekttypen / POJOs durchgeführt hat
  • Mes Salbei Schemen - für die Serialisierung und Deserialisierung von Nachrichten verwendet werden, kann dieses Schema statisch von einem Ort gelesen werden oder dynamisch geladen, um die Entwicklung von Domain - Objekttypen unterstützt

3.2. Kommunikationsmuster

Für Ziele bestimmte Nachrichten werden vom Publish-Subscribe- Nachrichtenmuster übermittelt. Publisher kategorisieren Nachrichten in Themen, die jeweils durch einen Namen gekennzeichnet sind. Abonnenten zeigen Interesse an einem oder mehreren Themen. Die Middleware filtert die Nachrichten und liefert den Abonnenten die interessanten Themen.

Jetzt konnten die Abonnenten gruppiert werden. Eine Verbrauchergruppe ist eine Gruppe von Abonnenten oder Verbrauchern, die durch eine Gruppen-ID identifiziert werden und in denen Nachrichten von einem Thema oder einer Themenpartition auf eine lastausgeglichene Weise übermittelt werden.

4. Programmiermodell

In diesem Abschnitt werden die Grundlagen zum Erstellen von Spring Cloud Stream-Anwendungen beschrieben.

4.1. Funktionsprüfung

Die Testunterstützung ist eine Binder-Implementierung, die die Interaktion mit den Kanälen und das Überprüfen von Nachrichten ermöglicht.

Senden wir eine Nachricht an den oben genannten bereicherungslogMessage- Dienst und prüfen, ob die Antwort am Anfang der Nachricht den Text „[1]:“ enthält:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = MyLoggerServiceApplication.class) @DirtiesContext public class MyLoggerApplicationTests { @Autowired private Processor pipe; @Autowired private MessageCollector messageCollector; @Test public void whenSendMessage_thenResponseShouldUpdateText() { pipe.input() .send(MessageBuilder.withPayload(new LogMessage("This is my message")) .build()); Object payload = messageCollector.forChannel(pipe.output()) .poll() .getPayload(); assertEquals("[1]: This is my message", payload.toString()); } }

4.2. Benutzerdefinierte Kanäle

Im obigen Beispiel haben wir die von Spring Cloud bereitgestellte Prozessorschnittstelle verwendet, die nur einen Eingangs- und einen Ausgangskanal hat.

Wenn wir etwas anderes benötigen, wie einen Eingangs- und zwei Ausgangskanäle, können wir einen benutzerdefinierten Prozessor erstellen:

public interface MyProcessor { String INPUT = "myInput"; @Input SubscribableChannel myInput(); @Output("myOutput") MessageChannel anOutput(); @Output MessageChannel anotherOutput(); }

Spring wird die ordnungsgemäße Implementierung dieser Schnittstelle für uns bereitstellen. Die Kanalnamen können mithilfe von Anmerkungen wie in @Output („myOutput“) festgelegt werden .

Andernfalls verwendet Spring die Methodennamen als Kanalnamen. Daher haben wir drei Kanäle mit den Namen myInput , myOutput und anotherOutput .

Stellen wir uns nun vor, wir möchten die Nachrichten an eine Ausgabe weiterleiten, wenn der Wert kleiner als 10 ist, und an eine andere Ausgabe, wenn der Wert größer oder gleich 10 ist:

@Autowired private MyProcessor processor; @StreamListener(MyProcessor.INPUT) public void routeValues(Integer val) { if (val < 10) { processor.anOutput().send(message(val)); } else { processor.anotherOutput().send(message(val)); } } private static final  Message message(T val) { return MessageBuilder.withPayload(val).build(); }

4.3. Bedingter Versand

Mithilfe der Annotation @StreamListener können wir auch die Nachrichten, die wir im Consumer erwarten, unter Verwendung jeder Bedingung filtern, die wir mit SpEL-Ausdrücken definieren.

Als Beispiel könnten wir das bedingte Dispatching als einen anderen Ansatz verwenden, um Nachrichten in verschiedene Ausgaben weiterzuleiten:

@Autowired private MyProcessor processor; @StreamListener( target = MyProcessor.INPUT, condition = "payload = 10") public void routeValuesToAnotherOutput(Integer val) { processor.anotherOutput().send(message(val)); }

Die einzige Einschränkung dieses Ansatzes besteht darin, dass diese Methoden keinen Wert zurückgeben dürfen.

5. Setup

Let's set up the application that will process the message from the RabbitMQ broker.

5.1. Binder Configuration

We can configure our application to use the default binder implementation via META-INF/spring.binders:

rabbit:\ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Or we can add the binder library for RabbitMQ to the classpath by including this dependency:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 1.3.0.RELEASE 

If no binder implementation is provided, Spring will use direct message communication between the channels.

5.2. RabbitMQ Configuration

To configure the example in section 3.1 to use the RabbitMQ binder, we need to update the application.yml located at src/main/resources:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit output: destination: queue.pretty.log.messages binder: local_rabbit binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host:  port: 5672 username:  password:  virtual-host: /

The input binding will use the exchange called queue.log.messages, and the output binding will use the exchange queue.pretty.log.messages. Both bindings will use the binder called local_rabbit.

Note that we don't need to create the RabbitMQ exchanges or queues in advance. When running the application, both exchanges are automatically created.

To test the application, we can use the RabbitMQ management site to publish a message. In the Publish Message panel of the exchange queue.log.messages, we need to enter the request in JSON format.

5.3. Customizing Message Conversion

Spring Cloud Stream allows us to apply message conversion for specific content types. In the above example, instead of using JSON format, we want to provide plain text.

To do this, we'll to apply a custom transformation to LogMessage using a MessageConverter:

@SpringBootApplication @EnableBinding(Processor.class) public class MyLoggerServiceApplication { //... @Bean public MessageConverter providesTextPlainMessageConverter() { return new TextPlainMessageConverter(); } //... }
public class TextPlainMessageConverter extends AbstractMessageConverter { public TextPlainMessageConverter() { super(new MimeType("text", "plain")); } @Override protected boolean supports(Class clazz) { return (LogMessage.class == clazz); } @Override protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { Object payload = message.getPayload(); String text = payload instanceof String ? (String) payload : new String((byte[]) payload); return new LogMessage(text); } }

After applying these changes, going back to the Publish Message panel, if we set the header “contentTypes” to “text/plain” and the payload to “Hello World“, it should work as before.

5.4. Consumer Groups

When running multiple instances of our application, every time there is a new message in an input channel, all subscribers will be notified.

Most of the time, we need the message to be processed only once. Spring Cloud Stream implements this behavior via consumer groups.

To enable this behavior, each consumer binding can use the spring.cloud.stream.bindings..group property to specify a group name:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit group: logMessageConsumers ...

6. Message-Driven Microservices

In this section, we introduce all the required features for running our Spring Cloud Stream applications in a microservices context.

6.1. Scaling Up

When multiple applications are running, it's important to ensure the data is split properly across consumers. To do so, Spring Cloud Stream provides two properties:

  • spring.cloud.stream.instanceCount — number of running applications
  • spring.cloud.stream.instanceIndex — index of the current application

For example, if we've deployed two instances of the above MyLoggerServiceApplication application, the property spring.cloud.stream.instanceCount should be 2 for both applications, and the property spring.cloud.stream.instanceIndex should be 0 and 1 respectively.

These properties are automatically set if we deploy the Spring Cloud Stream applications using Spring Data Flow as described in this article.

6.2. Partitioning

The domain events could be Partitioned messages. This helps when we are scaling up the storage and improving application performance.

The domain event usually has a partition key so that it ends up in the same partition with related messages.

Let's say that we want the log messages to be partitioned by the first letter in the message, which would be the partition key, and grouped into two partitions.

There would be one partition for the log messages that start with A-M and another partition for N-Z. This can be configured using two properties:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression — the expression to partition the payloads
  • spring.cloud.stream.bindings.output.producer.partitionCount — the number of groups

Sometimes the expression to partition is too complex to write it in only one line. For these cases, we can write our custom partition strategy using the property spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Health Indicator

In a microservices context, we also need to detect when a service is down or starts failing. Spring Cloud Stream provides the property management.health.binders.enabled to enable the health indicators for binders.

When running the application, we can query the health status at //:/health.

7. Conclusion

In diesem Tutorial haben wir die Hauptkonzepte von Spring Cloud Stream vorgestellt und anhand einiger einfacher Beispiele über RabbitMQ gezeigt, wie man es verwendet. Weitere Informationen zu Spring Cloud Stream finden Sie hier.

Den Quellcode für diesen Artikel finden Sie auf GitHub.