Aufbau einer Datenpipeline mit Kafka, Spark Streaming und Cassandra

1. Übersicht

Apache Kafka ist eine skalierbare, leistungsstarke Plattform mit geringer Latenz, die das Lesen und Schreiben von Datenströmen wie ein Messagingsystem ermöglicht . Wir können ziemlich einfach mit Kafka in Java beginnen.

Spark Streaming ist Teil der Apache Spark-Plattform, die eine skalierbare, fehlertolerante Verarbeitung von Datenströmen mit hohem Durchsatz ermöglicht . Obwohl in Scala geschrieben, bietet Spark Java-APIs zum Arbeiten.

Apache Cassandra ist ein verteilter und breitspaltiger NoSQL-Datenspeicher . Weitere Details zu Cassandra finden Sie in unserem vorherigen Artikel.

In diesem Tutorial werden diese kombiniert, um eine hoch skalierbare und fehlertolerante Datenpipeline für einen Echtzeitdatenstrom zu erstellen .

2. Installationen

Zu Beginn müssen Kafka, Spark und Cassandra lokal auf unserem Computer installiert sein, um die Anwendung auszuführen. Wir werden sehen, wie wir mit diesen Plattformen eine Datenpipeline entwickeln können.

Wir belassen jedoch alle Standardkonfigurationen einschließlich der Ports für alle Installationen, damit das Lernprogramm reibungslos ausgeführt werden kann.

2.1. Kafka

Die Installation von Kafka auf unserem lokalen Computer ist recht einfach und als Teil der offiziellen Dokumentation aufgeführt. Wir werden die Version 2.1.0 von Kafka verwenden.

Darüber hinaus muss Apache Zookeeper für Kafka ausgeführt werden. In diesem Lernprogramm wird jedoch die mit Kafka gepackte Zookeeper-Instanz mit einem Knoten verwendet.

Sobald es uns gelungen ist, Zookeeper und Kafka gemäß dem offiziellen Leitfaden lokal zu starten, können wir mit dem Erstellen unseres Themas mit dem Namen "Nachrichten" fortfahren:

 $KAFKA_HOME$\bin\windows\kafka-topics.bat --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic messages

Beachten Sie, dass das obige Skript für die Windows-Plattform gilt, es jedoch ähnliche Skripte auch für Unix-ähnliche Plattformen gibt.

2.2. Funke

Spark verwendet die Client-Bibliotheken von Hadoop für HDFS und YARN. Folglich kann es sehr schwierig sein, die kompatiblen Versionen von all diesen zusammenzustellen . Der offizielle Download von Spark ist jedoch mit gängigen Versionen von Hadoop vorinstalliert. Für dieses Tutorial verwenden wir das Paket "Version 2.3.0", das für Apache Hadoop 2.7 und höher vorgefertigt wurde.

Sobald das richtige Spark-Paket entpackt ist, können die verfügbaren Skripte zum Einreichen von Anträgen verwendet werden. Wir werden dies später sehen, wenn wir unsere Anwendung in Spring Boot entwickeln.

2.3. Kassandra

DataStax stellt eine Community-Edition von Cassandra für verschiedene Plattformen einschließlich Windows zur Verfügung. Wir können dies sehr einfach gemäß der offiziellen Dokumentation herunterladen und auf unserem lokalen Computer installieren. Wir werden Version 3.9.0 verwenden.

Sobald es uns gelungen ist, Cassandra auf unserem lokalen Computer zu installieren und zu starten, können wir mit der Erstellung unseres Schlüsselbereichs und unserer Tabelle fortfahren. Dies kann mit der CQL-Shell erfolgen, die mit unserer Installation geliefert wird:

CREATE KEYSPACE vocabulary WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; USE vocabulary; CREATE TABLE words (word text PRIMARY KEY, count int);

Beachten Sie, dass wir einen Namespace namens Vokabular und eine Tabelle namens Wörter mit zwei Spalten, Wort und Anzahl, erstellt haben .

