Einführung in Apache Kafka mit Spring

Ausdauer oben

Ich habe gerade den neuen Learn Spring- Kurs angekündigt , der sich auf die Grundlagen von Spring 5 und Spring Boot 2 konzentriert:

>> Überprüfen Sie den Kurs

1. Übersicht

Apache Kafka ist ein verteiltes und fehlertolerantes Stream-Verarbeitungssystem.

In diesem Artikel behandeln wir die Spring-Unterstützung für Kafka und die Ebene der Abstraktionen, die über native Kafka Java-Client-APIs bereitgestellt werden.

Spring Kafka bringt das einfache und typische Spring-Template-Programmiermodell mit einer KafkaTemplate und nachrichtengesteuerten POJOs über die @ KafkaListener- Annotation.

2. Installation und Einrichtung

Informationen zum Herunterladen und Installieren von Kafka finden Sie in der offiziellen Anleitung hier.

Wir müssen auch die Spring-Kafka- Abhängigkeit zu unserer pom.xml hinzufügen :

 org.springframework.kafka spring-kafka 2.3.7.RELEASE 

Die neueste Version dieses Artefakts finden Sie hier.

Unsere Beispielanwendung ist eine Spring Boot-Anwendung.

In diesem Artikel wird davon ausgegangen, dass der Server mit der Standardkonfiguration gestartet wird und keine Serverports geändert werden.

3. Themen konfigurieren

Zuvor haben wir Befehlszeilentools ausgeführt, um Themen in Kafka zu erstellen, z.

$ bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic

Mit der Einführung von AdminClient in Kafka können wir jetzt Themen programmgesteuert erstellen.

Wir müssen die KafkaAdmin Spring Bean hinzufügen , die automatisch Themen für alle Beans vom Typ NewTopic hinzufügt:

@Configuration public class KafkaTopicConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin() { Map configs = new HashMap(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); } @Bean public NewTopic topic1() { return new NewTopic("baeldung", 1, (short) 1); } }

4. Nachrichten erstellen

Um Nachrichten zu erstellen, müssen wir zunächst eine ProducerFactory konfigurieren, die die Strategie zum Erstellen von Kafka Producer- Instanzen festlegt .

Dann benötigen wir eine KafkaTemplate, die eine Producer- Instanz umschließt und bequeme Methoden zum Senden von Nachrichten an Kafka-Themen bereitstellt.

Produzenteninstanzen sind threadsicher und daher führt die Verwendung einer einzelnen Instanz im gesamten Anwendungskontext zu einer höheren Leistung. Folglich sind KakfaTemplate- Instanzen auch threadsicher und die Verwendung einer Instanz wird empfohlen.

4.1. Herstellerkonfiguration

@Configuration public class KafkaProducerConfig { @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory(configProps); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate(producerFactory()); } }

4.2. Nachrichten veröffentlichen

Wir können Nachrichten mit der KafkaTemplate- Klasse senden :

@Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String msg) { kafkaTemplate.send(topicName, msg); }

Die Sende- API gibt ein ListenableFuture- Objekt zurück. Wenn wir den sendenden Thread blockieren und das Ergebnis der gesendeten Nachricht abrufen möchten, können wir die get- API des ListenableFuture- Objekts aufrufen . Der Thread wartet auf das Ergebnis, verlangsamt jedoch den Produzenten.

Kafka ist eine Plattform für die schnelle Stream-Verarbeitung. Es ist daher besser, die Ergebnisse asynchron zu behandeln, damit die nachfolgenden Nachrichten nicht auf das Ergebnis der vorherigen Nachricht warten. Wir können dies durch einen Rückruf tun:

public void sendMessage(String message) { ListenableFuture
    
      future = kafkaTemplate.send(topicName, message); future.addCallback(new ListenableFutureCallback
     
      () { @Override public void onSuccess(SendResult result) { System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); } }); }
     
    

5. Nachrichten konsumieren

5.1. Verbraucherkonfiguration

Zum Konsumieren von Nachrichten müssen wir eine ConsumerFactory und eine KafkaListenerContainerFactory konfigurieren . Sobald diese Beans in der Spring Bean Factory verfügbar sind, können POJO-basierte Konsumenten mithilfe der @ KafkaListener- Annotation konfiguriert werden .

Die @ EnableKafka- Annotation ist für die Konfigurationsklasse erforderlich, um die Erkennung der @ KafkaListener- Annotation für im Frühjahr verwaltete Beans zu ermöglichen:

@EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory consumerFactory() { Map props = new HashMap(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put( ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory(props); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); return factory; } }

5.2. Nachrichten konsumieren

@KafkaListener(topics = "topicName", groupId = "foo") public void listenGroupFoo(String message) { System.out.println("Received Message in group foo: " + message); }

