Vom Server gesendete Ereignisse (SSE) In JAX-RS

1. Übersicht

Server-Sent Events (SSE) ist eine HTTP-basierte Spezifikation, mit der eine lang laufende Monokanalverbindung vom Server zum Client hergestellt werden kann.

Der Client initiiert die SSE-Verbindung mithilfe des Medientyps text / event-stream im Accept- Header.

Später wird es automatisch aktualisiert, ohne den Server anzufordern.

Wir können weitere Details über die Spezifikation auf der offiziellen Spezifikation überprüfen.

In diesem Tutorial stellen wir die neue JAX-RS 2.1-Implementierung von SSE vor.

Daher werden wir uns ansehen, wie wir Ereignisse mit der JAX-RS Server-API veröffentlichen können. Außerdem werden wir untersuchen, wie wir sie entweder von der JAX-RS-Client-API oder nur von einem HTTP-Client wie dem Curl- Tool verwenden können.

2. SSE-Ereignisse verstehen

Ein SSE-Ereignis ist ein Textblock, der aus folgenden Feldern besteht:

  • Ereignis: Der Typ des Ereignisses. Der Server kann viele Nachrichten unterschiedlichen Typs senden, und der Client kann nur auf einen bestimmten Typ warten oder jeden Ereignistyp anders verarbeiten
  • Daten: Die vom Server gesendete Nachricht. Wir können viele Datenleitungen für dasselbe Ereignis haben
  • ID : Die ID des Ereignisses, die zum Senden des Last-Event-ID- Headers nach einem erneuten Verbindungsversuch verwendet wird. Dies ist nützlich, da dadurch verhindert werden kann, dass der Server bereits gesendete Ereignisse sendet
  • Wiederholen: Die Zeit in Millisekunden, die der Client benötigt, um eine neue Verbindung herzustellen, wenn der Strom verloren geht. Die zuletzt empfangene ID wird automatisch über den Header " Last-Event-ID" gesendet
  • ' : ': Dies ist ein Kommentar und wird vom Client ignoriert

Außerdem werden zwei aufeinanderfolgende Ereignisse durch einen doppelten Zeilenumbruch ' \ n \ n ' getrennt.

Darüber hinaus können die Daten im selben Ereignis in vielen Zeilen geschrieben werden, wie im folgenden Beispiel dargestellt:

event: stock id: 1 : price change retry: 4000 data: {"dateTime":"2018-07-14T18:06:00.285","id":1, data: "name":"GOOG","price":75.7119} event: stock id: 2 : price change retry: 4000 data: {"dateTime":"2018-07-14T18:06:00.285","id":2,"name":"IBM","price":83.4611}

In JAX RS wird ein SSE - Ereignis durch die abstrahierte SseEvent Schnittstelle , oder genauer gesagt, von den beiden Subinterfaces OutboundSseEvent und InboundSseEvent.

Während das OutboundSseEvent in der Server-API verwendet wird und ein gesendetes Ereignis entwirft, wird das InboundSseEvent von der Client-API verwendet und abstrahiert ein empfangenes Ereignis .

3. Veröffentlichen von SSE-Ereignissen

Nachdem wir nun besprochen haben, was ein SSE-Ereignis ist, wollen wir sehen, wie wir es erstellen und an einen HTTP-Client senden können.

3.1. Projektaufbau

Wir haben bereits ein Tutorial zum Einrichten eines JAX RS-basierten Maven-Projekts. Schauen Sie sich dort um, wie Sie Abhängigkeiten festlegen und mit JAX RS beginnen.

3.2. SSE-Ressourcenmethode

Eine SSE-Ressourcenmethode ist eine JAX-RS-Methode, die:

  • Kann einen Text- / Ereignisstrom- Medientyp erzeugen
  • Hat einen injizierten SseEventSink- Parameter, an den Ereignisse gesendet werden
  • Kann auch einen injizierten Sse- Parameter haben, der als Einstiegspunkt zum Erstellen eines Event Builders verwendet wird
