Zuverlässiges Messaging mit JGroups

1. Übersicht

JGroups ist eine Java-API für den zuverlässigen Nachrichtenaustausch. Es verfügt über eine einfache Oberfläche, die Folgendes bietet:

  • Ein flexibler Protokollstapel, einschließlich TCP und UDP
  • Fragmentierung und Wiederzusammenstellung großer Nachrichten
  • zuverlässiges Unicast und Multicast
  • Fehlererkennung
  • Ablaufsteuerung

Sowie viele andere Funktionen.

In diesem Lernprogramm erstellen wir eine einfache Anwendung zum Austauschen von String- Nachrichten zwischen Anwendungen und zum Bereitstellen des freigegebenen Status für neue Anwendungen, wenn diese dem Netzwerk beitreten.

2. Setup

2.1. Maven-Abhängigkeit

Wir müssen unserer pom.xml eine einzige Abhängigkeit hinzufügen :

 org.jgroups jgroups 4.0.10.Final  

Die neueste Version der Bibliothek kann in Maven Central überprüft werden.

2.2. Vernetzung

JGroups versucht standardmäßig, IPV6 zu verwenden. Abhängig von unserer Systemkonfiguration kann dies dazu führen, dass Anwendungen nicht kommunizieren können.

Um dies zu vermeiden, setzen wir den java.net.preferIPv4Stack auf true, wenn wir unsere Anwendungen hier ausführen :

java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger 

3. JChannels

Unsere Verbindung zu einem JGroups-Netzwerk ist ein JChannel. Der Kanal tritt einem Cluster bei und sendet und empfängt Nachrichten sowie Informationen zum Status des Netzwerks.

3.1. Einen Kanal erstellen

Wir erstellen einen JChannel mit einem Pfad zu einer Konfigurationsdatei. Wenn wir den Dateinamen weglassen, wird im aktuellen Arbeitsverzeichnis nach udp.xml gesucht .

Wir erstellen einen Kanal mit einer explizit benannten Konfigurationsdatei:

JChannel channel = new JChannel("src/main/resources/udp.xml"); 

Die Konfiguration von JGroups kann sehr kompliziert sein, aber die Standardkonfigurationen für UDP und TCP sind für die meisten Anwendungen ausreichend. Wir haben die Datei für UDP in unseren Code aufgenommen und werden sie für dieses Tutorial verwenden.

Weitere Informationen zum Konfigurieren des Transports finden Sie im JGroups-Handbuch hier.

3.2. Kanal anschließen

Nachdem wir unseren Kanal erstellt haben, müssen wir einem Cluster beitreten. Ein Cluster ist eine Gruppe von Knoten, die Nachrichten austauschen.

Für den Beitritt zu einem Cluster ist ein Clustername erforderlich:

channel.connect("Baeldung"); 

Der erste Knoten, der versucht, einem Cluster beizutreten, erstellt ihn, wenn er nicht vorhanden ist. Wir werden diesen Prozess unten in Aktion sehen.

3.3. Einen Kanal benennen

Knoten werden durch einen Namen identifiziert, sodass Peers gerichtete Nachrichten senden und Benachrichtigungen darüber erhalten können, wer den Cluster betritt und verlässt. JGroups vergeben automatisch einen Namen, oder wir können unseren eigenen festlegen:

channel.name("user1");

Wir werden diese Namen unten verwenden, um zu verfolgen, wann Knoten den Cluster betreten und verlassen.

3.4. Kanal schließen

Die Kanalbereinigung ist wichtig, wenn Peers rechtzeitig benachrichtigt werden sollen, dass wir beendet wurden.

Wir schließen einen JChannel mit seiner Close-Methode:

channel.close()

4. Änderungen der Clusteransicht

Mit einem erstellten JChannel können wir nun den Status von Peers im Cluster anzeigen und Nachrichten mit ihnen austauschen.

JGroups behält den Clusterstatus innerhalb der View- Klasse bei. Jeder Kanal hat eine einzelne Ansicht des Netzwerks. Wenn sich die Ansicht ändert, wird sie über den Rückruf viewAccepted () übermittelt .

In diesem Lernprogramm erweitern wir die ReceiverAdaptor- API-Klasse, die alle für eine Anwendung erforderlichen Schnittstellenmethoden implementiert.

Dies ist die empfohlene Methode zum Implementieren von Rückrufen.

Fügen wir viewAccepted zu unserer Anwendung:

public void viewAccepted(View newView) { private View lastView; if (lastView == null) { System.out.println("Received initial view:"); newView.forEach(System.out::println); } else { System.out.println("Received new view."); List newMembers = View.newMembers(lastView, newView); System.out.println("New members: "); newMembers.forEach(System.out::println); List exMembers = View.leftMembers(lastView, newView); System.out.println("Exited members:"); exMembers.forEach(System.out::println); } lastView = newView; } 