3. Abhängigkeiten

Wir können Kafka- und Spark-Abhängigkeiten über Maven in unsere Anwendung integrieren. Wir werden diese Abhängigkeiten aus Maven Central ziehen:

  • Kernfunke
  • SQL Spark
  • Streaming Spark
  • Streaming von Kafka Spark
  • Cassandra Spark
  • Cassandra Java Spark

Und wir können sie unserem Pom entsprechend hinzufügen:

 org.apache.spark spark-core_2.11 2.3.0 provided   org.apache.spark spark-sql_2.11 2.3.0 provided   org.apache.spark spark-streaming_2.11 2.3.0 provided   org.apache.spark spark-streaming-kafka-0-10_2.11 2.3.0   com.datastax.spark spark-cassandra-connector_2.11 2.3.0   com.datastax.spark spark-cassandra-connector-java_2.11 1.5.2 

Beachten Sie, dass einige diese Abhängigkeiten gekennzeichnet sind , wie vorgesehen in ihrem Umfang. Dies liegt daran, dass diese von der Spark-Installation zur Verfügung gestellt werden, wo wir den Antrag zur Ausführung mit spark-submit einreichen.

4. Spark Streaming - Kafka-Integrationsstrategien

An dieser Stelle lohnt es sich, kurz auf die Integrationsstrategien für Spark und Kafka einzugehen.

Kafka introduced new consumer API between versions 0.8 and 0.10. Hence, the corresponding Spark Streaming packages are available for both the broker versions. It's important to choose the right package depending upon the broker available and features desired.

4.1. Spark Streaming Kafka 0.8

The 0.8 version is the stable integration API with options of using the Receiver-based or the Direct Approach. We'll not go into the details of these approaches which we can find in the official documentation. An important point to note here is that this package is compatible with Kafka Broker versions 0.8.2.1 or higher.

4.2. Spark Streaming Kafka 0.10

This is currently in an experimental state and is compatible with Kafka Broker versions 0.10.0 or higher only. This package offers the Direct Approach only, now making use of the new Kafka consumer API. We can find more details about this in the official documentation. Importantly, it is not backward compatible with older Kafka Broker versions.

Please note that for this tutorial, we'll make use of the 0.10 package. The dependency mentioned in the previous section refers to this only.

5. Developing a Data Pipeline

We'll create a simple application in Java using Spark which will integrate with the Kafka topic we created earlier. The application will read the messages as posted and count the frequency of words in every message. This will then be updated in the Cassandra table we created earlier.

Let's quickly visualize how the data will flow:

5.1. Getting JavaStreamingContext

Firstly, we'll begin by initializing the JavaStreamingContext which is the entry point for all Spark Streaming applications:

SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("WordCountingApp"); sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = new JavaStreamingContext( sparkConf, Durations.seconds(1));

5.2. Getting DStream from Kafka

Now, we can connect to the Kafka topic from the JavaStreamingContext:

Map kafkaParams = new HashMap(); kafkaParams.put("bootstrap.servers", "localhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection topics = Arrays.asList("messages"); JavaInputDStream
    
      messages = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Subscribe(topics, kafkaParams));
    

Please note that we've to provide deserializers for key and value here. For common data types like String, the deserializer is available by default. However, if we wish to retrieve custom data types, we'll have to provide custom deserializers.

Here, we've obtained JavaInputDStream which is an implementation of Discretized Streams or DStreams, the basic abstraction provided by Spark Streaming. Internally DStreams is nothing but a continuous series of RDDs.

5.3. Processing Obtained DStream

We'll now perform a series of operations on the JavaInputDStream to obtain word frequencies in the messages:

JavaPairDStream results = messages .mapToPair( record -> new Tuple2(record.key(), record.value()) ); JavaDStream lines = results .map( tuple2 -> tuple2._2() ); JavaDStream words = lines .flatMap( x -> Arrays.asList(x.split("\\s+")).iterator() ); JavaPairDStream wordCounts = words .mapToPair( s -> new Tuple2(s, 1) ).reduceByKey( (i1, i2) -> i1 + i2 );

