Einführung in Project Reactor Bus

1. Übersicht

In diesem kurzen Artikel stellen wir den Reaktorbus vor, indem wir ein reales Szenario für eine reaktive, ereignisgesteuerte Anwendung erstellen.

2. Die Grundlagen des Projektreaktors

2.1. Warum Reaktor?

Moderne Anwendungen müssen eine große Anzahl von gleichzeitigen Anforderungen verarbeiten und eine erhebliche Datenmenge verarbeiten. Standard-Sperrcode reicht nicht mehr aus, um diese Anforderungen zu erfüllen.

Das reaktive Entwurfsmuster ist ein ereignisbasierter Architekturansatz für die asynchrone Verarbeitung eines großen Volumens gleichzeitiger Dienstanforderungen, die von einzelnen oder mehreren Diensthandlern stammen.

Der Projektreaktor basiert auf diesem Muster und hat ein klares und ehrgeiziges Ziel, nicht blockierende, reaktive Anwendungen auf der JVM zu erstellen .

2.2. Beispielszenarien

Bevor wir beginnen, sind hier einige interessante Szenarien aufgeführt, in denen die Nutzung des reaktiven Architekturstils sinnvoll wäre, um eine Vorstellung davon zu bekommen, wo wir ihn anwenden könnten:

  • Benachrichtigungsdienste für eine große Online-Shopping-Plattform wie Amazon
  • Riesige Transaktionsabwicklungsdienste für den Bankensektor
  • Aktienhandelsunternehmen, in denen sich die Aktienkurse gleichzeitig ändern

3. Maven-Abhängigkeiten

Beginnen wir mit der Verwendung von Project Reactor Bus, indem wir der pom.xml die folgende Abhängigkeit hinzufügen :

 io.projectreactor reactor-bus 2.0.8.RELEASE 

Wir können die neueste Version des Reaktorbusses in Maven Central überprüfen .

4. Erstellen einer Demo-Anwendung

Schauen wir uns ein praktisches Beispiel an, um die Vorteile des reaktorbasierten Ansatzes besser zu verstehen.

Wir erstellen eine einfache Anwendung, die für das Senden von Benachrichtigungen an die Benutzer einer Online-Einkaufsplattform verantwortlich ist. Wenn ein Benutzer beispielsweise eine neue Bestellung aufgibt, sendet die App eine Auftragsbestätigung per E-Mail oder SMS.

Eine typische synchrone Implementierung wäre natürlich durch den Durchsatz des E-Mail- oder SMS-Dienstes begrenzt. Daher wären Verkehrsspitzen wie Feiertage im Allgemeinen problematisch.

Mit einem reaktiven Ansatz können wir unser System flexibler gestalten und uns besser an Fehler oder Zeitüberschreitungen anpassen, die in externen Systemen wie Gateway-Servern auftreten können.

Werfen wir einen Blick auf die Anwendung - angefangen bei den traditionelleren Aspekten bis hin zu den reaktiveren Konstrukten.

4.1. Einfaches POJO

Erstellen wir zunächst eine POJO-Klasse, um die Benachrichtigungsdaten darzustellen:

public class NotificationData { private long id; private String name; private String email; private String mobile; // getter and setter methods }

4.2. Die Serviceschicht

Definieren wir nun eine einfache Service-Schicht:

public interface NotificationService { void initiateNotification(NotificationData notificationData) throws InterruptedException; }

Und die Implementierung, die einen langfristigen Betrieb simuliert:

@Service public class NotificationServiceimpl implements NotificationService { @Override public void initiateNotification(NotificationData notificationData) throws InterruptedException { System.out.println("Notification service started for " + "Notification ID: " + notificationData.getId()); Thread.sleep(5000); System.out.println("Notification service ended for " + "Notification ID: " + notificationData.getId()); } }

Beachten Sie, dass ein Real-Life - Szenario des Senden von Nachrichten per SMS oder E - Mail - Gateway zu veranschaulichen, wir Einführung absichtlich ein fünf - Sekunden - Verzögerung in der initiateNotification Methode mit Thread.sleep (5000).

Wenn ein Thread den Dienst erreicht, wird er folglich für fünf Sekunden blockiert.

4.3. Der Verbraucher

Lassen Sie uns nun zu den reaktiveren Aspekten unserer Anwendung springen und einen Verbraucher implementieren, den wir dann dem Reaktorereignisbus zuordnen:

@Service public class NotificationConsumer implements Consumer
    
