Testen von Kafka und Spring Boot

1. Übersicht

Apache Kafka ist ein leistungsstarkes, verteiltes und fehlertolerantes Stream-Verarbeitungssystem. In einem früheren Tutorial haben wir gelernt, wie man mit Spring und Kafka arbeitet.

In diesem Tutorial bauen wir auf dem vorherigen auf und lernen, wie Sie zuverlässige, in sich geschlossene Integrationstests schreiben, die nicht auf einem externen Kafka-Server basieren .

Zunächst werden wir uns jedoch mit der Verwendung und Konfiguration einer eingebetteten Instanz von Kafka befassen. Dann werden wir sehen, wie wir die beliebten Framework-Testcontainer aus unseren Tests nutzen können.

2. Abhängigkeiten

Natürlich müssen wir die Standard- Spring-Kafka- Abhängigkeit zu unserer pom.xml hinzufügen :

 org.springframework.kafka spring-kafka 2.6.3.RELEASE 

Dann benötigen wir zwei weitere Abhängigkeiten speziell für unsere Tests . Zuerst fügen wir das Spring-Kafka-Test- Artefakt hinzu:

 org.springframework.kafka spring-kafka-test 2.6.3.RELEASE test 

Und schließlich fügen wir die Testcontainer-Kafka-Abhängigkeit hinzu, die auch über Maven Central verfügbar ist:

 org.testcontainers kafka 1.15.0 test 

Nachdem wir alle erforderlichen Abhängigkeiten konfiguriert haben, können wir mit Kafka eine einfache Spring Boot-Anwendung schreiben.

3. Eine einfache Kafka Producer-Consumer-Anwendung

In diesem Tutorial liegt der Schwerpunkt unserer Tests auf einer einfachen Spring Boot Kafka-Anwendung für Hersteller und Verbraucher.

Beginnen wir mit der Definition unseres Anwendungseinstiegspunkts:

@SpringBootApplication @EnableAutoConfiguration public class KafkaProducerConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaProducerConsumerApplication.class, args); } }

Wie wir sehen können, ist dies eine Standard-Spring Boot-Anwendung. Nach Möglichkeit möchten wir Standardkonfigurationswerte verwenden . In diesem Sinne verwenden wir die Annotation @EnableAutoConfiguration, um unsere Anwendung automatisch zu konfigurieren .

3.1. Produzenten-Setup

Als nächstes betrachten wir eine Producer-Bean, mit der wir Nachrichten an ein bestimmtes Kafka-Thema senden:

@Component public class KafkaProducer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class); @Autowired private KafkaTemplate kafkaTemplate; public void send(String topic, String payload) { LOGGER.info("sending payload="{}" to topic="{}"", payload, topic); kafkaTemplate.send(topic, payload); } }

Unsere oben definierte KafkaProducer- Bean ist lediglich ein Wrapper um die KafkaTemplate- Klasse. Diese Klasse bietet High-Level - Thread-sicheren Operationen, wie zum Beispiel Daten an das angegebene Thema zu senden, das ist genau das , was wir in unserer tun Sendemethode .

3.2. Consumer-Setup

Ebenso definieren wir jetzt eine einfache Consumer-Bean, die ein Kafka-Thema abhört und Nachrichten empfängt:

@Component public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); private CountDownLatch latch = new CountDownLatch(1); private String payload = null; @KafkaListener(topics = "${test.topic}") public void receive(ConsumerRecord consumerRecord) { LOGGER.info("received payload="{}"", consumerRecord.toString()); setPayload(consumerRecord.toString()); latch.countDown(); } public CountDownLatch getLatch() { return latch; } public String getPayload() { return payload; } }

Unser einfacher Verbraucher verwendet die Annotation @KafkaListener für die Empfangsmethode , um Nachrichten zu einem bestimmten Thema abzuhören. Wir werden später sehen, wie wir die test.topic aus unseren Tests konfigurieren .

