Einführung in MBassador

1. Übersicht

Einfach ausgedrückt ist MBassador ein Hochleistungs-Ereignisbus, der die Publish-Subscribe-Semantik verwendet.

Nachrichten werden an einen oder mehrere Peers gesendet, ohne vorher zu wissen, wie viele Abonnenten es gibt oder wie sie die Nachricht verwenden.

2. Maven-Abhängigkeit

Bevor wir die Bibliothek verwenden können, müssen wir die mbassador-Abhängigkeit hinzufügen:

 net.engio mbassador 1.3.1 

3. Grundlegende Ereignisbehandlung

3.1. Einfaches Beispiel

Wir beginnen mit einem einfachen Beispiel für die Veröffentlichung einer Nachricht:

private MBassador dispatcher = new MBassador(); private String messageString; @Before public void prepareTests() { dispatcher.subscribe(this); } @Test public void whenStringDispatched_thenHandleString() { dispatcher.post("TestString").now(); assertNotNull(messageString); assertEquals("TestString", messageString); } @Handler public void handleString(String message) { messageString = message; } 

Am Anfang dieser Testklasse sehen wir die Erstellung eines MBassador mit seinem Standardkonstruktor. Als nächstes rufen wir in der @ Before- Methode subscribe () auf und übergeben einen Verweis auf die Klasse selbst.

In subscribe () überprüft der Dispatcher den Abonnenten auf @ Handler- Anmerkungen.

Und im ersten Test rufen wir dispatcher.post (…) .now () auf, um die Nachricht zu versenden - was dazu führt, dass handleString () aufgerufen wird.

Dieser erste Test zeigt mehrere wichtige Konzepte. Jedes Objekt kann ein Abonnent sein, sofern eine oder mehrere mit @Handler kommentierte Methoden vorhanden sind . Ein Teilnehmer kann eine beliebige Anzahl von Handlern haben.

Wir verwenden der Einfachheit halber Testobjekte, die sich selbst abonnieren. In den meisten Produktionsszenarien werden Nachrichten-Disponenten jedoch in andere Klassen als Verbraucher eingeteilt.

Handler-Methoden haben nur einen Eingabeparameter - die Nachricht - und können keine aktivierten Ausnahmen auslösen.

Ähnlich wie bei der subscribe () -Methode akzeptiert die post-Methode jedes Objekt . Dieses Objekt wird an Abonnenten geliefert.

Wenn eine Nachricht gesendet wird, wird sie an alle Listener gesendet, die den Nachrichtentyp abonniert haben.

Fügen wir einen weiteren Nachrichtenhandler hinzu und senden einen anderen Nachrichtentyp:

private Integer messageInteger; @Test public void whenIntegerDispatched_thenHandleInteger() { dispatcher.post(42).now(); assertNull(messageString); assertNotNull(messageInteger); assertTrue(42 == messageInteger); } @Handler public void handleInteger(Integer message) { messageInteger = message; } 

Wie erwartet, wenn wir versendenEine Ganzzahl , handleInteger (), wird aufgerufen und handleString () nicht. Ein einzelner Dispatcher kann verwendet werden, um mehr als einen Nachrichtentyp zu senden.

3.2. Tote Nachrichten

Wohin geht eine Nachricht, wenn es keinen Handler dafür gibt? Fügen wir einen neuen Ereignishandler hinzu und senden dann einen dritten Nachrichtentyp:

private Object deadEvent; @Test public void whenLongDispatched_thenDeadEvent() { dispatcher.post(42L).now(); assertNull(messageString); assertNull(messageInteger); assertNotNull(deadEvent); assertTrue(deadEvent instanceof Long); assertTrue(42L == (Long) deadEvent); } @Handler public void handleDeadEvent(DeadMessage message) { deadEvent = message.getMessage(); } 

In diesem Test senden wir ein Long anstelle eines Integer. Weder handleInteger () noch handleString () werden aufgerufen, handleDeadEvent () jedoch.

