WebSockets mit dem Play Framework und Akka

1. Übersicht

Wenn wir möchten, dass unsere Webclients einen Dialog mit unserem Server führen, können WebSockets eine nützliche Lösung sein. WebSockets halten eine dauerhafte Vollduplexverbindung aufrecht. Dies gibt uns die Möglichkeit, bidirektionale Nachrichten zwischen unserem Server und Client zu senden.

In diesem Tutorial erfahren Sie, wie Sie WebSockets mit Akka im Play Framework verwenden.

2. Setup

Lassen Sie uns eine einfache Chat-Anwendung einrichten. Der Benutzer sendet Nachrichten an den Server und der Server antwortet mit einer Nachricht von JSONPlaceholder.

2.1. Einrichten der Play Framework-Anwendung

Wir werden diese Anwendung mit dem Play Framework erstellen.

Befolgen Sie die Anweisungen in Einführung in Play in Java, um eine einfache Play Framework-Anwendung einzurichten und auszuführen.

2.2. Hinzufügen der erforderlichen JavaScript-Dateien

Außerdem müssen wir für clientseitiges Scripting mit JavaScript arbeiten. Auf diese Weise können wir neue Nachrichten vom Server empfangen. Wir werden dafür die jQuery-Bibliothek verwenden.

Fügen wir jQuery am Ende der Datei app / views / i ndex.scala.html hinzu :

2.3. Akka einrichten

Schließlich werden wir Akka verwenden, um die WebSocket-Verbindungen auf der Serverseite zu verwalten.

Navigieren wir zur Datei build.sbt und fügen die Abhängigkeiten hinzu.

Wir müssen die Abhängigkeiten von Akka-Actor und Akka-Testkit hinzufügen :

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion

Wir benötigen diese, um den Akka Framework-Code verwenden und testen zu können.

Als nächstes werden wir Akka-Streams verwenden. Fügen wir also die Akka-Stream- Abhängigkeit hinzu:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion

Zuletzt müssen wir einen Ruheendpunkt von einem Akka-Schauspieler aufrufen. Dafür benötigen wir die akka-http- Abhängigkeit. Wenn wir dies tun, gibt der Endpunkt JSON-Daten zurück, die wir deserialisieren müssen. Daher müssen wir auch die akka-http-jackson- Abhängigkeit hinzufügen :

libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion

Und jetzt sind wir fertig. Mal sehen, wie WebSockets funktionieren!

3. Umgang mit WebSockets mit Akka-Akteuren

Der WebSocket-Handhabungsmechanismus von Play basiert auf Akka-Streams. Ein WebSocket wird als Flow modelliert. Eingehende WebSocket-Nachrichten werden also in den Flow eingespeist, und vom Flow erzeugte Nachrichten werden an den Client gesendet.

Um ein WebSocket mit einem Actor verarbeiten zu können, benötigen wir das Play-Dienstprogramm ActorFlow , das ein ActorRef in einen Flow konvertiert . Dies erfordert hauptsächlich Java-Code mit ein wenig Konfiguration.

3.1. Die WebSocket Controller-Methode

Zunächst benötigen wir eine Materializer- Instanz. Der Materializer ist eine Factory für Stream Execution Engines.

Wir müssen das ActorSystem und den Materializer in die Controller- App / controller / HomeController.java einfügen :

private ActorSystem actorSystem; private Materializer materializer; @Inject public HomeController( ActorSystem actorSystem, Materializer materializer) { this.actorSystem = actorSystem; this.materializer = materializer; }

Fügen wir nun eine Socket-Controller-Methode hinzu:

public WebSocket socket() { return WebSocket.Json .acceptOrResult(this::createActorFlow); }

Hier rufen wir die Funktion acceptOrResult auf , die den Anforderungsheader verwendet und eine Zukunft zurückgibt. Die zurückgegebene Zukunft ist ein Ablauf für die Verarbeitung der WebSocket-Nachrichten.

Wir können stattdessen die Anfrage ablehnen und ein Ablehnungsergebnis zurückgeben.

Nun erstellen wir den Flow:

private CompletionStage
    
     > createActorFlow(Http.RequestHeader request) { return CompletableFuture.completedFuture( F.Either.Right(createFlowForActor())); }
    

Die F- Klasse in Play Framework definiert eine Reihe von Hilfsprogrammen für den funktionalen Programmierstil. In diesem Fall verwenden wir F. Entweder. Recht , um die Verbindung zu akzeptieren und den Fluss zurückzugeben.

Angenommen, wir wollten die Verbindung ablehnen, wenn der Client nicht authentifiziert ist.

Dazu könnten wir prüfen, ob in der Sitzung ein Benutzername festgelegt ist. Wenn dies nicht der Fall ist, lehnen wir die Verbindung mit HTTP 403 ab. Verboten:

