PubSub Messaging mit Spring Data Redis

1. Übersicht

In diesem zweiten Artikel aus der Serie, in der Spring Data Redis untersucht wird, werden die Warteschlangen für Pubs / Subnachrichten beschrieben.

In Redis sind Publisher nicht so programmiert, dass sie ihre Nachrichten an bestimmte Abonnenten senden. Veröffentlichte Nachrichten werden vielmehr in Kanäle unterteilt, ohne zu wissen, welche (wenn überhaupt) Abonnenten es geben kann.

Ebenso zeigen Abonnenten Interesse an einem oder mehreren Themen und erhalten nur Nachrichten, die von Interesse sind, ohne zu wissen, welche (wenn überhaupt) Verlage es gibt.

Diese Entkopplung von Publishern und Abonnenten kann eine größere Skalierbarkeit und eine dynamischere Netzwerktopologie ermöglichen.

2. Redis Konfiguration

Beginnen wir mit dem Hinzufügen der Konfiguration, die für die Nachrichtenwarteschlangen erforderlich ist.

Zunächst definieren wir eine MessageListenerAdapter- Bean, die eine benutzerdefinierte Implementierung der MessageListener- Schnittstelle mit dem Namen RedisMessageSubscriber enthält . Diese Bean fungiert als Abonnent im Pub-Sub-Messaging-Modell:

@Bean MessageListenerAdapter messageListener() { return new MessageListenerAdapter(new RedisMessageSubscriber()); }

RedisMessageListenerContainer ist eine von Spring Data Redis bereitgestellte Klasse, die Redis-Nachrichten-Listenern asynchrones Verhalten bietet. Dies wird intern aufgerufen und behandelt laut der Spring Data Redis-Dokumentation „die Details auf niedriger Ebene beim Abhören, Konvertieren und Versenden von Nachrichten“.

@Bean RedisMessageListenerContainer redisContainer() { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(jedisConnectionFactory()); container.addMessageListener(messageListener(), topic()); return container; }

Wir werden auch eine Bean mithilfe einer benutzerdefinierten MessagePublisher- Schnittstelle und einer RedisMessagePublisher- Implementierung erstellen . Auf diese Weise können wir eine generische Nachrichtenveröffentlichungs-API haben und die Redis-Implementierung eine redisTemplate und ein Thema als Konstruktorargumente verwenden lassen:

@Bean MessagePublisher redisPublisher() { return new RedisMessagePublisher(redisTemplate(), topic()); }

Schließlich richten wir ein Thema ein, an das der Herausgeber Nachrichten sendet und der Abonnent sie erhält:

@Bean ChannelTopic topic() { return new ChannelTopic("messageQueue"); }

3. Nachrichten veröffentlichen

3.1. Definieren der MessagePublisher- Schnittstelle

Spring Data Redis bietet keine MessagePublisher- Schnittstelle für die Nachrichtenverteilung. Wir können eine benutzerdefinierte Schnittstelle definieren, die redisTemplate in der Implementierung verwendet:

public interface MessagePublisher { void publish(String message); }

3.2. RedisMessagePublisher- Implementierung

Unser nächster Schritt besteht darin, eine Implementierung der MessagePublisher- Schnittstelle bereitzustellen, Details zur Nachrichtenveröffentlichung hinzuzufügen und die Funktionen in redisTemplate zu verwenden.

Die Vorlage enthält eine Vielzahl von Funktionen für eine Vielzahl von Vorgängen, von denen convertAndSend eine Nachricht über ein Thema an eine Warteschlange senden kann:

public class RedisMessagePublisher implements MessagePublisher { @Autowired private RedisTemplate redisTemplate; @Autowired private ChannelTopic topic; public RedisMessagePublisher() { } public RedisMessagePublisher( RedisTemplate redisTemplate, ChannelTopic topic) { this.redisTemplate = redisTemplate; this.topic = topic; } public void publish(String message) { redisTemplate.convertAndSend(topic.getTopic(), message); } } 

Wie Sie sehen, ist die Publisher-Implementierung unkompliziert. Es verwendet die convertAndSend () -Methode des redisTemplate , um die angegebene Nachricht zu formatieren und im konfigurierten Thema zu veröffentlichen.

Ein Thema implementiert die Semantik zum Veröffentlichen und Abonnieren: Wenn eine Nachricht veröffentlicht wird, geht sie an alle Abonnenten, die registriert sind, um dieses Thema anzuhören.

4. Nachrichten abonnieren

RedisMessageSubscriber implementiert die von Spring Data Redis bereitgestellte MessageListener- Schnittstelle:

@Service public class RedisMessageSubscriber implements MessageListener { public static List messageList = new ArrayList(); public void onMessage(Message message, byte[] pattern) { messageList.add(message.toString()); System.out.println("Message received: " + message.toString()); } }

Beachten Sie, dass es einen zweiten Parameter namens pattern gibt , den wir in diesem Beispiel nicht verwendet haben. In der Spring Data Redis-Dokumentation heißt es, dass dieser Parameter das "Muster, das dem Kanal entspricht (falls angegeben)" darstellt, aber null sein kann .

5. Senden und Empfangen von Nachrichten

Jetzt werden wir alles zusammenfügen. Erstellen wir eine Nachricht und veröffentlichen sie anschließend mit dem RedisMessagePublisher :

String message = "Message " + UUID.randomUUID(); redisMessagePublisher.publish(message);

Wenn wir " Publish" (Nachricht) aufrufen , wird der Inhalt an Redis gesendet, wo er an das in unserem Publisher definierte Thema der Nachrichtenwarteschlange weitergeleitet wird. Dann wird es an die Abonnenten dieses Themas verteilt.

Möglicherweise haben Sie bereits bemerkt, dass RedisMessageSubscriber ein Listener ist, der sich zum Abrufen von Nachrichten in der Warteschlange registriert.

Beim Eintreffen der Nachricht wird die definierte onMessage () -Methode des Teilnehmers ausgelöst.

In unserem Beispiel können wir überprüfen, ob wir Nachrichten erhalten haben , die durch die Überprüfung des veröffentlicht wurde Message in unserem RedisMessageSubscriber :

RedisMessageSubscriber.messageList.get(0).contains(message) 

6. Fazit

In diesem Artikel haben wir eine Implementierung der Pub / Sub-Nachrichtenwarteschlange mithilfe von Spring Data Redis untersucht.

Die Implementierung des obigen Beispiels finden Sie in einem GitHub-Projekt.