Vom Server gesendete Ereignisse im Frühjahr

1. Übersicht

In diesem Tutorial erfahren Sie, wie Sie mit Spring auf Server-Sent-Events basierende APIs implementieren können.

Einfach ausgedrückt, Server-Sent-Events, kurz SSE, ist ein HTTP-Standard, mit dem eine Webanwendung einen unidirektionalen Ereignisstrom verarbeiten und Aktualisierungen erhalten kann, wenn der Server Daten ausgibt.

Die Version Spring 4.2 hat es bereits unterstützt, aber ab Spring 5 haben wir jetzt eine idiomatischere und bequemere Möglichkeit, damit umzugehen.

2. SSE mit Spring 5 Webflux

Um dies zu erreichen, können wir Implementierungen wie die von der Reactor- Bibliothek bereitgestellte Flux- Klasse oder möglicherweise die ServerSentEvent- Entität verwenden , die uns die Kontrolle über die Ereignismetadaten gibt.

2.1. Streaming-Ereignisse mit Flux

Flux ist eine reaktive Darstellung eines Ereignisstroms - es wird je nach angegebenem Anforderungs- oder Antwortmedientyp unterschiedlich behandelt.

Um einen SSE-Streaming-Endpunkt zu erstellen, müssen wir den W3C-Spezifikationen folgen und seinen MIME-Typ als Text- / Ereignis-Stream festlegen :

@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux streamFlux() { return Flux.interval(Duration.ofSeconds(1)) .map(sequence -> "Flux - " + LocalTime.now().toString()); }

Die Intervallmethode erstellt einen Fluss , der inkrementell lange Werte ausgibt. Dann ordnen wir diese Werte unserer gewünschten Ausgabe zu.

Lassen Sie uns unsere Anwendung starten und ausprobieren, indem Sie dann den Endpunkt durchsuchen.

Wir werden sehen, wie der Browser auf die Ereignisse reagiert, die vom Server Sekunde für Sekunde übertragen werden. Weitere Informationen zu Flux und dem Reaktorkern finden Sie in diesem Beitrag.

2.2. Verwendung des ServerSentEvent- Elements

Wir werden jetzt unseren Ausgang wickeln String in ein ServerSentSevent Objekt, und untersuchen Sie die Vorteile , dies zu tun:

@GetMapping("/stream-sse") public Flux
    
      streamEvents() { return Flux.interval(Duration.ofSeconds(1)) .map(sequence -> ServerSentEvent. builder() .id(String.valueOf(sequence)) .event("periodic-event") .data("SSE - " + LocalTime.now().toString()) .build()); }
    

Wie wir erkennen können, bietet die Verwendung der ServerSentEvent- Entität einige Vorteile :

  1. Wir können die Ereignismetadaten verarbeiten, die wir in einem realen Fall benötigen würden
  2. Wir können die Deklaration des Medientyps " Text / Ereignisstrom " ignorieren

In diesem Fall haben wir eine ID , einen Ereignisnamen und vor allem die tatsächlichen Daten des Ereignisses angegeben.

Wir hätten auch ein Kommentarattribut und einen Wiederholungswert hinzufügen können, der die Wiederverbindungszeit angibt, die beim Senden des Ereignisses verwendet werden soll.

2.3. Konsumieren der vom Server gesendeten Ereignisse mit einem WebClient

Lassen Sie uns nun unseren Ereignisstrom mit einem WebClient nutzen .:

public void consumeServerSentEvent() { WebClient client = WebClient.create("//localhost:8080/sse-server"); ParameterizedTypeReference
    
      type = new ParameterizedTypeReference
     
      () {}; Flux
      
        eventStream = client.get() .uri("/stream-sse") .retrieve() .bodyToFlux(type); eventStream.subscribe( content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error), () -> logger.info("Completed!!!")); }
      
     
    

Mit der Abonnementmethode können wir angeben, wie wir vorgehen, wenn wir ein Ereignis erfolgreich empfangen, wenn ein Fehler auftritt und wenn das Streaming abgeschlossen ist.