private CompletionStage
    
     > createActorFlow2(Http.RequestHeader request) { return CompletableFuture.completedFuture( request.session() .getOptional("username") .map(username -> F.Either.
     
      Right( createFlowForActor())) .orElseGet(() -> F.Either.Left(forbidden()))); }
     
    

Wir verwenden F.Either.Left , um die Verbindung auf die gleiche Weise abzulehnen, wie wir einen Flow mit F.Either.Right bereitstellen .

Schließlich verknüpfen wir den Flow mit dem Akteur, der die Nachrichten verarbeitet:

private Flow createFlowForActor() { return ActorFlow.actorRef(out -> Messenger.props(out), actorSystem, materializer); }

The ActorFlow.actorRef creates a flow that is handled by the Messenger actor.

3.2. The routes File

Now, let's add the routes definitions for the controller methods in conf/routes:

GET / controllers.HomeController.index(request: Request) GET /chat controllers.HomeController.socket GET /chat/with/streams controllers.HomeController.akkaStreamsSocket GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)

These route definitions map incoming HTTP requests to controller action methods as explained in Routing in Play Applications in Java.

3.3. The Actor Implementation

The most important part of the actor class is the createReceive method which determines which messages the actor can handle:

@Override public Receive createReceive() { return receiveBuilder() .match(JsonNode.class, this::onSendMessage) .matchAny(o -> log.error("Received unknown message: {}", o.getClass())) .build(); }

The actor will forward all messages matching the JsonNode class to the onSendMessage handler method:

private void onSendMessage(JsonNode jsonNode) { RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode); String message = requestDTO.getMessage().toLowerCase(); //.. processMessage(requestDTO); }

Then the handler will respond to every message using the processMessage method:

private void processMessage(RequestDTO requestDTO) { CompletionStage responseFuture = getRandomMessage(); responseFuture.thenCompose(this::consumeHttpResponse) .thenAccept(messageDTO -> out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf())); }

3.4. Consuming Rest API with Akka HTTP

We'll send HTTP requests to the dummy message generator at JSONPlaceholder Posts. When the response arrives, we send the response to the client by writing it out.

Let's have a method that calls the endpoint with a random post id:

private CompletionStage getRandomMessage() { int postId = ThreadLocalRandom.current().nextInt(0, 100); return Http.get(getContext().getSystem()) .singleRequest(HttpRequest.create( "//jsonplaceholder.typicode.com/posts/" + postId)); }

We're also processing the HttpResponse we get from calling the service in order to get the JSON response:

private CompletionStage consumeHttpResponse( HttpResponse httpResponse) { Materializer materializer = Materializer.matFromSystem(getContext().getSystem()); return Jackson.unmarshaller(MessageDTO.class) .unmarshal(httpResponse.entity(), materializer) .thenApply(messageDTO -> { log.info("Received message: {}", messageDTO); discardEntity(httpResponse, materializer); return messageDTO; }); }

The MessageConverter class is a utility for converting between JsonNode and the DTOs:

public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) { ObjectMapper mapper = new ObjectMapper(); return mapper.convertValue(jsonNode, MessageDTO.class); }

Next, we need to discard the entity. The discardEntityBytes convenience method serves the purpose of easily discarding the entity if it has no purpose for us.

Let's see how to discard the bytes:

private void discardEntity( HttpResponse httpResponse, Materializer materializer) { HttpMessage.DiscardedEntity discarded = httpResponse.discardEntityBytes(materializer); discarded.completionStage() .whenComplete((done, ex) -> log.info("Entity discarded completely!")); }

Now having done the handling of the WebSocket, let's see how we can set up a client for this using HTML5 WebSockets.

4. Setting up the WebSocket Client

For our client, let's build a simple web-based chat application.

4.1. The Controller Action

We need to define a controller action that renders the index page. We'll put this in the controller class app.controllers.HomeController:

public Result index(Http.Request request) { String url = routes.HomeController.socket() .webSocketURL(request); return ok(views.html.index.render(url)); } 

4.2. The Template Page

Now, let's head over to the app/views/ndex.scala.html page and add a container for the received messages and a form to capture a new message:

 F   Send 

We'll also need to pass in the URL for the WebSocket controller action by declaring this parameter at the top of the app/views/index.scala.htmlpage:

@(url: String)

4.3. WebSocket Event Handlers in JavaScript

And now, we can add the JavaScript to handle the WebSocket events. For simplicity, we'll add the JavaScript functions at the bottom of the app/views/index.scala.html page.

Let's declare the event handlers:

var webSocket; var messageInput; function init() { initWebSocket(); } function initWebSocket() { webSocket = new WebSocket("@url"); webSocket.onopen = onOpen; webSocket.onclose = onClose; webSocket.onmessage = onMessage; webSocket.onerror = onError; }

Let's add the handlers themselves:

function onOpen(evt) { writeToScreen("CONNECTED"); } function onClose(evt) { writeToScreen("DISCONNECTED"); } function onError(evt) { writeToScreen("ERROR: " + JSON.stringify(evt)); } function onMessage(evt) { var receivedData = JSON.parse(evt.data); appendMessageToView("Server", receivedData.body); }

