MQTT-Client in Java

1. Übersicht

In diesem Tutorial erfahren Sie, wie Sie MQTT-Nachrichten in einem Java-Projekt mithilfe der vom Eclipse Paho-Projekt bereitgestellten Bibliotheken hinzufügen können.

2. MQTT-Primer

MQTT (MQ Telemetry Transport) ist ein Messaging-Protokoll , das entwickelt wurde, um die Notwendigkeit einer einfachen und einfachen Methode zum Übertragen von Daten zu / von Geräten mit geringem Stromverbrauch zu erfüllen, wie sie beispielsweise in industriellen Anwendungen verwendet werden.

Mit der zunehmenden Beliebtheit von IoT-Geräten (Internet of Things) wurde MQTT zunehmend eingesetzt, was zu einer Standardisierung durch OASIS und ISO führte.

Das Protokoll unterstützt ein einzelnes Nachrichtenmuster, nämlich das Publish-Subscribe-Muster: Jede von einem Client gesendete Nachricht enthält ein zugehöriges „Thema“, das vom Broker verwendet wird, um es an abonnierte Clients weiterzuleiten. Themennamen können einfache Zeichenfolgen wie „ oiltemp “ oder eine pfadartige Zeichenfolge „ motor / 1 / rpm “ sein.

Um Nachrichten zu empfangen, abonniert ein Client ein oder mehrere Themen mit seinem genauen Namen oder einer Zeichenfolge, die einen der unterstützten Platzhalter enthält ("#" für mehrstufige Themen und "+" für einstufige Themen).

3. Projekteinrichtung

Um die Paho-Bibliothek in ein Maven-Projekt aufzunehmen, müssen wir die folgende Abhängigkeit hinzufügen:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

Die neueste Version des Eclipse Paho Java-Bibliotheksmoduls kann von Maven Central heruntergeladen werden.

4. Client-Setup

Wenn Sie die Paho-Bibliothek verwenden, müssen Sie zunächst eine Implementierung der IMqttClient- Schnittstelle erhalten, um Nachrichten von einem MQTT-Broker zu senden und / oder zu empfangen . Diese Schnittstelle enthält alle Methoden, die eine Anwendung benötigt, um eine Verbindung zum Server herzustellen, Nachrichten zu senden und zu empfangen.

Paho wird mit zwei Implementierungen dieser Schnittstelle ausgeliefert, einer asynchronen ( MqttAsyncClient ) und einer synchronen ( MqttClient ).In unserem Fall konzentrieren wir uns auf die synchrone Version mit einfacherer Semantik.

Das Setup selbst besteht aus zwei Schritten: Wir erstellen zuerst eine Instanz der MqttClient- Klasse und verbinden sie dann mit unserem Server. Der folgende Unterabschnitt beschreibt diese Schritte.

4.1. Erstellen einer neuen IMqttClient- Instanz

Das folgende Codefragment zeigt, wie eine neue synchrone IMqttClient- Instanz erstellt wird:

String publisherId = UUID.randomUUID().toString(); IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);

In diesem Fall verwenden wir den einfachsten verfügbaren Konstruktor, der die Endpunktadresse unseres MQTT-Brokers und eine Client-ID verwendet , die unseren Client eindeutig identifiziert.

In unserem Fall haben wir eine zufällige UUID verwendet, sodass bei jedem Lauf eine neue Client-ID generiert wird.

Paho bietet auch zusätzliche Konstruktoren, die wir verwenden können, um den Persistenzmechanismus zum Speichern nicht bestätigter Nachrichten und / oder den ScheduledExecutorService anzupassen, der zum Ausführen von Hintergrundaufgaben verwendet wird, die für die Implementierung der Protokoll-Engine erforderlich sind.

Der von uns verwendete Serverendpunkt ist ein öffentlicher MQTT-Broker, der vom Paho-Projekt gehostet wird und es jedem mit einer Internetverbindung ermöglicht, Clients zu testen, ohne dass eine Authentifizierung erforderlich ist.

4.2. Verbinde mit dem Server

Unsere neu erstellte MqttClient- Instanz ist nicht mit dem Server verbunden. Dazu rufen wir die Methode connect () auf und übergeben optional eine MqttConnectOptions- Instanz, mit der wir einige Aspekte des Protokolls anpassen können.

Insbesondere können wir diese Optionen verwenden, um zusätzliche Informationen wie Sicherheitsanmeldeinformationen, Sitzungswiederherstellungsmodus, Wiederverbindungsmodus usw. zu übergeben.

Die MqttConnectionOptions- Klasse macht diese Optionen als einfache Eigenschaften verfügbar , die wir mit normalen Setter-Methoden festlegen können. Wir müssen nur die für unser Szenario erforderlichen Eigenschaften festlegen - die übrigen nehmen Standardwerte an.

Der Code zum Herstellen einer Verbindung zum Server sieht normalerweise folgendermaßen aus:

MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); options.setCleanSession(true); options.setConnectionTimeout(10); publisher.connect(options);

Hier definieren wir unsere Verbindungsoptionen so, dass:

  • Die Bibliothek versucht bei einem Netzwerkfehler automatisch, die Verbindung zum Server wiederherzustellen
  • Nicht gesendete Nachrichten aus einem vorherigen Lauf werden verworfen
  • Das Verbindungszeitlimit ist auf 10 Sekunden eingestellt

5. Nachrichten senden

Das Senden von Nachrichten mit einem bereits verbundenen MqttClient ist sehr einfach. Wir verwenden eine der Publish () -Methodenvarianten, um die Nutzdaten, bei denen es sich immer um ein Byte-Array handelt , mit einer der folgenden Quality-of-Service-Optionen an ein bestimmtes Thema zu senden :

  • 0 - "höchstens einmal" -Semantik, auch bekannt als "Feuer und Vergessen". Verwenden Sie diese Option, wenn der Verlust von Nachrichten akzeptabel ist, da keine Bestätigung oder Persistenz erforderlich ist
  • 1 - "mindestens einmal" Semantik. Verwenden Sie diese Option, wenn der Verlust von Nachrichten nicht akzeptabel ist und Ihre Abonnenten Duplikate verarbeiten können
  • 2 - "genau einmal" Semantik. Verwenden Sie diese Option, wenn der Verlust von Nachrichten nicht akzeptabel ist und Ihre Abonnenten keine Duplikate verarbeiten können

In unserem Beispielprojekt spielt die EngineTemperatureSensor- Klasse die Rolle eines Scheinsensors , der jedes Mal, wenn wir seine call () -Methode aufrufen, einen neuen Temperaturmesswert erzeugt .

Diese Klasse implementiert die Callable- Schnittstelle, sodass wir sie problemlos mit einer der ExecutorService- Implementierungen verwenden können, die im Paket java.util.concurrent verfügbar sind :

public class EngineTemperatureSensor implements Callable { // ... private members omitted public EngineTemperatureSensor(IMqttClient client) { this.client = client; } @Override public Void call() throws Exception { if ( !client.isConnected()) { return null; } MqttMessage msg = readEngineTemp(); msg.setQos(0); msg.setRetained(true); client.publish(TOPIC,msg); return null; } private MqttMessage readEngineTemp() { double temp = 80 + rnd.nextDouble() * 20.0; byte[] payload = String.format("T:%04.2f",temp) .getBytes(); return new MqttMessage(payload); } }

Die MqttMessage kapselt die Nutzdaten selbst, die angeforderte Dienstgüte und auch das beibehaltene Flag für die Nachricht. Dieses Flag zeigt dem Broker an, dass er diese Nachricht behalten soll, bis sie von einem Abonnenten verwendet wird.

We can use this feature to implement a “last known good” behavior, so when a new subscriber connects to the server, it will receive the retained message right away.

6. Receiving Messages

In order to receive messages from the MQTT broker, we need to use one of the subscribe() method variants, which allow us to specify:

  • One or more topic filters for messages we want to receive
  • The associated QoS
  • The callback handler to process received messages

In the following example, we show how to add a message listener to an existing IMqttClient instance to receive messages from a given topic. We use a CountDownLatch as a synchronization mechanism between our callback and the main execution thread, decrementing it every time a new message arrives.

In the sample code, we've used a different IMqttClient instance to receive messages. We did it just to make more clear which client does what, but this is not a Paho limitation – if you want, you can use the same client for publishing and receiving messages:

CountDownLatch receivedSignal = new CountDownLatch(10); subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { byte[] payload = msg.getPayload(); // ... payload handling omitted receivedSignal.countDown(); }); receivedSignal.await(1, TimeUnit.MINUTES);

The subscribe() variant used above takes an IMqttMessageListener instance as its second argument.

In our case, we use a simple lambda function that processes the payload and decrements a counter. If not enough messages arrive in the specified time window (1 minute), the await() method will throw an exception.

When using Paho, we don't need to explicitly acknowledge message receipt. If the callback returns normally, Paho assumes it a successful consumption and sends an acknowledgment to the server.

If the callback throws an Exception, the client will be shut down. Please note that this will result in loss of any messages sent with QoS level of 0.

Messages sent with QoS level 1 or 2 will be resent by the server once the client is reconnected and subscribes to the topic again.

7. Conclusion

In this article, we demonstrated how we can add support for the MQTT protocol in our Java applications using the library provided by the Eclipse Paho project.

Diese Bibliothek verarbeitet alle Protokolldetails auf niedriger Ebene, sodass wir uns auf andere Aspekte unserer Lösung konzentrieren können und gleichzeitig ausreichend Platz für die Anpassung wichtiger Aspekte der internen Funktionen wie der Nachrichtenpersistenz bleibt.

Der in diesem Artikel gezeigte Code ist auf GitHub verfügbar.