Einführung in RSocket

1. Einleitung

In diesem Tutorial werfen wir einen ersten Blick auf RSocket und wie es die Client-Server-Kommunikation ermöglicht.

2. Was ist RSocket ?

RSocket ist ein binäres Punkt-zu-Punkt-Kommunikationsprotokoll zur Verwendung in verteilten Anwendungen. In diesem Sinne bietet es eine Alternative zu anderen Protokollen wie HTTP.

Ein vollständiger Vergleich zwischen RSocket und anderen Protokollen würde den Rahmen dieses Artikels sprengen. Stattdessen konzentrieren wir uns auf ein Hauptmerkmal von RSocket: seine Interaktionsmodelle.

RSocket bietet vier Interaktionsmodelle. In diesem Sinne werden wir jeden anhand eines Beispiels untersuchen.

3. Maven-Abhängigkeiten

RSocket benötigt für unsere Beispiele nur zwei direkte Abhängigkeiten:

 io.rsocket rsocket-core 0.11.13   io.rsocket rsocket-transport-netty 0.11.13 

Die Abhängigkeiten rsocket-core und rsocket-transport-netty sind in Maven Central verfügbar.

Ein wichtiger Hinweis ist, dass die RSocket-Bibliothek häufig reaktive Streams verwendet . Die Flux- und Mono- Klassen werden in diesem Artikel verwendet, daher ist ein grundlegendes Verständnis hilfreich.

4. Server-Setup

Zuerst erstellen wir die Serverklasse :

public class Server { private final Disposable server; public Server() { this.server = RSocketFactory.receive() .acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl())) .transport(TcpServerTransport.create("localhost", TCP_PORT)) .start() .subscribe(); } public void dispose() { this.server.dispose(); } private class RSocketImpl extends AbstractRSocket {} }

Hier verwenden wir die RSocketFactory , um einen TCP-Socket einzurichten und abzuhören. Wir übergeben unser benutzerdefiniertes RSocketImpl , um Anfragen von Kunden zu bearbeiten. Wir werden dem RSocketImpl im Laufe der Zeit Methoden hinzufügen .

Um den Server zu starten, müssen wir ihn nur instanziieren:

Server server = new Server();

Eine einzelne Serverinstanz kann mehrere Verbindungen verarbeiten . Daher unterstützt nur eine Serverinstanz alle unsere Beispiele.

Wenn wir fertig sind, stoppt die dispose- Methode den Server und gibt den TCP-Port frei.

4. Interaktionsmodelle

4.1. Anfrage / Antwort

RSocket bietet ein Anforderungs- / Antwortmodell - jede Anforderung erhält eine einzelne Antwort.

Für dieses Modell erstellen wir einen einfachen Dienst, der eine Nachricht an den Client zurückgibt.

Beginnen wir mit dem Hinzufügen einer Methode zu unserer Erweiterung von AbstractRSocket, RSocketImpl :