In unserem Beispiel haben wir die Abrufmethode verwendet, mit der der Antworttext auf einfache und unkomplizierte Weise abgerufen werden kann.

Diese Methode löst automatisch eine WebClientResponseException aus, wenn wir eine 4xx- oder 5xx-Antwort erhalten, es sei denn, wir behandeln die Szenarien, in denen eine onStatus- Anweisung hinzugefügt wird .

Andererseits hätten wir auch die Austauschmethode verwenden können, die den Zugriff auf die ClientResponse ermöglicht und auch bei fehlgeschlagenen Antworten kein Fehlersignal gibt.

Wir müssen berücksichtigen, dass wir den ServerSentEvent- Wrapper umgehen können, wenn wir die Ereignismetadaten nicht benötigen.

3. SSE-Streaming im Frühjahr MVC

Wie bereits erwähnt, wurde die SSE-Spezifikation seit Spring 4.2 unterstützt, als die SseEmitter- Klasse eingeführt wurde.

In einfachen Worten, wir definieren einen ExecutorService , einen Thread, in dem der SseEmitter seine Arbeit zum Pushen von Daten erledigt , und geben die Emitter-Instanz zurück, wobei die Verbindung auf folgende Weise offen bleibt:

@GetMapping("/stream-sse-mvc") public SseEmitter streamSseMvc() { SseEmitter emitter = new SseEmitter(); ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor(); sseMvcExecutor.execute(() -> { try { for (int i = 0; true; i++) { SseEventBuilder event = SseEmitter.event() .data("SSE MVC - " + LocalTime.now().toString()) .id(String.valueOf(i)) .name("sse event - mvc"); emitter.send(event); Thread.sleep(1000); } } catch (Exception ex) { emitter.completeWithError(ex); } }); return emitter; }

Stellen Sie immer sicher, dass Sie den richtigen ExecutorService für Ihr Anwendungsfall-Szenario auswählen .

In diesem interessanten Tutorial können wir mehr über SSE in Spring MVC erfahren und uns andere Beispiele ansehen.

4. Grundlegendes zu vom Server gesendeten Ereignissen

Nachdem wir nun wissen, wie SSE-Endpunkte implementiert werden, versuchen wir, ein bisschen tiefer zu gehen, indem wir einige zugrunde liegende Konzepte verstehen.

Eine SSE ist eine Spezifikation, die von den meisten Browsern übernommen wird, um Streaming-Ereignisse jederzeit unidirektional zu ermöglichen.

Die 'Ereignisse' sind nur ein Strom von UTF-8-codierten Textdaten, die dem in der Spezifikation definierten Format folgen.

Dieses Format besteht aus einer Reihe von Schlüsselwertelementen (ID, Wiederholung, Daten und Ereignis, die den Namen angeben), die durch Zeilenumbrüche getrennt sind.

Kommentare werden ebenfalls unterstützt.

Die Spezifikation schränkt das Datennutzlastformat in keiner Weise ein. Wir können einen einfachen String oder eine komplexere JSON- oder XML-Struktur verwenden.

Ein letzter Punkt, den wir berücksichtigen müssen, ist der Unterschied zwischen der Verwendung von SSE-Streaming und WebSockets .

Während WebSockets eine Vollduplex-Kommunikation (bidirektional) zwischen dem Server und dem Client bieten, verwendet SSE eine unidirektionale Kommunikation.

Außerdem ist WebSockets kein HTTP-Protokoll und bietet im Gegensatz zu SSE keine Standards für die Fehlerbehandlung.

5. Schlussfolgerung

Zusammenfassend haben wir in diesem Artikel die Hauptkonzepte des SSE-Streamings kennengelernt. Dies ist zweifellos eine großartige Ressource, mit der wir Systeme der nächsten Generation erstellen können.

Wir sind jetzt in einer hervorragenden Position, um zu verstehen, was unter der Haube passiert, wenn wir dieses Protokoll verwenden.

Darüber hinaus haben wir die Theorie durch einige einfache Beispiele ergänzt, die in unserem Github-Repository zu finden sind.