Versand der RabbitMQ-Nachricht mit Spring AMQP

1. Einleitung

In diesem Tutorial werden wir das Konzept des Fanout und des Themenaustauschs mit Spring AMQP und RabbitMQ untersuchen.

Auf einem hohen Niveau, Fanout - Börsen wird übertragen die gleiche Nachricht an alle gebundenen Warteschlangen , während Thema Austausch eine Routing - Schlüssel verwenden , um Nachrichten an eine bestimmte Grenze Warteschlange oder Warteschlangen vorbei .

Vorheriges Lesen von Messaging With Spring AMQP wird für dieses Lernprogramm empfohlen.

2. Einrichten eines Fanout-Austauschs

Lassen Sie uns einen Fanout-Austausch mit zwei daran gebundenen Warteschlangen einrichten. Wenn wir eine Nachricht an diesen Austausch senden, erhalten beide Warteschlangen die Nachricht. Unser Fanout-Austausch ignoriert alle in der Nachricht enthaltenen Routing-Schlüssel.

Mit Spring AMQP können wir alle Deklarationen von Warteschlangen, Austauschen und Bindungen in einem Declarables- Objekt zusammenfassen:

@Bean public Declarables fanoutBindings() { Queue fanoutQueue1 = new Queue("fanout.queue1", false); Queue fanoutQueue2 = new Queue("fanout.queue2", false); FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange"); return new Declarables( fanoutQueue1, fanoutQueue2, fanoutExchange, bind(fanoutQueue1).to(fanoutExchange), BindingBuilder.bind(fanoutQueue2).to(fanoutExchange)); }

3. Einrichten eines Themenaustauschs

Jetzt richten wir auch einen Themenaustausch mit zwei Warteschlangen mit jeweils unterschiedlichen Bindungsmustern ein:

@Bean public Declarables topicBindings() { Queue topicQueue1 = new Queue(topicQueue1Name, false); Queue topicQueue2 = new Queue(topicQueue2Name, false); TopicExchange topicExchange = new TopicExchange(topicExchangeName); return new Declarables( topicQueue1, topicQueue2, topicExchange, BindingBuilder .bind(topicQueue1) .to(topicExchange).with("*.important.*"), BindingBuilder .bind(topicQueue2) .to(topicExchange).with("#.error")); }

Ein Themenaustausch ermöglicht es uns, Warteschlangen mit unterschiedlichen Schlüsselmustern daran zu binden. Dies ist sehr flexibel und ermöglicht es uns, mehrere Warteschlangen mit demselben Muster oder sogar mehrere Muster an dieselbe Warteschlange zu binden.

Wenn der Routing-Schlüssel der Nachricht mit dem Muster übereinstimmt, wird er in die Warteschlange gestellt. Wenn eine Warteschlange mehrere Bindungen hat, die mit dem Routing-Schlüssel der Nachricht übereinstimmen, wird nur eine Kopie der Nachricht in die Warteschlange gestellt.

Unsere Bindungsmuster können ein Sternchen ("*") verwenden, um ein Wort an einer bestimmten Position abzugleichen, oder ein Nummernzeichen ("#"), um null oder mehr Wörter abzugleichen.

So erhält unser topicQueue1 Nachrichten mit Routing-Schlüsseln mit einem Drei-Wort-Muster, wobei das mittlere Wort "wichtig" ist - zum Beispiel: "user.important.error" oder "blog.important.notification".

Und unser topicQueue2 empfängt Nachrichten, deren Routing-Schlüssel mit dem Wort error enden. Übereinstimmende Beispiele sind "error" , "user.important.error" oder "blog.post.save.error".

4. Einrichten eines Produzenten

Wir werden die convertAndSend- Methode der RabbitTemplate verwenden , um unsere Beispielnachrichten zu senden:

 String message = " payload is broadcast"; return args -> { rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "topic important warn" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "topic important error" + message); };

Das RabbitTemplate bietet viele überladene convertAndSend () -Methoden für verschiedene Austauschtypen.

Wenn wir eine Nachricht an eine Fanout-Vermittlungsstelle senden, wird der Routing-Schlüssel ignoriert und die Nachricht an alle gebundenen Warteschlangen übergeben.

Wenn wir eine Nachricht an den Themenaustausch senden, müssen wir einen Routing-Schlüssel übergeben. Basierend auf diesem Routing-Schlüssel wird die Nachricht an bestimmte Warteschlangen übermittelt.

5. Verbraucher konfigurieren

Lassen Sie uns abschließend vier Konsumenten einrichten - einen für jede Warteschlange -, um die produzierten Nachrichten aufzunehmen:

 @RabbitListener(queues = {FANOUT_QUEUE_1_NAME}) public void receiveMessageFromFanout1(String message) { System.out.println("Received fanout 1 message: " + message); } @RabbitListener(queues = {FANOUT_QUEUE_2_NAME}) public void receiveMessageFromFanout2(String message) { System.out.println("Received fanout 2 message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_1_NAME}) public void receiveMessageFromTopic1(String message) { System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_2_NAME}) public void receiveMessageFromTopic2(String message) { System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message); }

Wir konfigurieren Verbraucher mithilfe der Annotation @RabbitListener . Das einzige Argument, das hier übergeben wird, ist der Name der Warteschlangen. Den Verbrauchern sind hier keine Vermittlungs- oder Weiterleitungsschlüssel bekannt.

6. Ausführen des Beispiels

Unser Beispielprojekt ist eine Spring Boot-Anwendung. Daher wird die Anwendung zusammen mit einer Verbindung zu RabbitMQ initialisiert und alle Warteschlangen, Austausche und Bindungen eingerichtet.

Standardmäßig erwartet unsere Anwendung eine RabbitMQ-Instanz, die auf dem lokalen Host an Port 5672 ausgeführt wird. Wir können diese und andere Standardeinstellungen in application.yaml ändern .

Unser Projekt macht den HTTP-Endpunkt auf dem URI - / Broadcast - verfügbar, der POSTs mit einer Nachricht im Anforderungshauptteil akzeptiert.

Wenn wir eine Anfrage an diesen URI mit dem Body "Test" senden, sollten wir in der Ausgabe etwas Ähnliches sehen:

Received fanout 1 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important warn payload is broadcast Received topic 2 (#.error) message: topic important error payload is broadcast Received fanout 2 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important error payload is broadcast

Die Reihenfolge, in der wir diese Nachrichten sehen, ist natürlich nicht garantiert.

7. Fazit

In diesem kurzen Tutorial haben wir uns mit Fanout und Themenaustausch mit Spring AMQP und RabbitMQ befasst.

Der vollständige Quellcode und alle Codefragmente für dieses Tutorial sind im GitHub-Repository verfügbar.