Jede Ansicht enthält eine Liste von Adressobjekten , die jedes Mitglied des Clusters darstellen. JGroups bietet bequeme Methoden zum Vergleichen einer Ansicht mit einer anderen, mit denen wir neue oder verlassene Mitglieder des Clusters erkennen.

5. Nachrichten senden

Die Nachrichtenverarbeitung in JGroups ist unkompliziert. Eine Nachricht enthält ein Byte- Array und Adressobjekte , die dem Sender und dem Empfänger entsprechen.

In diesem Tutorial verwenden wir Strings, die über die Befehlszeile gelesen werden. Es ist jedoch leicht zu erkennen, wie eine Anwendung andere Datentypen austauschen kann.

5.1. Broadcast Messages

A Message is created with a destination and a byte array; JChannel sets the sender for us. If the target is null, the entire cluster will receive the message.

We'll accept text from the command line and send it to the cluster:

System.out.print("Enter a message: "); String line = in.readLine().toLowerCase(); Message message = new Message(null, line.getBytes()); channel.send(message); 

If we run multiple instances of our program and send this message (after we implement the receive() method below), all of them would receive it, including the sender.

5.2. Blocking Our Messages

If we don't want to see our messages, we can set a property for that:

channel.setDiscardOwnMessages(true); 

When we run the previous test, the message sender does not receive its broadcast message.

5.3. Direct Messages

Sending a direct message requires a valid Address. If we're referring to nodes by name, we need a way to look up an Address. Fortunately, we have the View for that.

The current View is always available from the JChannel:

private Optional getAddress(String name) { View view = channel.view(); return view.getMembers().stream() .filter(address -> name.equals(address.toString())) .findAny(); } 

Address names are available via the class toString() method, so we merely search the List of cluster members for the name we want.

So we can accept a name on from the console, find the associated destination, and send a direct message:

Address destination = null; System.out.print("Enter a destination: "); String destinationName = in.readLine().toLowerCase(); destination = getAddress(destinationName) .orElseThrow(() -> new Exception("Destination not found"); Message message = new Message(destination, "Hi there!"); channel.send(message); 

6. Receiving Messages

We can send messages, now let's add try to receive them now.

Let's override ReceiverAdaptor's empty receive method:

public void receive(Message message) { String line = Message received from: " + message.getSrc() + " to: " + message.getDest() + " -> " + message.getObject(); System.out.println(line); } 

Since we know the message contains a String, we can safely pass getObject() to System.out.

7. State Exchange

When a node enters the network, it may need to retrieve state information about the cluster. JGroups provides a state transfer mechanism for this.

When a node joins the cluster, it simply calls getState(). The cluster usually retrieves the state from the oldest member in the group – the coordinator.

Let's add a broadcast message count to our application. We'll add a new member variable and increment it inside receive():

private Integer messageCount = 0; public void receive(Message message) { String line = "Message received from: " + message.getSrc() + " to: " + message.getDest() + " -> " + message.getObject(); System.out.println(line); if (message.getDest() == null) { messageCount++; System.out.println("Message count: " + messageCount); } } 

We check for a null destination because if we count direct messages, each node will have a different number.

Next, we override two more methods in ReceiverAdaptor:

public void setState(InputStream input) { try { messageCount = Util.objectFromStream(new DataInputStream(input)); } catch (Exception e) { System.out.println("Error deserialing state!"); } System.out.println(messageCount + " is the current messagecount."); } public void getState(OutputStream output) throws Exception { Util.objectToStream(messageCount, new DataOutputStream(output)); } 

Similar to messages, JGroups transfers state as an array of bytes.

JGroups supplies an InputStream to the coordinator to write the state to, and an OutputStream for the new node to read. The API provides convenience classes for serializing and deserializing the data.

Note that in production code access to state information must be thread-safe.

Finally, we add the call to getState() to our startup, after we connect to the cluster:

channel.connect(clusterName); channel.getState(null, 0); 

getState() accepts a destination from which to request the state and a timeout in milliseconds. A null destination indicates the coordinator and 0 means do not timeout.

Wenn wir diese App mit zwei Knoten ausführen und Broadcast-Nachrichten austauschen, wird die Anzahl der Nachrichten erhöht.

Wenn wir dann einen dritten Client hinzufügen oder einen von ihnen stoppen und starten, wird der neu verbundene Knoten die richtige Anzahl von Nachrichten drucken.

8. Fazit

In diesem Tutorial haben wir JGroups verwendet, um eine Anwendung zum Austausch von Nachrichten zu erstellen. Wir haben die API verwendet, um zu überwachen, welche Knoten mit dem Cluster verbunden sind und diesen verlassen haben, und um den Clusterstatus beim Beitritt auf einen neuen Knoten zu übertragen.

Codebeispiele finden Sie wie immer auf GitHub.