@GET @Path("prices") @Produces("text/event-stream") public void getStockPrices(@Context SseEventSink sseEventSink, @Context Sse sse) { //... }

Infolgedessen sollte der Client die erste HTTP-Anforderung mit dem folgenden HTTP-Header stellen:

Accept: text/event-stream 

3.3. Die SSE-Instanz

Eine SSE-Instanz ist eine Kontext-Bean, die von der JAX RS Runtime zur Injektion bereitgestellt wird.

Wir könnten es als Fabrik nutzen, um Folgendes zu schaffen:

  • OutboundSseEvent.Builder - ermöglicht es uns, Ereignisse zu erstellen
  • SseBroadcaster - Ermöglicht die Übertragung von Ereignissen an mehrere Abonnenten

Mal sehen, wie das funktioniert:

@Context public void setSse(Sse sse) { this.sse = sse; this.eventBuilder = sse.newEventBuilder(); this.sseBroadcaster = sse.newBroadcaster(); }

Konzentrieren wir uns nun auf den Event Builder. OutboundSseEvent.Builder ist für die Erstellung des OutboundSseEvent verantwortlich :

OutboundSseEvent sseEvent = this.eventBuilder .name("stock") .id(String.valueOf(lastEventId)) .mediaType(MediaType.APPLICATION_JSON_TYPE) .data(Stock.class, stock) .reconnectDelay(4000) .comment("price change") .build();

Wie wir sehen können, verfügt der Builder über Methoden zum Festlegen von Werten für alle oben gezeigten Ereignisfelder . Zusätzlich wird die mediaType () -Methode verwendet, um das Datenfeld-Java-Objekt in ein geeignetes Textformat zu serialisieren.

Standardmäßig ist der Medientyp des Datenfelds text / plain . Daher muss es beim Umgang mit dem Datentyp String nicht explizit angegeben werden .

Andernfalls müssen wir, wenn wir ein benutzerdefiniertes Objekt verarbeiten möchten, den Medientyp angeben oder einen benutzerdefinierten MessageBodyWriter bereitstellen . Die JAX RS Runtime bietet MessageBodyWriter für die bekanntesten Medientypen .

Die Sse-Instanz verfügt außerdem über zwei Builder-Verknüpfungen zum Erstellen eines Ereignisses nur mit dem Datenfeld oder den Feldern Typ und Daten:

OutboundSseEvent sseEvent = sse.newEvent("cool Event"); OutboundSseEvent sseEvent = sse.newEvent("typed event", "data Event");

3.4. Einfaches Ereignis senden

Jetzt wissen wir, wie man Ereignisse erstellt und wie eine SSE-Ressource funktioniert. Senden wir ein einfaches Ereignis.

Die SseEventSink- Schnittstelle abstrahiert eine einzelne HTTP-Verbindung. Die JAX-RS Runtime kann sie nur durch Injektion in die SSE-Ressourcenmethode verfügbar machen.

Das Senden eines Ereignisses ist dann so einfach wie das Aufrufen von SseEventSink. senden().

Im nächsten Beispiel werden eine Reihe von Bestandsaktualisierungen gesendet und schließlich der Ereignisstrom geschlossen:

@GET @Path("prices") @Produces("text/event-stream") public void getStockPrices(@Context SseEventSink sseEventSink /*..*/) { int lastEventId = //..; while (running) { Stock stock = stockService.getNextTransaction(lastEventId); if (stock != null) { OutboundSseEvent sseEvent = this.eventBuilder .name("stock") .id(String.valueOf(lastEventId)) .mediaType(MediaType.APPLICATION_JSON_TYPE) .data(Stock.class, stock) .reconnectDelay(3000) .comment("price change") .build(); sseEventSink.send(sseEvent); lastEventId++; } //.. } sseEventSink.close(); }

Nach dem Senden aller Ereignisse schließt der Server die Verbindung entweder durch explizites Aufrufen der Methode close () oder vorzugsweise durch Verwenden der Methode try-with-resource, wenn SseEventSink die AutoClosable- Schnittstelle erweitert:

try (SseEventSink sink = sseEventSink) { OutboundSseEvent sseEvent = //.. sink.send(sseEvent); }

In our sample app we can see this running if we visit:

//localhost:9080/sse-jaxrs-server/sse.html

3.5. Broadcasting Events

Broadcasting is the process by which events are sent to multiple clients simultaneously. This is accomplished by the SseBroadcaster API, and it is done in three simple steps:

First, we create the SseBroadcaster object from an injected Sse context as shown previously:

SseBroadcaster sseBroadcaster = sse.newBroadcaster();

Then, clients should subscribe to be able to receive Sse Events. This is generally done in an SSE resource method where a SseEventSink context instance is injected:

@GET @Path("subscribe") @Produces(MediaType.SERVER_SENT_EVENTS) public void listen(@Context SseEventSink sseEventSink) { this.sseBroadcaster.register(sseEventSink); }

And finally, we can trigger the event publishing by invoking the broadcast() method:

@GET @Path("publish") public void broadcast() { OutboundSseEvent sseEvent = //...; this.sseBroadcaster.broadcast(sseEvent); }

This will send the same event to each registered SseEventSink.

To showcase the broadcasting, we can access this URL:

//localhost:9080/sse-jaxrs-server/sse-broadcast.html

And then we can trigger the broadcasting by invoking the broadcast() resource method:

curl -X GET //localhost:9080/sse-jaxrs-server/sse/stock/publish

4. Consuming SSE Events

To consume an SSE event sent by the server, we can use any HTTP client, but for this tutorial, we'll use the JAX RS client API.

4.1. JAX RS Client API for SSE

To get started with the client API for SSE, we need to provide dependencies for JAX RS Client implementation.

Here, we'll use Apache CXF client implementation:

 org.apache.cxf cxf-rt-rs-client ${cxf-version}   org.apache.cxf cxf-rt-rs-sse ${cxf-version} 

The SseEventSource is the heart of this API, and it is constructed from The WebTarget.

We start by listening for incoming events whose are abstracted by the InboundSseEvent interface:

Client client = ClientBuilder.newClient(); WebTarget target = client.target(url); try (SseEventSource source = SseEventSource.target(target).build()) { source.register((inboundSseEvent) -> System.out.println(inboundSseEvent)); source.open(); }

Sobald die Verbindung hergestellt ist, wird der registrierte Ereigniskonsument für jedes empfangene InboundSseEvent aufgerufen .

Wir können dann die readData () -Methode verwenden, um die Originaldaten zu lesen. String:

String data = inboundSseEvent.readData();

Oder wir können die überladene Version verwenden, um das deserialisierte Java-Objekt mit dem geeigneten Medientyp abzurufen:

Stock stock = inboundSseEvent.readData(Stock.class, MediaType.Application_Json);

Hier haben wir nur einen einfachen Ereigniskonsumenten bereitgestellt, der das eingehende Ereignis in der Konsole druckt.

5. Schlussfolgerung

In diesem Tutorial haben wir uns auf die Verwendung der vom Server gesendeten Ereignisse in JAX RS 2.1 konzentriert. Wir haben ein Beispiel bereitgestellt, das zeigt, wie Ereignisse an einen einzelnen Client gesendet und Ereignisse an mehrere Clients gesendet werden.

Schließlich haben wir diese Ereignisse mithilfe der JAX-RS-Client-API verwendet.

Wie üblich ist der Code dieses Tutorials auf Github zu finden.