Apache RocketMQ mit Spring Boot

1. Einleitung

In diesem Tutorial erstellen wir einen Nachrichtenproduzenten und -konsumenten mit Spring Boot und Apache RocketMQ, einer Open-Source-Plattform für verteilte Messaging- und Streaming-Daten.

2. Abhängigkeiten

Für Maven-Projekte müssen wir die RocketMQ Spring Boot Starter-Abhängigkeit hinzufügen:

 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 

3. Nachrichten erstellen

In unserem Beispiel erstellen wir einen grundlegenden Nachrichtenproduzenten, der Ereignisse sendet, wenn der Benutzer einen Artikel zum Warenkorb hinzufügt oder daraus entfernt.

Lassen Sie uns zunächst unseren Serverstandort und den Gruppennamen in unserer application.properties einrichten :

rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=cart-producer-group

Beachten Sie, dass wir, wenn wir mehr als einen Nameserver hätten, diese wie Host: Port; Host: Port auflisten könnten .

Um es einfach zu halten, erstellen wir jetzt eine CommandLineRunner- Anwendung und generieren beim Starten der Anwendung einige Ereignisse:

@SpringBootApplication public class CartEventProducer implements CommandLineRunner { @Autowired private RocketMQTemplate rocketMQTemplate; public static void main(String[] args) { SpringApplication.run(CartEventProducer.class, args); } public void run(String... args) throws Exception { rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1)); rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2)); rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); } }

Das CartItemEvent besteht aus nur zwei Eigenschaften - der ID des Artikels und einer Menge:

class CartItemEvent { private String itemId; private int quantity; // constructor, getters and setters }

Im obigen Beispiel verwenden wir die convertAndSend () -Methode, eine generische Methode, die von der abstrakten Klasse AbstractMessageSendingTemplate definiert wird , um unsere Warenkorbereignisse zu senden. Es werden zwei Parameter benötigt: Ein Ziel, in unserem Fall ein Themenname, und eine Nachrichtennutzlast.

4. Nachricht Verbraucher

Das Konsumieren von RocketMQ-Nachrichten ist so einfach wie das Erstellen einer mit @RocketMQMessageListener kommentierten Spring-Komponente und das Implementieren der RocketMQListener- Schnittstelle:

@SpringBootApplication public class CartEventConsumer { public static void main(String[] args) { SpringApplication.run(CartEventConsumer.class, args); } @Service @RocketMQMessageListener( topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic" ) public class CardItemAddConsumer implements RocketMQListener { public void onMessage(CartItemEvent addItemEvent) { log.info("Adding item: {}", addItemEvent); // additional logic } } @Service @RocketMQMessageListener( topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic" ) public class CardItemRemoveConsumer implements RocketMQListener { public void onMessage(CartItemEvent removeItemEvent) { log.info("Removing item: {}", removeItemEvent); // additional logic } } }

Wir müssen für jedes Nachrichtenthema, auf das wir hören, eine separate Komponente erstellen. In jedem dieser Listener definieren wir den Namen des Themas und den Namen der Verbrauchergruppe über die Annotation @ RocketMQMessageListener .

5. Synchrone und asynchrone Übertragung

In den vorherigen Beispielen haben wir die Methode convertAndSend verwendet, um unsere Nachrichten zu senden. Wir haben jedoch einige andere Möglichkeiten.

Wir könnten zum Beispiel syncSend aufrufen, das sich von convertAndSend unterscheidet, weil es das SendResult- Objekt zurückgibt .

Es kann zum Beispiel verwendet werden, um zu überprüfen, ob unsere Nachricht erfolgreich gesendet wurde, oder um ihre ID zu erhalten:

public void run(String... args) throws Exception { SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", new CartItemEvent("bike", 1)); SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", new CartItemEvent("computer", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); }

Wie convertAndSend wird diese Methode nur zurückgegeben, wenn der Sendevorgang abgeschlossen ist.

Wir sollten die synchrone Übertragung in Fällen verwenden, die eine hohe Zuverlässigkeit erfordern, wie z. B. wichtige Benachrichtigungsnachrichten oder SMS-Benachrichtigungen.

Auf der anderen Seite möchten wir die Nachricht möglicherweise stattdessen asynchron senden und benachrichtigt werden, wenn der Versand abgeschlossen ist.

Wir können dies mit asyncSend tun , das einen SendCallback als Parameter verwendet und sofort zurückgibt:

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.error("Successfully sent cart item"); } @Override public void onException(Throwable throwable) { log.error("Exception during cart item sending", throwable); } });

Wir verwenden die asynchrone Übertragung in Fällen, die einen hohen Durchsatz erfordern.

Schließlich können wir in Szenarien mit sehr hohen Durchsatzanforderungen sendOneWay anstelle von asyncSend verwenden . sendOneWay unterscheidet sich von asyncSend darin, dass es nicht garantiert, dass die Nachricht gesendet wird.

Die Einwegübertragung kann auch für gewöhnliche Zuverlässigkeitsfälle verwendet werden, z. B. zum Sammeln von Protokollen.

6. Senden von Nachrichten in der Transaktion

RocketMQ bietet uns die Möglichkeit, Nachrichten innerhalb einer Transaktion zu senden. Wir können dies mit der sendInTransaction () -Methode tun :

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build(); rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

Außerdem müssen wir eine RocketMQLocalTransactionListener- Schnittstelle implementieren :

@RocketMQTransactionListener(txProducerGroup="test-transaction") class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN return RocketMQLocalTransactionState.COMMIT; } }

In sendMessageInTransaction () ist der erste Parameter der Transaktionsname. Es muss das gleiche wie das sein @RocketMQTransactionListener ‚s Mitgliedsfeld txProducerGroup.

7. Konfiguration des Nachrichtenproduzenten

Wir können auch Aspekte des Nachrichtenproduzenten selbst konfigurieren:

  • rocketmq.producer.send-message-timeout : Das Zeitlimit für das Senden von Nachrichten in Millisekunden - der Standardwert ist 3000
  • rocketmq.producer.compress-message-body-Schwelle : Schwellenwert, oberhalb dessen RocketMQ Nachrichten komprimiert - der Standardwert ist 1024.
  • rocketmq.producer.max-message-size : Die maximale Nachrichtengröße in Bytes - der Standardwert ist 4096.
  • rocketmq.producer.retry-times-when-send-async-failed : Die maximale Anzahl von Wiederholungsversuchen, die intern im asynchronen Modus ausgeführt werden müssen, bevor ein Fehler fehlgeschlagen ist. Der Standardwert ist 2.
  • rocketmq.producer.retry-next-server : Gibt an, ob ein anderer Broker erneut versucht werden soll, wenn ein Fehler intern gesendet wird . Der Standardwert ist false .
  • rocketmq.producer.retry-times-when-send-failed : Die maximale Anzahl von Wiederholungsversuchen, die intern im asynchronen Modus ausgeführt werden sollen, bevor ein Fehler fehlgeschlagen ist. Der Standardwert ist 2.

8. Fazit

In diesem Artikel haben wir gelernt, wie Sie Nachrichten mit Apache RocketMQ und Spring Boot senden und verarbeiten. Wie immer ist der gesamte Quellcode auf GitHub verfügbar.