Darüber hinaus speichert die Empfangsmethode den Nachrichteninhalt in unserer Bean und dekrementiert die Anzahl der Latch- Variablen. Diese Variable ist ein einfaches thread-sicheres Zählerfeld, das wir später aus unseren Tests verwenden werden, um sicherzustellen, dass wir erfolgreich eine Nachricht erhalten haben .

Nachdem wir unsere einfache Kafka-Anwendung mit Spring Boot implementiert haben, wollen wir sehen, wie wir Integrationstests schreiben können.

4. Ein Wort zum Testen

Im Allgemeinen sollten wir beim Schreiben sauberer Integrationstests nicht auf externe Dienste angewiesen sein, die wir möglicherweise nicht steuern können oder die plötzlich nicht mehr funktionieren . Dies könnte sich nachteilig auf unsere Testergebnisse auswirken.

Wenn wir von einem externen Dienst abhängig sind, in diesem Fall einem laufenden Kafka-Broker, können wir ihn wahrscheinlich nicht so einrichten, steuern und abbauen, wie wir es von unseren Tests erwarten.

4.1. Anwendungseigenschaften

Wir werden einen sehr leichten Satz von Anwendungskonfigurationseigenschaften aus unseren Tests verwenden. Wir definieren diese Eigenschaften in unserer Datei src / test / resources / application.yml :

spring: kafka: consumer: auto-offset-reset: earliest group-id: baeldung test: topic: embedded-test-topic

Dies ist die Mindestmenge an Eigenschaften, die wir benötigen, wenn wir mit einer eingebetteten Instanz von Kafka oder einem lokalen Broker arbeiten.

Die meisten davon sind selbsterklärend, aber das , was wir besonders hervorheben sollten, ist das automatische Zurücksetzen und Zurücksetzen von Verbrauchereigenschaften : frühestens . Diese Eigenschaft stellt sicher, dass unsere Verbrauchergruppe die von uns gesendeten Nachrichten erhält, da der Container möglicherweise nach Abschluss der Sendungen gestartet wird.

Darüber hinaus konfigurieren wir eine Topic-Eigenschaft mit dem Wert embedded-test-topic . Dies ist das Thema, das wir aus unseren Tests verwenden werden.

5. Testen mit Embedded Kafka

In diesem Abschnitt sehen wir uns an, wie Sie eine In-Memory-Kafka-Instanz verwenden, um unsere Tests auszuführen. Dies ist auch als Embedded Kafka bekannt.

Der zuvor hinzugefügte Abhängigkeits- Spring-Kafka-Test enthält einige nützliche Dienstprogramme, die Sie beim Testen unserer Anwendung unterstützen. Insbesondere enthält es die EmbeddedKafkaBroker- Klasse .

Lassen Sie uns in diesem Sinne unseren ersten Integrationstest schreiben:

@SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) class EmbeddedKafkaIntegrationTest { @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own simple KafkaProducer"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }

Lassen Sie uns die wichtigsten Teile unseres Tests durchgehen. Zuerst dekorieren wir unsere Testklasse mit zwei hübschen Standard-Frühlingsanmerkungen:

  • Die Annotation @SpringBootTest stellt sicher, dass unser Test den Spring-Anwendungskontext bootstrappt
  • Wir verwenden auch die Annotation @DirtiesContext , die sicherstellt, dass dieser Kontext zwischen verschiedenen Tests bereinigt und zurückgesetzt wird

Hier kommt der entscheidende Teil, wir verwenden die Annotation @EmbeddedKafka , um eine Instanz eines EmbeddedKafkaBroker in unsere Tests einzufügen . Darüber hinaus stehen verschiedene Eigenschaften zur Verfügung, mit denen wir den eingebetteten Kafka-Knoten konfigurieren können:

  • Partitionen - Dies ist die Anzahl der pro Thema verwendeten Partitionen. Um die Dinge schön und einfach zu halten, möchten wir, dass nur eine aus unseren Tests verwendet wird
  • BrokerProperties - zusätzliche Eigenschaften für den Kafka-Broker. Auch hier halten wir die Dinge einfach und geben einen Nur-Text-Listener und eine Portnummer an

Als Nächstes verkabeln wir unsere Consumer- und Producer- Klassen automatisch und konfigurieren ein Thema, um den Wert aus unserer application.properties zu verwenden .

Für das letzte Teil des Puzzles senden wir einfach eine Nachricht an unser Testthema und überprüfen, ob die Nachricht empfangen wurde und den Namen unseres Testthemas enthält .

Wenn wir unseren Test ausführen, sehen wir unter der ausführlichen Spring-Ausgabe:

... 12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer - sending payload="Sending with our own simple KafkaProducer" to topic="embedded-test-topic" ... 12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.b.kafka.embedded.KafkaConsumer - received payload= 'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, serialized key size = -1, serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),  key = null, value = Senden mit unserem eigenen einfachen KafkaProducer) ' 

Dies bestätigt, dass unser Test ordnungsgemäß funktioniert. Genial! Wir haben jetzt die Möglichkeit, eigenständige, unabhängige Integrationstests mit einem In-Memory-Kafka-Broker zu schreiben .

6. Testen von Kafka mit TestContainern

Sometimes we might see small differences between a real external service vs. an embedded in-memory instance of a service that has been specifically provided for testing purposes. Although unlikely, it could also be that the port used from our test might be occupied, causing a failure.

With that in mind, in this section, we'll see a variation on our previous approach to testing using the Testcontainers framework. We'll see how to instantiate and manage an external Apache Kafka broker hosted inside a Docker container from our integration test.

Let's define another integration test which will be quite similar to the one we saw in the previous section:

@RunWith(SpringRunner.class) @Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest(classes = KafkaProducerConsumerApplication.class) @DirtiesContext public class KafkaTestContainersLiveTest { @ClassRule public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own controller"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }

Let's take a look at the differences this time around. We're declaring the kafka field, which is a standard JUnit @ClassRule. This field is an instance of the KafkaContainer class that will prepare and manage the lifecycle of our container running Kafka.

To avoid port clashes, Testcontainers allocates a port number dynamically when our docker container starts. For this reason, we provide a custom consumer and producer factory configuration using the class KafkaTestContainersConfiguration:

@Bean public Map consumerConfigs() { Map props = new HashMap(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // more standard configuration return props; } @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); // more standard configuration return new DefaultKafkaProducerFactory(configProps); }

We then reference this configuration via the @Import annotation at the beginning of our test.

The reason for this is that we need a way to inject the server address into our application, which as previously mentioned, is generated dynamically. We achieve this by calling the getBootstrapServers() method, which will return the bootstrap server location:

bootstrap.servers = [PLAINTEXT://localhost:32789]

Now when we run our test, we should see that Testcontainers does several things:

  • Checks our local Docker setup.
  • Pulls the confluentinc/cp-kafka:5.4.3 docker image if necessary
  • Starts a new container and waits for it to be ready
  • Finally, shuts down and deletes the container after our test finishes

Again, this is confirmed by inspecting the test output:

13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Creating container for image: confluentinc/cp-kafka:5.4.3 13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3 13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! A working integration test using a Kafka docker container.

7. Conclusion

In diesem Artikel haben wir einige Ansätze zum Testen von Kafka-Anwendungen mit Spring Boot kennengelernt. Im ersten Ansatz haben wir gesehen, wie ein lokaler In-Memory-Kafka-Broker konfiguriert und verwendet wird.

Dann haben wir aus unseren Tests gesehen, wie Testcontainer verwendet werden, um einen externen Kafka-Broker einzurichten, der in einem Docker-Container ausgeführt wird.

Wie immer ist der vollständige Quellcode des Artikels auf GitHub verfügbar.