Ein Leitfaden zum Axon Framework

1. Übersicht

In diesem Artikel werden wir uns mit Axon befassen und wie es uns hilft, Anwendungen unter Berücksichtigung von CQRS (Command Query Responsibility Segregation) und Event Sourcing zu implementieren .

In diesem Handbuch werden sowohl Axon Framework als auch Axon Server verwendet. Ersteres wird unsere Implementierung enthalten und letzteres wird unsere dedizierte Event Store- und Message Routing-Lösung sein.

Die Beispielanwendung, die wir erstellen, konzentriert sich auf eine Auftragsdomäne . Dafür werden wir die CQRS- und Event Sourcing-Bausteine ​​nutzen, die Axon uns zur Verfügung stellt .

Beachten Sie, dass viele der gemeinsam genutzten Konzepte direkt aus DDD stammen, was den Rahmen dieses aktuellen Artikels sprengt.

2. Maven-Abhängigkeiten

Wir erstellen eine Axon / Spring Boot-Anwendung. Daher müssen wir unserer pom.xml die neueste Axon-Spring-Boot-Starter- Abhängigkeit sowie die Axon-Test- Abhängigkeit zum Testen hinzufügen :

 org.axonframework axon-spring-boot-starter 4.1.2   org.axonframework axon-test 4.1.2 test 

3. Axon Server

Wir werden Axon Server als unseren Ereignisspeicher und unsere dedizierte Routing-Lösung für Befehle, Ereignisse und Abfragen verwenden.

Als Event Store bietet es uns die idealen Eigenschaften, die beim Speichern von Events erforderlich sind. Dieser Artikel bietet Hintergrundinformationen, warum dies wünschenswert ist.

Als Message Routing-Lösung haben wir die Möglichkeit, mehrere Instanzen miteinander zu verbinden, ohne uns auf die Konfiguration von Dingen wie RabbitMQ oder Kafka zum Teilen und Versenden von Nachrichten zu konzentrieren.

Axon Server kann hier heruntergeladen werden. Da es sich um eine einfache JAR-Datei handelt, reicht der folgende Vorgang aus, um sie zu starten:

java -jar axonserver.jar

Dadurch wird eine einzelne Axon Server-Instanz gestartet, auf die über localhost zugegriffen werden kann: 8024 . Der Endpunkt bietet einen Überblick über die verbundenen Anwendungen und die Nachrichten, die sie verarbeiten können, sowie einen Abfragemechanismus für den in Axon Server enthaltenen Ereignisspeicher.

Die Standardkonfiguration von Axon Server zusammen mit der Abhängigkeit von Axon-Spring-Boot-Starter stellt sicher, dass unser Bestellservice automatisch eine Verbindung dazu herstellt.

4. Order Service API - Befehle

Wir richten unseren Bestellservice unter Berücksichtigung von CQRS ein. Daher werden wir die Nachrichten hervorheben, die durch unsere Anwendung fließen.

Zuerst definieren wir die Befehle, dh die Absichtsausdrücke. Der Bestellservice kann drei verschiedene Arten von Aktionen ausführen:

  1. Eine neue Bestellung aufgeben
  2. Bestellung bestätigen
  3. Versand einer Bestellung

Natürlich gibt es drei Befehlsnachrichten, mit denen unsere Domain umgehen kann - PlaceOrderCommand , ConfirmOrderCommand und ShipOrderCommand :

