Einführung in KafkaStreams in Java

1. Übersicht

In diesem Artikel sehen wir uns die KafkaStreams- Bibliothek an.

KafkaStreams wurde von den Entwicklern von Apache Kafka entwickelt . Das Hauptziel dieser Software ist es, Programmierern die Erstellung effizienter Streaming-Anwendungen in Echtzeit zu ermöglichen, die als Microservices fungieren können.

Mit KafkaStreams können wir Kafka-Themen verwenden, Daten analysieren oder transformieren und möglicherweise an ein anderes Kafka-Thema senden.

Um KafkaStreams zu demonstrieren , erstellen wir eine einfache Anwendung, die Sätze aus einem Thema liest, das Auftreten von Wörtern zählt und die Anzahl pro Wort druckt.

Es ist wichtig zu beachten, dass die KafkaStreams- Bibliothek nicht reaktiv ist und keine Unterstützung für asynchrone Vorgänge und die Behandlung von Gegendruck bietet.

2. Maven-Abhängigkeit

Um mit dem Schreiben der Stream-Verarbeitungslogik mit KafkaStreams zu beginnen, müssen wir Kafka-Streams und Kafka-Clients eine Abhängigkeit hinzufügen :

 org.apache.kafka kafka-streams 1.0.0   org.apache.kafka kafka-clients 1.0.0  

Wir müssen auch Apache Kafka installiert und gestartet haben, da wir ein Kafka-Thema verwenden werden. Dieses Thema ist die Datenquelle für unseren Streaming-Job.

Wir können Kafka und andere erforderliche Abhängigkeiten von der offiziellen Website herunterladen.

3. Konfigurieren der KafkaStreams-Eingabe

Das erste, was wir tun werden, ist die Definition des Eingabe-Kafka-Themas.

Wir können das heruntergeladene Confluent- Tool verwenden - es enthält einen Kafka-Server. Es enthält auch den Kafka-Konsolen-Produzenten , mit dem wir Nachrichten an Kafka veröffentlichen können.

Lassen Sie uns zunächst unseren Kafka-Cluster ausführen:

./confluent start

Sobald Kafka gestartet ist, können wir unsere Datenquelle und den Namen unserer Anwendung mit APPLICATION_ID_CONFIG definieren :

String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties(); streamsConfiguration.put( StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

Ein entscheidender Konfigurationsparameter ist BOOTSTRAP_SERVER_CONFIG. Dies ist die URL zu unserer lokalen Kafka-Instanz, die wir gerade gestartet haben:

private String bootstrapServers = "localhost:9092"; streamsConfiguration.put( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

Als Nächstes müssen wir den Typ des Schlüssels und den Wert der Nachrichten übergeben, die von inputTopic verwendet werden:

streamsConfiguration.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

Die Stream-Verarbeitung ist häufig zustandsbehaftet. Wenn wir Zwischenergebnisse speichern möchten, müssen wir den Parameter STATE_DIR_CONFIG angeben .

In unserem Test verwenden wir ein lokales Dateisystem:

streamsConfiguration.put( StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); 

4. Erstellen einer Streaming-Topologie

Sobald wir unser Eingabethema definiert haben, können wir eine Streaming-Topologie erstellen. Dies ist eine Definition, wie Ereignisse behandelt und transformiert werden sollen.

In unserem Beispiel möchten wir einen Wortzähler implementieren. Für jeden an inputTopic gesendeten Satz möchten wir ihn in Wörter aufteilen und das Vorkommen jedes Wortes berechnen.

Wir können eine Instanz der KStreamsBuilder- Klasse verwenden, um mit der Erstellung unserer Topologie zu beginnen:

KStreamBuilder builder = new KStreamBuilder(); KStream textLines = builder.stream(inputTopic); Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) .groupBy((key, word) -> word) .count();

Um die Wortzahl zu implementieren, müssen wir zunächst die Werte mithilfe des regulären Ausdrucks aufteilen.

The split method is returning an array. We're using the flatMapValues() to flatten it. Otherwise, we'd end up with a list of arrays, and it'd be inconvenient to write code using such structure.

Finally, we're aggregating the values for every word and calling the count() that will calculate occurrences of a specific word.

5. Handling Results

We already calculated the word count of our input messages. Now let's print the results on the standard output using the foreach() method:

wordCounts .foreach((w, c) -> System.out.println("word: " + w + " -> " + c));

On production, often such streaming job might publish the output to another Kafka topic.

We could do this using the to() method:

String outputTopic = "outputTopic"; Serde stringSerde = Serdes.String(); Serde longSerde = Serdes.Long(); wordCounts.to(stringSerde, longSerde, outputTopic);

The Serde class gives us preconfigured serializers for Java types that will be used to serialize objects to an array of bytes. The array of bytes will then be sent to the Kafka topic.

We're using String as a key to our topic and Long as a value for the actual count. The to() method will save the resulting data to outputTopic.

6. Starting KafkaStream Job

Up to this point, we built a topology that can be executed. However, the job hasn't started yet.

We need to start our job explicitly by calling the start() method on the KafkaStreams instance:

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); Thread.sleep(30000); streams.close();

Note that we are waiting 30 seconds for the job to finish. In a real-world scenario, that job would be running all the time, processing events from Kafka as they arrive.

We can test our job by publishing some events to our Kafka topic.

Let's start a kafka-console-producer and manually send some events to our inputTopic:

./kafka-console-producer --topic inputTopic --broker-list localhost:9092 >"this is a pony" >"this is a horse and pony" 

This way, we published two events to Kafka. Our application will consume those events and will print the following output:

word: -> 1 word: this -> 1 word: is -> 1 word: a -> 1 word: pony -> 1 word: -> 2 word: this -> 2 word: is -> 2 word: a -> 2 word: horse -> 1 word: and -> 1 word: pony -> 2

We can see that when the first message arrived, the word pony occurred only once. But when we sent the second message, the word pony happened for the second time printing: “word: pony -> 2″.

6. Conclusion

In diesem Artikel wird erläutert, wie Sie eine primäre Stream-Verarbeitungsanwendung mit Apache Kafka als Datenquelle und der KafkaStreams- Bibliothek als Stream-Verarbeitungsbibliothek erstellen .

Alle diese Beispiele und Codefragmente finden Sie im GitHub-Projekt - dies ist ein Maven-Projekt, daher sollte es einfach zu importieren und auszuführen sein, wie es ist.