Then, to present the output, we'll use the functions appendMessageToView and writeToScreen:

function appendMessageToView(title, message) { $("#messageContent").append("

" + title + ": " + message + "

"); } function writeToScreen(message) { console.log("New message: ", message); }

4.4. Running and Testing the Application

We're ready to test the application, so let's run it:

cd websockets sbt run

With the application running, we can chat with the server by visiting //localhost:9000:

Every time we type a message and hit Send the server will immediately respond with some lorem ipsum from the JSON Placeholder service.

5. Handling WebSockets Directly with Akka Streams

If we are processing a stream of events from a source and sending these to the client, then we can model this around Akka streams.

Let's see how we can use Akka streams in an example where the server sends messages every two seconds.

We'll start with the WebSocket action in the HomeController:

public WebSocket akkaStreamsSocket() { return WebSocket.Json.accept(request -> { Sink in = Sink.foreach(System.out::println); MessageDTO messageDTO = new MessageDTO("1", "1", "Title", "Test Body"); Source out = Source.tick( Duration.ofSeconds(2), Duration.ofSeconds(2), MessageConverter.messageToJsonNode(messageDTO) ); return Flow.fromSinkAndSource(in, out); }); }

The Source#tick method takes three parameters. The first is the initial delay before the first tick is processed, and the second is the interval between successive ticks. We've set both values to two seconds in the above snippet. The third parameter is an object that should be returned on each tick.

To see this in action, we need to modify the URL in the index action and make it point to the akkaStreamsSocket endpoint:

String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);

And now refreshing the page, we'll see a new entry every two seconds:

6. Terminating the Actor

At some point, we'll need to shut down the chat, either through a user request or through a timeout.

6.1. Handling Actor Termination

How do we detect when a WebSocket has been closed?

Play will automatically close the WebSocket when the actor that handles the WebSocket terminates. So we can handle this scenario by implementing the Actor#postStop method:

@Override public void postStop() throws Exception { log.info("Messenger actor stopped at {}", OffsetDateTime.now() .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)); }

6.2. Manually Terminating the Actor

Further, if we must stop the actor, we can send a PoisonPill to the actor. In our example application, we should be able to handle a “stop” request.

Let's see how to do this in the onSendMessage method:

private void onSendMessage(JsonNode jsonNode) { RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode); String message = requestDTO.getMessage().toLowerCase(); if("stop".equals(message)) { MessageDTO messageDTO = createMessageDTO("1", "1", "Stop", "Stopping actor"); out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()); self().tell(PoisonPill.getInstance(), getSelf()); } else { log.info("Actor received. {}", requestDTO); processMessage(requestDTO); } }

When we receive a message, we check if it's a stop request. If it is, we send the PoisonPill. Otherwise, we process the request.

7. Configuration Options

We can configure several options in terms of how the WebSocket should be handled. Let's look at a few.

7.1. WebSocket Frame Length

WebSocket communication involves the exchange of data frames.

The WebSocket frame length is configurable. We have the option to adjust the frame length to our application requirements.

Configuring a shorter frame length may help reduce denial of service attacks that use long data frames. We can change the frame length for the application by specifying the max length in application.conf:

play.server.websocket.frame.maxLength = 64k

We can also set this configuration option by specifying the max length as a command-line parameter:

sbt -Dwebsocket.frame.maxLength=64k run

7.2. Connection Idle Timeout

By default, the actor we use to handle the WebSocket is terminated after one minute. This is because the Play server in which our application is running has a default idle timeout of 60 seconds. This means that all connections that do not receive a request in sixty seconds are closed automatically.

We can change this through configuration options. Let's head over to our application.conf and change the server to have no idle timeout:

play.server.http.idleTimeout = "infinite"

Or we can pass in the option as command-line arguments:

sbt -Dhttp.idleTimeout=infinite run

We can also configure this by specifying devSettings in build.sbt.

Config options specified in build.sbt are only used in development, they will be ignored in production:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

Wenn wir die Anwendung erneut ausführen, wird der Akteur nicht beendet.

Wir können den Wert in Sekunden ändern:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"

Weitere Informationen zu den verfügbaren Konfigurationsoptionen finden Sie in der Play Framework-Dokumentation.

8. Fazit

In diesem Tutorial haben wir WebSockets im Play Framework mit Akka-Darstellern und Akka-Streams implementiert.

Anschließend haben wir uns angesehen, wie Akka-Schauspieler direkt verwendet werden, und dann gesehen, wie Akka-Streams für die WebSocket-Verbindung eingerichtet werden können.

Auf der Clientseite haben wir JavaScript verwendet, um unsere WebSocket-Ereignisse zu verarbeiten.

Schließlich haben wir uns einige Konfigurationsoptionen angesehen, die wir verwenden können.

Wie üblich ist der Quellcode für dieses Tutorial auf GitHub verfügbar.