      { @Autowired private NotificationService notificationService; @Override public void accept(Event notificationDataEvent) { NotificationData notificationData = notificationDataEvent.getData(); try { notificationService.initiateNotification(notificationData); } catch (InterruptedException e) { // ignore } } }
    

Wie wir sehen können, implementiert der von uns erstellte Consumer die Consumer- Schnittstelle. Die Hauptlogik befindet sich in der Accept- Methode.

Dies ist ein ähnlicher Ansatz, den wir in einer typischen Spring Listener-Implementierung verfolgen können.

4.4. Der Controller

Nachdem wir die Ereignisse nun konsumieren können, generieren wir sie auch.

Wir machen das in einem einfachen Controller:

@Controller public class NotificationController { @Autowired private EventBus eventBus; @GetMapping("/startNotification/{param}") public void startNotification(@PathVariable Integer param) { for (int i = 0; i < param; i++) { NotificationData data = new NotificationData(); data.setId(i); eventBus.notify("notificationConsumer", Event.wrap(data)); System.out.println( "Notification " + i + ": notification task submitted successfully"); } } }

Dies ist ziemlich selbsterklärend - wir senden hier Ereignisse über den EventBus aus.

Wenn ein Client beispielsweise die URL mit einem Parameterwert von zehn trifft, werden zehn Ereignisse über den Ereignisbus gesendet.

4.5. Die Java-Konfiguration

Lassen Sie uns nun alles zusammenfügen und eine einfache Spring Boot-Anwendung erstellen.

Zuerst müssen wir EventBus- und Environment- Beans konfigurieren :

@Configuration public class Config { @Bean public Environment env() { return Environment.initializeIfEmpty().assignErrorJournal(); } @Bean public EventBus createEventBus(Environment env) { return EventBus.create(env, Environment.THREAD_POOL); } }

In unserem Fall instanziieren wir den EventBus mit einem Standard-Thread-Pool, der in der Umgebung verfügbar ist .

Alternatively, we can use a customized Dispatcher instance:

EventBus evBus = EventBus.create( env, Environment.newDispatcher( REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT, DispatcherType.THREAD_POOL_EXECUTOR));

Now, we're ready to create a main application code:

import static reactor.bus.selector.Selectors.$; @SpringBootApplication public class NotificationApplication implements CommandLineRunner { @Autowired private EventBus eventBus; @Autowired private NotificationConsumer notificationConsumer; @Override public void run(String... args) throws Exception { eventBus.on($("notificationConsumer"), notificationConsumer); } public static void main(String[] args) { SpringApplication.run(NotificationApplication.class, args); } }

In our run method we're registering the notificationConsumer to be triggered when the notification matches a given selector.

Notice how we're using the static import of the $ attribute to create a Selector object.

5. Test the Application

Let's now create a test to see our NotificationApplication in action:

@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class NotificationApplicationIntegrationTest { @LocalServerPort private int port; @Test public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() { RestTemplate restTemplate = new RestTemplate(); restTemplate.getForObject("//localhost:" + port + "/startNotification/10", String.class); } }

As we can see, as soon as the request is executed, all ten tasks get submitted instantly without creating any blocking. And once submitted, the notification events get processed in parallel.

Notification 0: notification task submitted successfully Notification 1: notification task submitted successfully Notification 2: notification task submitted successfully Notification 3: notification task submitted successfully Notification 4: notification task submitted successfully Notification 5: notification task submitted successfully Notification 6: notification task submitted successfully Notification 7: notification task submitted successfully Notification 8: notification task submitted successfully Notification 9: notification task submitted successfully Notification service started for Notification ID: 1 Notification service started for Notification ID: 2 Notification service started for Notification ID: 3 Notification service started for Notification ID: 0 Notification service ended for Notification ID: 1 Notification service ended for Notification ID: 0 Notification service started for Notification ID: 4 Notification service ended for Notification ID: 3 Notification service ended for Notification ID: 2 Notification service started for Notification ID: 6 Notification service started for Notification ID: 5 Notification service started for Notification ID: 7 Notification service ended for Notification ID: 4 Notification service started for Notification ID: 8 Notification service ended for Notification ID: 6 Notification service ended for Notification ID: 5 Notification service started for Notification ID: 9 Notification service ended for Notification ID: 7 Notification service ended for Notification ID: 8 Notification service ended for Notification ID: 9

Es ist wichtig zu beachten, dass in unserem Szenario diese Ereignisse nicht in einer bestimmten Reihenfolge verarbeitet werden müssen.

6. Fazit

In diesem kurzen Tutorial haben wir eine einfache ereignisgesteuerte Anwendung erstellt . Wir haben auch gesehen, wie man einen reaktiveren und nicht blockierenden Code schreibt.

Doch dieses Szenario kratzt nur an die Oberfläche des Objekts und stellt nur eine gute Basis mit dem reaktiven Paradigma zu experimentieren .

Wie immer ist der Quellcode über GitHub verfügbar.