5.4. Persisting Processed DStream into Cassandra

Finally, we can iterate over the processed JavaPairDStream to insert them into our Cassandra table:

wordCounts.foreachRDD( javaRdd -> { Map wordCountMap = javaRdd.collectAsMap(); for (String key : wordCountMap.keySet()) { List wordList = Arrays.asList(new Word(key, wordCountMap.get(key))); JavaRDD rdd = streamingContext.sparkContext().parallelize(wordList); javaFunctions(rdd).writerBuilder( "vocabulary", "words", mapToRow(Word.class)).saveToCassandra(); } } );

5.5. Running the Application

As this is a stream processing application, we would want to keep this running:

streamingContext.start(); streamingContext.awaitTermination();

6. Leveraging Checkpoints

In a stream processing application, it's often useful to retain state between batches of data being processed.

For example, in our previous attempt, we are only able to store the current frequency of the words. What if we want to store the cumulative frequency instead? Spark Streaming makes it possible through a concept called checkpoints.

We'll now modify the pipeline we created earlier to leverage checkpoints:

Please note that we'll be using checkpoints only for the session of data processing. This does not provide fault-tolerance. However, checkpointing can be used for fault tolerance as well.

There are a few changes we'll have to make in our application to leverage checkpoints. This includes providing the JavaStreamingContext with a checkpoint location:

streamingContext.checkpoint("./.checkpoint");

Here, we are using the local filesystem to store checkpoints. However, for robustness, this should be stored in a location like HDFS, S3 or Kafka. More on this is available in the official documentation.

Next, we'll have to fetch the checkpoint and create a cumulative count of words while processing every partition using a mapping function:

JavaMapWithStateDStream
    
      cumulativeWordCounts = wordCounts .mapWithState( StateSpec.function( (word, one, state) -> { int sum = one.orElse(0) + (state.exists() ? state.get() : 0); Tuple2 output = new Tuple2(word, sum); state.update(sum); return output; } ) );
    

Once we get the cumulative word counts, we can proceed to iterate and save them in Cassandra as before.

Please note that while data checkpointing is useful for stateful processing, it comes with a latency cost. Hence, it's necessary to use this wisely along with an optimal checkpointing interval.

7. Understanding Offsets

If we recall some of the Kafka parameters we set earlier:

kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false);

These basically mean that we don't want to auto-commit for the offset and would like to pick the latest offset every time a consumer group is initialized. Consequently, our application will only be able to consume messages posted during the period it is running.

If we want to consume all messages posted irrespective of whether the application was running or not and also want to keep track of the messages already posted, we'll have to configure the offset appropriately along with saving the offset state, though this is a bit out of scope for this tutorial.

This is also a way in which Spark Streaming offers a particular level of guarantee like “exactly once”. This basically means that each message posted on Kafka topic will only be processed exactly once by Spark Streaming.

8. Deploying Application

We can deploy our application using the Spark-submit script which comes pre-packed with the Spark installation:

$SPARK_HOME$\bin\spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local[2] \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Bitte beachten Sie, dass das Glas wir mit Maven erstellen sollten die Abhängigkeiten enthalten , die nicht gekennzeichnet sind , wie vorgesehen in ihrem Umfang.

Sobald wir diesen Antrag eingereicht und einige Nachrichten in dem zuvor erstellten Kafka-Thema veröffentlicht haben, sollten die kumulierten Wortzahlen in der zuvor erstellten Cassandra-Tabelle angezeigt werden.

9. Fazit

Zusammenfassend haben wir in diesem Tutorial gelernt, wie man mit Kafka, Spark Streaming und Cassandra eine einfache Datenpipeline erstellt. Wir haben auch gelernt, wie Sie Checkpoints in Spark Streaming nutzen, um den Status zwischen Stapeln aufrechtzuerhalten.

Wie immer ist der Code für die Beispiele auf GitHub verfügbar.