@Override public Mono requestResponse(Payload payload) { try { return Mono.just(payload); // reflect the payload back to the sender } catch (Exception x) { return Mono.error(x); } }

Die requestResponse- Methode gibt für jede Anforderung ein einzelnes Ergebnis zurück , wie der Mono- Antworttyp zeigt.

Payload ist die Klasse, die Nachrichteninhalt und Metadaten enthält . Es wird von allen Interaktionsmodellen verwendet. Der Inhalt der Nutzdaten ist binär, es gibt jedoch praktische Methoden, die String- basierten Inhalt unterstützen.

Als nächstes können wir unsere Client-Klasse erstellen:

public class ReqResClient { private final RSocket socket; public ReqResClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } public String callBlocking(String string) { return socket .requestResponse(DefaultPayload.create(string)) .map(Payload::getDataUtf8) .block(); } public void dispose() { this.socket.dispose(); } }

Der Client verwendet die Methode RSocketFactory.connect () , um eine Socket-Verbindung mit dem Server herzustellen. Wir verwenden die requestResponse- Methode für den Socket, um eine Nutzlast an den Server zu senden .

Unsere Nutzdaten enthalten den an den Client übergebenen String . Wenn der MonoAntwort kommt an Wir können die Methode getDataUtf8 () verwenden, um auf den String- Inhalt der Antwort zuzugreifen .

Schließlich können wir den Integrationstest ausführen, um die Anforderung / Antwort in Aktion zu sehen. Wir senden einen String an den Server und überprüfen, ob derselbe String zurückgegeben wird:

@Test public void whenSendingAString_thenRevceiveTheSameString() { ReqResClient client = new ReqResClient(); String string = "Hello RSocket"; assertEquals(string, client.callBlocking(string)); client.dispose(); }

4.2. Feuer und vergessen

Beim Fire-and-Forget-Modell erhält der Client keine Antwort vom Server .

In diesem Beispiel sendet der Client simulierte Messungen in Intervallen von 50 ms an den Server. Der Server veröffentlicht die Messungen.

Fügen wir unserem Server in der RSocketImpl- Klasse einen Fire-and-Forget-Handler hinzu :

@Override public Mono fireAndForget(Payload payload) { try { dataPublisher.publish(payload); // forward the payload return Mono.empty(); } catch (Exception x) { return Mono.error(x); } }

Dieser Handler sieht dem Request / Response-Handler sehr ähnlich. Allerdings fire-and-forget gibt Mono statt Mono .

The dataPublisher is an instance of org.reactivestreams.Publisher. Thus, it makes the payload available to subscribers. We'll make use of that in the request/stream example.

Next, we'll create the fire-and-forget client:

public class FireNForgetClient { private final RSocket socket; private final List data; public FireNForgetClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } /** Send binary velocity (float) every 50ms */ public void sendData() { data = Collections.unmodifiableList(generateData()); Flux.interval(Duration.ofMillis(50)) .take(data.size()) .map(this::createFloatPayload) .flatMap(socket::fireAndForget) .blockLast(); } // ... }

The socket setup is exactly the same as before.

The sendData() method uses a Flux stream to send multiple messages. For each message, we invoke socket::fireAndForget.

We need to subscribe to the Mono response for each message. If we forget to subscribe then socket::fireAndForget will not execute.

The flatMap operator makes sure the Void responses are passed to the subscriber, while the blockLast operator acts as the subscriber.

We're going to wait until the next section to run the fire-and-forget test. At that point, we'll create a request/stream client to receive the data that was pushed by the fire-and-forget client.

4.3. Request/Stream

In the request/stream model, a single request may receive multiple responses. To see this in action we can build upon the fire-and-forget example. To do that, let's request a stream to retrieve the measurements we sent in the previous section.

As before, let's start by adding a new listener to the RSocketImpl on the server:

@Override public Flux requestStream(Payload payload) { return Flux.from(dataPublisher); }

The requestStream handler returns a Flux stream. As we recall from the previous section, the fireAndForget handler published incoming data to the dataPublisher. Now, we'll create a Flux stream using that same dataPublisher as the event source. By doing this the measurement data will flow asynchronously from our fire-and-forget client to our request/stream client.

Let's create the request/stream client next:

public class ReqStreamClient { private final RSocket socket; public ReqStreamClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); } public Flux getDataStream() { return socket .requestStream(DefaultPayload.create(DATA_STREAM_NAME)) .map(Payload::getData) .map(buf -> buf.getFloat()) .onErrorReturn(null); } public void dispose() { this.socket.dispose(); } }

We connect to the server in the same way as our previous clients.

In getDataStream()we use socket.requestStream() to receive a Flux stream from the server. From that stream, we extract the Float values from the binary data. Finally, the stream is returned to the caller, allowing the caller to subscribe to it and process the results.

Now let's test. We'll verify the round trip from fire-and-forget to request/stream.

We can assert that each value is received in the same order as it was sent. Then, we can assert that we receive the same number of values that were sent:

@Test public void whenSendingStream_thenReceiveTheSameStream() { FireNForgetClient fnfClient = new FireNForgetClient(); ReqStreamClient streamClient = new ReqStreamClient(); List data = fnfClient.getData(); List dataReceived = new ArrayList(); Disposable subscription = streamClient.getDataStream() .index() .subscribe( tuple -> { assertEquals("Wrong value", data.get(tuple.getT1().intValue()), tuple.getT2()); dataReceived.add(tuple.getT2()); }, err -> LOG.error(err.getMessage()) ); fnfClient.sendData(); // ... dispose client & subscription assertEquals("Wrong data count received", data.size(), dataReceived.size()); }

4.4. Channel

The channel model provides bidirectional communication. In this model, message streams flow asynchronously in both directions.

Let's create a simple game simulation to test this. In this game, each side of the channel will become a player. As the game runs, these players will send messages to the other side at random time intervals. The opposite side will react to the messages.

Firstly, we'll create the handler on the server. Like before, we add to the RSocketImpl:

@Override public Flux requestChannel(Publisher payloads) { Flux.from(payloads) .subscribe(gameController::processPayload); return Flux.from(gameController); }

The requestChannel handler has Payload streams for both input and output. The Publisher input parameter is a stream of payloads received from the client. As they arrive, these payloads are passed to the gameController::processPayload function.

In response, we return a different Flux stream back to the client. This stream is created from our gameController, which is also a Publisher.

Here is a summary of the GameController class:

public class GameController implements Publisher { @Override public void subscribe(Subscriber subscriber) { // send Payload messages to the subscriber at random intervals } public void processPayload(Payload payload) { // react to messages from the other player } }

When the GameController receives a subscriber it begins sending messages to that subscriber.

Next, let's create the client:

public class ChannelClient { private final RSocket socket; private final GameController gameController; public ChannelClient() { this.socket = RSocketFactory.connect() .transport(TcpClientTransport.create("localhost", TCP_PORT)) .start() .block(); this.gameController = new GameController("Client Player"); } public void playGame() { socket.requestChannel(Flux.from(gameController)) .doOnNext(gameController::processPayload) .blockLast(); } public void dispose() { this.socket.dispose(); } }

As we have seen in our previous examples, the client connects to the server in the same way as the other clients.

The client creates its own instance of the GameController.

We use socket.requestChannel() to send our Payload stream to the server. The server responds with a Payload stream of its own.

Als vom Server empfangene Nutzdaten übergeben wir sie an unseren gameController :: processPayload- Handler.

In unserer Spielsimulation sind Client und Server Spiegelbilder voneinander. Das heißt, jede Seite sendet einen Nutzlaststrom und empfängt einen Nutzlaststrom vom anderen Ende .

Die Streams werden unabhängig und ohne Synchronisierung ausgeführt.

Lassen Sie uns abschließend die Simulation in einem Test ausführen:

@Test public void whenRunningChannelGame_thenLogTheResults() { ChannelClient client = new ChannelClient(); client.playGame(); client.dispose(); }

5. Schlussfolgerung

In diesem Einführungsartikel haben wir die von RSocket bereitgestellten Interaktionsmodelle untersucht. Den vollständigen Quellcode der Beispiele finden Sie in unserem Github-Repository.

Besuchen Sie unbedingt die RSocket-Website, um eine eingehendere Diskussion zu erhalten. Insbesondere die FAQ- und Motivationsdokumente bieten einen guten Hintergrund.