public class PlaceOrderCommand { @TargetAggregateIdentifier private final String orderId; private final String product; // constructor, getters, equals/hashCode and toString } public class ConfirmOrderCommand { @TargetAggregateIdentifier private final String orderId; // constructor, getters, equals/hashCode and toString } public class ShipOrderCommand { @TargetAggregateIdentifier private final String orderId; // constructor, getters, equals/hashCode and toString }

Die Annotation TargetAggregateIdentifier teilt Axon mit, dass das mit Annotationen versehene Feld eine ID eines bestimmten Aggregats ist, auf das der Befehl ausgerichtet werden soll. Wir werden später in diesem Artikel kurz auf Aggregate eingehen.

Beachten Sie außerdem, dass wir die Felder in den Befehlen als endgültig markiert haben . Dies ist beabsichtigt, da es eine bewährte Methode für ist jede Nachricht Implementierung unveränderlich zu sein .

5. Order Service API - Ereignisse

Unser Aggregat übernimmt die Befehle , da es entscheidet, ob eine Bestellung aufgegeben, bestätigt oder versendet werden kann.

Sie wird den Rest der Anwendung über ihre Entscheidung informieren, indem sie eine Veranstaltung veröffentlicht. Es gibt drei Arten von Ereignissen: OrderPlacedEvent, OrderConfirmedEvent und OrderShippedEvent :

public class OrderPlacedEvent { private final String orderId; private final String product; // default constructor, getters, equals/hashCode and toString } public class OrderConfirmedEvent { private final String orderId; // default constructor, getters, equals/hashCode and toString } public class OrderShippedEvent { private final String orderId; // default constructor, getters, equals/hashCode and toString }

6. Das Befehlsmodell - Auftragsaggregat

Nachdem wir unsere Kern-API in Bezug auf die Befehle und Ereignisse modelliert haben, können wir mit der Erstellung des Befehlsmodells beginnen.

Da sich unsere Domain auf die Bearbeitung von Bestellungen konzentriert, erstellen wir ein OrderAggregate als Zentrum unseres Befehlsmodells .

6.1. Gesamtklasse

Erstellen wir also unsere grundlegende Aggregatklasse:

@Aggregate public class OrderAggregate { @AggregateIdentifier private String orderId; private boolean orderConfirmed; @CommandHandler public OrderAggregate(PlaceOrderCommand command) { AggregateLifecycle.apply(new OrderPlacedEvent(command.getOrderId(), command.getProduct())); } @EventSourcingHandler public void on(OrderPlacedEvent event) { this.orderId = event.getOrderId(); orderConfirmed = false; } protected OrderAggregate() { } }

Die Aggregat- Annotation ist eine Axon Spring-spezifische Annotation, die diese Klasse als Aggregat kennzeichnet. Das Framework wird darüber informiert, dass die erforderlichen CQRS- und Event Sourcing-spezifischen Bausteine ​​für dieses OrderAggregate instanziiert werden müssen .

Da ein Aggregat Befehle verarbeitet, die auf eine bestimmte Aggregatinstanz abzielen, müssen wir den Bezeichner mit der Annotation AggregateIdentifier angeben .

Unser Aggregat beginnt seinen Lebenszyklus mit der Behandlung des PlaceOrderCommand im OrderAggregate -Befehlsbehandlungskonstruktor. Um dem Framework mitzuteilen, dass die angegebene Funktion Befehle verarbeiten kann, fügen wir die CommandHandler- Annotation hinzu.

Bei der Verarbeitung des PlaceOrderCommand wird der Rest der Anwendung durch Veröffentlichung des OrderPlacedEvent benachrichtigt, dass eine Bestellung aufgegeben wurde. Um ein Ereignis aus einem Aggregat heraus zu veröffentlichen, verwenden wir AggregateLifecycle # apply (Object…) .

Ab diesem Punkt können wir tatsächlich damit beginnen, Event Sourcing als treibende Kraft für die Neuerstellung einer aggregierten Instanz aus ihrem Ereignisstrom zu integrieren.

We start this off with the ‘aggregate creation event', the OrderPlacedEvent, which is handled in an EventSourcingHandler annotated function to set the orderId and orderConfirmed state of the Order aggregate.

Also note that to be able to source an aggregate based on its events, Axon requires a default constructor.

6.2. Aggregate Command Handlers

Now that we have our basic aggregate, we can start implementing the remaining command handlers:

@CommandHandler public void handle(ConfirmOrderCommand command) { apply(new OrderConfirmedEvent(orderId)); } @CommandHandler public void handle(ShipOrderCommand command) { if (!orderConfirmed) { throw new UnconfirmedOrderException(); } apply(new OrderShippedEvent(orderId)); } @EventSourcingHandler public void on(OrderConfirmedEvent event) { orderConfirmed = true; }

The signature of our command and event sourcing handlers simply states handle({the-command}) and on({the-event}) to maintain a concise format.

Additionally, we've defined that an Order can only be shipped if it's been confirmed. Thus, we'll throw an UnconfirmedOrderException if this is not the case.

This exemplifies the need for the OrderConfirmedEvent sourcing handler to update the orderConfirmed state to true for the Order aggregate.

7. Testing the Command Model

First, we need to set up our test by creating a FixtureConfiguration for the OrderAggregate:

private FixtureConfiguration fixture; @Before public void setUp() { fixture = new AggregateTestFixture(OrderAggregate.class); }

The first test case should cover the simplest situation. When the aggregate handles the PlaceOrderCommand, it should produce an OrderPlacedEvent:

String orderId = UUID.randomUUID().toString(); String product = "Deluxe Chair"; fixture.givenNoPriorActivity() .when(new PlaceOrderCommand(orderId, product)) .expectEvents(new OrderPlacedEvent(orderId, product));

Next, we can test the decision-making logic of only being able to ship an Order if it's been confirmed. Due to this, we have two scenarios — one where we expect an exception, and one where we expect an OrderShippedEvent.

Let's take a look at the first scenario, where we expect an exception:

String orderId = UUID.randomUUID().toString(); String product = "Deluxe Chair"; fixture.given(new OrderPlacedEvent(orderId, product)) .when(new ShipOrderCommand(orderId)) .expectException(IllegalStateException.class); 

And now the second scenario, where we expect an OrderShippedEvent:

String orderId = UUID.randomUUID().toString(); String product = "Deluxe Chair"; fixture.given(new OrderPlacedEvent(orderId, product), new OrderConfirmedEvent(orderId)) .when(new ShipOrderCommand(orderId)) .expectEvents(new OrderShippedEvent(orderId));

8. The Query Model – Event Handlers

So far, we've established our core API with the commands and events, and we have the Command model of our CQRS Order service, the Order aggregate, in place.

Next, we can start thinking of one of the Query Models our application should service.

One of these models is the OrderedProducts:

public class OrderedProduct { private final String orderId; private final String product; private OrderStatus orderStatus; public OrderedProduct(String orderId, String product) { this.orderId = orderId; this.product = product; orderStatus = OrderStatus.PLACED; } public void setOrderConfirmed() { this.orderStatus = OrderStatus.CONFIRMED; } public void setOrderShipped() { this.orderStatus = OrderStatus.SHIPPED; } // getters, equals/hashCode and toString functions } public enum OrderStatus { PLACED, CONFIRMED, SHIPPED }

We'll update this model based on the events propagating through our system. A Spring Service bean to update our model will do the trick:

@Service public class OrderedProductsEventHandler { private final Map orderedProducts = new HashMap(); @EventHandler public void on(OrderPlacedEvent event) { String orderId = event.getOrderId(); orderedProducts.put(orderId, new OrderedProduct(orderId, event.getProduct())); } // Event Handlers for OrderConfirmedEvent and OrderShippedEvent... }

As we've used the axon-spring-boot-starter dependency to initiate our Axon application, the framework will automatically scan all the beans for existing message-handling functions.

As the OrderedProductsEventHandler has EventHandler annotated functions to store an OrderedProduct and update it, this bean will be registered by the framework as a class that should receive events without requiring any configuration on our part.

9. The Query Model – Query Handlers

Next, to query this model, for example, to retrieve all the ordered products, we should first introduce a Query message to our core API:

public class FindAllOrderedProductsQuery { }

Second, we'll have to update the OrderedProductsEventHandler to be able to handle the FindAllOrderedProductsQuery:

@QueryHandler public List handle(FindAllOrderedProductsQuery query) { return new ArrayList(orderedProducts.values()); }

The QueryHandler annotated function will handle the FindAllOrderedProductsQuery and is set to return a List regardless, similarly to any ‘find all' query.

10. Putting Everything Together

We've fleshed out our core API with commands, events, and queries, and set up our Command and Query model by having an OrderAggregate and OrderedProducts model.

Next is to tie up the loose ends of our infrastructure. As we're using the axon-spring-boot-starter, this sets a lot of the required configuration automatically.

First, as we want to leverage Event Sourcing for our Aggregate, we'll need an EventStore. Axon Server which we have started up in step three will fill this hole.

Secondly, we need a mechanism to store our OrderedProduct query model. For this example, we can add h2 as an in-memory database and spring-boot-starter-data-jpa for ease of use:

 org.springframework.boot spring-boot-starter-data-jpa com.h2database h2 runtime 

10.1. Setting up a REST Endpoint

Next, we need to be able to access our application, for which we'll be leveraging a REST endpoint by adding the spring-boot-starter-web dependency:

 org.springframework.boot spring-boot-starter-web 

From our REST endpoint, we can start dispatching commands and queries:

@RestController public class OrderRestEndpoint { private final CommandGateway commandGateway; private final QueryGateway queryGateway; // Autowiring constructor and POST/GET endpoints }

The CommandGateway is used as the mechanism to send our command messages, and the QueryGateway, in turn, to send query messages. The gateways provide a simpler, more straightforward API, compared to the CommandBus and QueryBus that they connect with.

From here on, our OrderRestEndpoint should have a POST endpoint to place, confirm, and ship an order:

@PostMapping("/ship-order") public void shipOrder() { String orderId = UUID.randomUUID().toString(); commandGateway.send(new PlaceOrderCommand(orderId, "Deluxe Chair")); commandGateway.send(new ConfirmOrderCommand(orderId)); commandGateway.send(new ShipOrderCommand(orderId)); }

This rounds up the Command side of our CQRS application.

Now, all that's left is a GET endpoint to query all the OrderedProducts:

@GetMapping("/all-orders") public List findAllOrderedProducts() { return queryGateway.query(new FindAllOrderedProductsQuery(), ResponseTypes.multipleInstancesOf(OrderedProduct.class)).join(); }

In the GET endpoint, we leverage the QueryGateway to dispatch a point-to-point query. In doing so, we create a default FindAllOrderedProductsQuery, but we also need to specify the expected return type.

As we expect multiple OrderedProduct instances to be returned, we leverage the static ResponseTypes#multipleInstancesOf(Class) function. With this, we have provided a basic entrance into the Query side of our Order service.

We completed the setup, so now we can send some commands and queries through our REST Controller once we've started up the OrderApplication.

POST-ing to endpoint /ship-order will instantiate an OrderAggregate that'll publish events, which, in turn, will save/update our OrderedProducts. GET-ing from the /all-orders endpoint will publish a query message that'll be handled by the OrderedProductsEventHandler, which will return all the existing OrderedProducts.

11. Conclusion

In this article, we introduced the Axon Framework as a powerful base for building an application leveraging the benefits of CQRS and Event Sourcing.

We implemented a simple Order service using the framework to show how such an application should be structured in practice.

Lastly, Axon Server posed as our Event Store and the message routing mechanism.

The implementation of all these examples and code snippets can be found over on GitHub.

For any additional questions you may have, also check out the Axon Framework User Group.