Wenn für eine Nachricht keine Handler vorhanden sind, wird sie in ein DeadMessage- Objekt eingeschlossen. Da wir einen Handler für Deadmessage hinzugefügt haben , erfassen wir ihn.

DeadMessage kann ignoriert werden. Wenn eine Anwendung tote Nachrichten nicht verfolgen muss, kann sie nirgendwo hingehen.

4. Verwenden einer Ereignishierarchie

Das Senden von String- und Integer- Ereignissen ist einschränkend. Erstellen wir einige Nachrichtenklassen:

public class Message {} public class AckMessage extends Message {} public class RejectMessage extends Message { int code; // setters and getters }

Wir haben eine einfache Basisklasse und zwei Klassen, die sie erweitern.

4.1. Senden einer Basisklassennachricht

Wir beginnen mit Nachrichtenereignissen :

private MBassador dispatcher = new MBassador(); private Message message; private AckMessage ackMessage; private RejectMessage rejectMessage; @Before public void prepareTests() { dispatcher.subscribe(this); } @Test public void whenMessageDispatched_thenMessageHandled() { dispatcher.post(new Message()).now(); assertNotNull(message); assertNull(ackMessage); assertNull(rejectMessage); } @Handler public void handleMessage(Message message) { this.message = message; } @Handler public void handleRejectMessage(RejectMessage message) { rejectMessage = message; } @Handler public void handleAckMessage(AckMessage message) { ackMessage = message; }

Entdecken Sie MBassador - einen leistungsstarken Pub-Sub-Eventbus. Dies beschränkt uns auf die Verwendung von Nachrichten , fügt jedoch eine zusätzliche Ebene der Typensicherheit hinzu.

Wenn wir eine Nachricht senden , empfängt handleMessage () sie. Die anderen beiden Handler nicht.

4.2. Senden einer Unterklassen-Nachricht

Senden wir eine RejectMessage :

@Test public void whenRejectDispatched_thenMessageAndRejectHandled() { dispatcher.post(new RejectMessage()).now(); assertNotNull(message); assertNotNull(rejectMessage); assertNull(ackMessage); }

Wenn wir eine RejectMessage senden, empfangen sie sowohl handleRejectMessage () als auch handleMessage () .

Da RejectMessage erstreckt Nachricht, die Nachricht empfangen Handler es, zusätzlich zu dem R ejectMessage Handler.

Lassen Sie uns dieses Verhalten mit einer AckMessage überprüfen :

@Test public void whenAckDispatched_thenMessageAndAckHandled() { dispatcher.post(new AckMessage()).now(); assertNotNull(message); assertNotNull(ackMessage); assertNull(rejectMessage); }

Wie erwartet erhalten sowohl handleAckMessage () als auch handleMessage () beim Senden einer AckMessage diese.

5. Nachrichten filtern

Das Organisieren von Nachrichten nach Typ ist bereits eine leistungsstarke Funktion, aber wir können sie noch weiter filtern.

5.1. Filter nach Klasse und Unterklasse

Wenn wir eine RejectMessage oder AckMessage gepostet haben , haben wir das Ereignis sowohl im Ereignishandler für den jeweiligen Typ als auch in der Basisklasse erhalten.

We can solve this type hierarchy issue by making Message abstract and creating a class such as GenericMessage. But what if we don't have this luxury?

We can use message filters:

private Message baseMessage; private Message subMessage; @Test public void whenMessageDispatched_thenMessageFiltered() { dispatcher.post(new Message()).now(); assertNotNull(baseMessage); assertNull(subMessage); } @Test public void whenRejectDispatched_thenRejectFiltered() { dispatcher.post(new RejectMessage()).now(); assertNotNull(subMessage); assertNull(baseMessage); } @Handler(filters = { @Filter(Filters.RejectSubtypes.class) }) public void handleBaseMessage(Message message) { this.baseMessage = message; } @Handler(filters = { @Filter(Filters.SubtypesOnly.class) }) public void handleSubMessage(Message message) { this.subMessage = message; }

The filters parameter for the @Handler annotation accepts a Class that implements IMessageFilter. The library offers two examples:

The Filters.RejectSubtypes does as its name suggests: it will filter out any subtypes. In this case, we see that RejectMessage is not handled by handleBaseMessage().

The Filters.SubtypesOnly also does as its name suggests: it will filter out any base types. In this case, we see that Message is not handled by handleSubMessage().

5.2. IMessageFilter

The Filters.RejectSubtypes and the Filters.SubtypesOnly both implement IMessageFilter.

RejectSubTypes compares the class of the message to its defined message types and will only allow through messages that equal one of its types, as opposed to any subclasses.

5.3. Filter With Conditions

Fortunately, there is an easier way of filtering messages. MBassador supports a subset of Java EL expressions as conditions for filtering messages.

Let's filter a String message based on its length:

private String testString; @Test public void whenLongStringDispatched_thenStringFiltered() { dispatcher.post("foobar!").now(); assertNull(testString); } @Handler(condition = "msg.length() < 7") public void handleStringMessage(String message) { this.testString = message; }

The “foobar!” message is seven characters long and is filtered. Let's send a shorter String:

 @Test public void whenShortStringDispatched_thenStringHandled() { dispatcher.post("foobar").now(); assertNotNull(testString); }

Now, the “foobar” is only six characters long and is passed through.

Our RejectMessage contains a field with an accessor. Let's write a filter for that:

private RejectMessage rejectMessage; @Test public void whenWrongRejectDispatched_thenRejectFiltered() { RejectMessage testReject = new RejectMessage(); testReject.setCode(-1); dispatcher.post(testReject).now(); assertNull(rejectMessage); assertNotNull(subMessage); assertEquals(-1, ((RejectMessage) subMessage).getCode()); } @Handler(condition = "msg.getCode() != -1") public void handleRejectMessage(RejectMessage rejectMessage) { this.rejectMessage = rejectMessage; }

Here again, we can query a method on an object and either filter the message or not.

5.4. Capture Filtered Messages

Similar to DeadEvents, we may want to capture and process filtered messages. There is a dedicated mechanism for capturing filtered events too. Filtered events are treated differently from “dead” events.

Let's write a test that illustrates this:

private String testString; private FilteredMessage filteredMessage; private DeadMessage deadMessage; @Test public void whenLongStringDispatched_thenStringFiltered() { dispatcher.post("foobar!").now(); assertNull(testString); assertNotNull(filteredMessage); assertTrue(filteredMessage.getMessage() instanceof String); assertNull(deadMessage); } @Handler(condition = "msg.length() < 7") public void handleStringMessage(String message) { this.testString = message; } @Handler public void handleFilterMessage(FilteredMessage message) { this.filteredMessage = message; } @Handler public void handleDeadMessage(DeadMessage deadMessage) { this.deadMessage = deadMessage; } 

With the addition of a FilteredMessage handler, we can track Strings that are filtered because of their length. The filterMessage contains our too-long String while deadMessage remains null.

6. Asynchronous Message Dispatch and Handling

So far all of our examples have used synchronous message dispatch; when we called post.now() the messages were delivered to each handler in the same thread we called post() from.

6.1. Asynchronous Dispatch

The MBassador.post() returns a SyncAsyncPostCommand. This class offers several methods, including:

  • now() – dispatch messages synchronously; the call will block until all messages have been delivered
  • asynchronously() – executes the message publication asynchronously

Let's use asynchronous dispatch in a sample class. We'll use Awaitility in these tests to simplify the code:

private MBassador dispatcher = new MBassador(); private String testString; private AtomicBoolean ready = new AtomicBoolean(false); @Test public void whenAsyncDispatched_thenMessageReceived() { dispatcher.post("foobar").asynchronously(); await().untilAtomic(ready, equalTo(true)); assertNotNull(testString); } @Handler public void handleStringMessage(String message) { this.testString = message; ready.set(true); }

We call asynchronously() in this test, and use an AtomicBoolean as a flag with await() to wait for the delivery thread to deliver the message.

If we comment out the call to await(), we risk the test failing, because we check testString before the delivery thread completes.

6.2. Asynchronous Handler Invocation

Asynchronous dispatch allows the message provider to return to message processing before the messages are delivered to each handler, but it still calls each handler in order, and each handler has to wait for the previous to finish.

This can lead to problems if one handler performs an expensive operation.

MBassador provides a mechanism for asynchronous handler invocation. Handlers configured for this receive messages in their thread:

private Integer testInteger; private String invocationThreadName; private AtomicBoolean ready = new AtomicBoolean(false); @Test public void whenHandlerAsync_thenHandled() { dispatcher.post(42).now(); await().untilAtomic(ready, equalTo(true)); assertNotNull(testInteger); assertFalse(Thread.currentThread().getName().equals(invocationThreadName)); } @Handler(delivery = Invoke.Asynchronously) public void handleIntegerMessage(Integer message) { this.invocationThreadName = Thread.currentThread().getName(); this.testInteger = message; ready.set(true); }

Handlers can request asynchronous invocation with the delivery = Invoke.Asynchronously property on the Handler annotation. We verify this in our test by comparing the Thread names in the dispatching method and the handler.

7. Customizing MBassador

So far we've been using an instance of MBassador with its default configuration. The dispatcher's behavior can be modified with annotations, similar to those we have seen so far; we'll cover a few more to finish this tutorial.

7.1. Exception Handling

Handlers cannot define checked exceptions. Instead, the dispatcher can be provided with an IPublicationErrorHandler as an argument to its constructor:

public class MBassadorConfigurationTest implements IPublicationErrorHandler { private MBassador dispatcher; private String messageString; private Throwable errorCause; @Before public void prepareTests() { dispatcher = new MBassador(this); dispatcher.subscribe(this); } @Test public void whenErrorOccurs_thenErrorHandler() { dispatcher.post("Error").now(); assertNull(messageString); assertNotNull(errorCause); } @Test public void whenNoErrorOccurs_thenStringHandler() { dispatcher.post("Error").now(); assertNull(errorCause); assertNotNull(messageString); } @Handler public void handleString(String message) { if ("Error".equals(message)) { throw new Error("BOOM"); } messageString = message; } @Override public void handleError(PublicationError error) { errorCause = error.getCause().getCause(); } }

When handleString() throws an Error, it is saved to errorCause.

7.2. Handler Priority

Handlers are called in reverse order of how they are added, but this isn't behavior we want to rely on. Even with the ability to call handlers in their threads, we may still need to know what order they will be called in.

We can set handler priority explicitly:

private LinkedList list = new LinkedList(); @Test public void whenRejectDispatched_thenPriorityHandled() { dispatcher.post(new RejectMessage()).now(); // Items should pop() off in reverse priority order assertTrue(1 == list.pop()); assertTrue(3 == list.pop()); assertTrue(5 == list.pop()); } @Handler(priority = 5) public void handleRejectMessage5(RejectMessage rejectMessage) { list.push(5); } @Handler(priority = 3) public void handleRejectMessage3(RejectMessage rejectMessage) { list.push(3); } @Handler(priority = 2, rejectSubtypes = true) public void handleMessage(Message rejectMessage) logger.error("Reject handler #3"); list.push(3); } @Handler(priority = 0) public void handleRejectMessage0(RejectMessage rejectMessage) { list.push(1); } 

Handlers are called from highest priority to lowest. Handlers with the default priority, which is zero, are called last. We see that the handler numbers pop() off in reverse order.

7.3. Reject Subtypes, the Easy Way

What happened to handleMessage() in the test above? We don't have to use RejectSubTypes.class to filter our sub types.

RejectSubTypes is a boolean flag that provides the same filtering as the class, but with better performance than the IMessageFilter implementation.

We still need to use the filter-based implementation for accepting subtypes only, though.

8. Conclusion

MBassador ist eine einfache und unkomplizierte Bibliothek zum Weiterleiten von Nachrichten zwischen Objekten. Nachrichten können auf verschiedene Arten organisiert und synchron oder asynchron versendet werden.

Und wie immer ist das Beispiel in diesem GitHub-Projekt verfügbar.