Für ein Thema können mehrere Listener mit jeweils einer anderen Gruppen-ID implementiert werden . Darüber hinaus kann ein Verbraucher auf Nachrichten zu verschiedenen Themen warten:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring unterstützt auch das Abrufen eines oder mehrerer Nachrichtenkopfzeilen mithilfe der Annotation @Header im Listener:

@KafkaListener(topics = "topicName") public void listenWithHeaders( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }

5.3. Nachrichten von einer bestimmten Partition verbrauchen

Wie Sie vielleicht bemerkt haben, haben wir das Thema baeldung mit nur einer Partition erstellt. Für ein Thema mit mehreren Partitionen kann ein @KafkaListener jedoch explizit eine bestimmte Partition eines Themas mit einem anfänglichen Versatz abonnieren:

@KafkaListener( topicPartitions = @TopicPartition(topic = "topicName", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0"), @PartitionOffset(partition = "3", initialOffset = "0")}), containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }

Da das initialOffset in diesem Listener an 0 gesendet wurde, werden alle zuvor verwendeten Nachrichten von den Partitionen 0 und 3 bei jeder Initialisierung dieses Listeners erneut verwendet. Wenn das Festlegen des Versatzes nicht erforderlich ist, können Sie die Partitionseigenschaft der Annotation @TopicPartition verwenden , um nur die Partitionen ohne den Versatz festzulegen :

@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. Hinzufügen eines Nachrichtenfilters für Listener

Listener können durch Hinzufügen eines benutzerdefinierten Filters so konfiguriert werden, dass bestimmte Nachrichtentypen verwendet werden. Dies kann durch Festlegen einer RecordFilterStrategy auf die KafkaListenerContainerFactory erfolgen :

@Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy( record -> record.value().contains("World")); return factory; }

Ein Listener kann dann so konfiguriert werden, dass er diese Container-Factory verwendet:

@KafkaListener( topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter(String message) { System.out.println("Received Message in filtered listener: " + message); }

In diesem Listener werden alle Nachrichten, die dem Filter entsprechen, verworfen .

6. Benutzerdefinierte Nachrichtenkonverter

Bisher haben wir nur das Senden und Empfangen von Strings als Nachrichten behandelt. Wir können jedoch auch benutzerdefinierte Java-Objekte senden und empfangen. Dies erfordert die Konfiguration des entsprechenden Serializers in ProducerFactory und des Deserializers in ConsumerFactory .

Schauen wir uns eine einfache Bean-Klasse an , die wir als Nachrichten senden werden:

public class Greeting { private String msg; private String name; // standard getters, setters and constructor }

6.1. Benutzerdefinierte Nachrichten erstellen

In this example, we will use JsonSerializer. Let's look at the code for ProducerFactory and KafkaTemplate:

@Bean public ProducerFactory greetingProducerFactory() { // ... configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory(configProps); } @Bean public KafkaTemplate greetingKafkaTemplate() { return new KafkaTemplate(greetingProducerFactory()); }

This new KafkaTemplate can be used to send the Greeting message:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. Consuming Custom Messages

Similarly, let's modify the ConsumerFactory and KafkaListenerContainerFactory to deserialize the Greeting message correctly:

@Bean public ConsumerFactory greetingConsumerFactory() { // ... return new DefaultKafkaConsumerFactory( props, new StringDeserializer(), new JsonDeserializer(Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(greetingConsumerFactory()); return factory; }

The spring-kafka JSON serializer and deserializer uses the Jackson library which is also an optional maven dependency for the spring-kafka project. So let's add it to our pom.xml:

 com.fasterxml.jackson.core jackson-databind 2.9.7 

Anstatt die neueste Version von Jackson zu verwenden, wird empfohlen, die Version zu verwenden, die der pom.xml von spring-kafka hinzugefügt wird.

Schließlich müssen wir einen Listener schreiben, um Begrüßungsnachrichten zu konsumieren :

@KafkaListener( topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") public void greetingListener(Greeting greeting) { // process greeting message }

7. Fazit

In diesem Artikel haben wir die Grundlagen der Spring-Unterstützung für Apache Kafka behandelt. Wir haben uns kurz die Klassen angesehen, die zum Senden und Empfangen von Nachrichten verwendet werden.

Den vollständigen Quellcode für diesen Artikel finden Sie auf GitHub. Stellen Sie vor dem Ausführen des Codes sicher, dass der Kafka-Server ausgeführt wird und die Themen manuell erstellt werden.

Persistenz unten

Ich habe gerade den neuen Learn Spring- Kurs angekündigt , der sich auf die Grundlagen von Spring 5 und Spring Boot 2 konzentriert:

>> Überprüfen Sie den Kurs