Aufbau einer Datenpipeline mit Flink und Kafka

1. Übersicht

Apache Flink ist ein Stream-Verarbeitungsframework, das problemlos mit Java verwendet werden kann. Apache Kafka ist ein verteiltes Stream-Verarbeitungssystem, das eine hohe Fehlertoleranz unterstützt.

In diesem Tutorial werden wir uns ansehen, wie Sie mit diesen beiden Technologien eine Datenpipeline erstellen.

2. Installation

Informationen zum Installieren und Konfigurieren von Apache Kafka finden Sie im offiziellen Handbuch. Nach der Installation können wir die folgenden Befehle verwenden, um die neuen Themen mit den Namen flink_input und flink_output zu erstellen :

 bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic flink_output bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic flink_input

Für dieses Tutorial verwenden wir die Standardkonfiguration und die Standardports für Apache Kafka.

3. Flink-Nutzung

Apache Flink ermöglicht eine Echtzeit-Stream-Verarbeitungstechnologie. Das Framework ermöglicht die Verwendung mehrerer Systeme von Drittanbietern als Stream-Quellen oder -Senken .

In Flink stehen verschiedene Anschlüsse zur Verfügung:

  • Apache Kafka (Quelle / Senke)
  • Apache Cassandra (Waschbecken)
  • Amazon Kinesis Streams (Quelle / Senke)
  • Elasticsearch (Spüle)
  • Hadoop FileSystem (Senke)
  • RabbitMQ (Quelle / Senke)
  • Apache NiFi (Quelle / Senke)
  • Twitter Streaming API (Quelle)

Um Flink zu unserem Projekt hinzuzufügen, müssen die folgenden Maven-Abhängigkeiten berücksichtigt werden:

 org.apache.flink flink-core 1.5.0   org.apache.flink flink-connector-kafka-0.11_2.11 1.5.0 

Durch Hinzufügen dieser Abhängigkeiten können wir zu und von Kafka-Themen konsumieren und produzieren. Sie finden die aktuelle Version von Flink auf Maven Central.

4. Kafka String Consumer

Um Daten von Kafka mit Flink zu konsumieren, müssen wir ein Thema und eine Kafka-Adresse angeben. Wir sollten auch eine Gruppen-ID angeben, die zum Halten von Offsets verwendet wird, damit wir nicht immer die gesamten Daten von Anfang an lesen.

Erstellen wir eine statische Methode, die die Erstellung von FlinkKafkaConsumer erleichtert:

public static FlinkKafkaConsumer011 createStringConsumerForTopic( String topic, String kafkaAddress, String kafkaGroup ) { Properties props = new Properties(); props.setProperty("bootstrap.servers", kafkaAddress); props.setProperty("group.id",kafkaGroup); FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011( topic, new SimpleStringSchema(), props); return consumer; }

Diese Methode verwendet ein Thema, kafkaAddress, und kafkaGroup und schafft die FlinkKafkaConsumer , die Daten aus bestimmten Thema als verbrauchen String da wir verwendet haben SimpleStringSchema zu dekodieren Daten.

Die Nummer 011 im Namen der Klasse bezieht sich auf die Kafka-Version.

5. Kafka String Producer

Um Daten für Kafka zu erstellen, müssen wir die Kafka-Adresse und das Thema angeben, die wir verwenden möchten. Auch hier können wir eine statische Methode erstellen, mit deren Hilfe wir Produzenten für verschiedene Themen erstellen können:

public static FlinkKafkaProducer011 createStringProducer( String topic, String kafkaAddress){ return new FlinkKafkaProducer011(kafkaAddress, topic, new SimpleStringSchema()); }

Diese Methode verwendet nur das Thema und kafkaAddress als Argumente, da bei der Erstellung des Kafka-Themas keine Gruppen-ID angegeben werden muss.

6. String Stream-Verarbeitung

Wenn wir einen voll funktionsfähigen Verbraucher und Produzenten haben, können wir versuchen, Daten von Kafka zu verarbeiten und unsere Ergebnisse dann wieder in Kafka zu speichern. Die vollständige Liste der Funktionen, die für die Stream-Verarbeitung verwendet werden können, finden Sie hier.

In diesem Beispiel werden wir Wörter in jedem Kafka-Eintrag groß schreiben und sie dann an Kafka zurückschreiben.

Zu diesem Zweck müssen wir eine benutzerdefinierte MapFunction erstellen :

public class WordsCapitalizer implements MapFunction { @Override public String map(String s) { return s.toUpperCase(); } }

Nach dem Erstellen der Funktion können wir sie in der Stream-Verarbeitung verwenden:

public static void capitalize() { String inputTopic = "flink_input"; String outputTopic = "flink_output"; String consumerGroup = "baeldung"; String address = "localhost:9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment .getExecutionEnvironment(); FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic( inputTopic, address, consumerGroup); DataStream stringInputStream = environment .addSource(flinkKafkaConsumer); FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer( outputTopic, address); stringInputStream .map(new WordsCapitalizer()) .addSink(flinkKafkaProducer); }

Die Anwendung liest Daten aus dem Thema flink_input , führt Operationen für den Stream aus und speichert die Ergebnisse im Thema flink_output in Kafka.

Wir haben gesehen, wie man mit Flink und Kafka mit Strings umgeht. Oft ist es jedoch erforderlich, Operationen an benutzerdefinierten Objekten auszuführen. Wir werden in den nächsten Kapiteln sehen, wie das geht.

7. Benutzerdefinierte Objektdeserialisierung

Die folgende Klasse stellt eine einfache Nachricht mit Informationen zu Absender und Empfänger dar:

@JsonSerialize public class InputMessage { String sender; String recipient; LocalDateTime sentAt; String message; }

Früher haben wir SimpleStringSchema verwendet, um Nachrichten von Kafka zu deserialisieren. Jetzt möchten wir Daten direkt in benutzerdefinierte Objekte deserialisieren .

Dazu benötigen wir ein benutzerdefiniertes DeserializationSchema:

public class InputMessageDeserializationSchema implements DeserializationSchema { static ObjectMapper objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()); @Override public InputMessage deserialize(byte[] bytes) throws IOException { return objectMapper.readValue(bytes, InputMessage.class); } @Override public boolean isEndOfStream(InputMessage inputMessage) { return false; } @Override public TypeInformation getProducedType() { return TypeInformation.of(InputMessage.class); } }

Wir gehen hier davon aus, dass die Nachrichten als JSON in Kafka gehalten werden.

Da wir ein Feld vom Typ LocalDateTime haben , müssen wir das JavaTimeModule angeben , das die Zuordnung von LocalDateTime- Objekten zu JSON übernimmt .

Flink-Schemas können keine Felder enthalten, die nicht serialisierbar sind, da alle Operatoren (wie Schemas oder Funktionen) zu Beginn des Jobs serialisiert werden.

Es gibt ähnliche Probleme in Apache Spark. Eine der bekannten Korrekturen für dieses Problem ist das Initialisieren von Feldern als statisch , wie wir es oben mit ObjectMapper getan haben . Es ist nicht die schönste Lösung, aber es ist relativ einfach und macht den Job.

Die Methode isEndOfStream kann für den Sonderfall verwendet werden, in dem Streams nur verarbeitet werden sollen, bis bestimmte Daten empfangen werden. Aber es wird in unserem Fall nicht benötigt.

8. Benutzerdefinierte Objektserialisierung

Nehmen wir nun an, dass unser System die Möglichkeit haben soll, eine Sicherung von Nachrichten zu erstellen. Wir möchten, dass der Prozess automatisch abläuft und jede Sicherung aus Nachrichten besteht, die während eines ganzen Tages gesendet werden.

Außerdem sollte einer Sicherungsnachricht eine eindeutige ID zugewiesen werden.

Zu diesem Zweck können wir die folgende Klasse erstellen:

public class Backup { @JsonProperty("inputMessages") List inputMessages; @JsonProperty("backupTimestamp") LocalDateTime backupTimestamp; @JsonProperty("uuid") UUID uuid; public Backup(List inputMessages, LocalDateTime backupTimestamp) { this.inputMessages = inputMessages; this.backupTimestamp = backupTimestamp; this.uuid = UUID.randomUUID(); } }

Bitte beachten Sie, dass der UUID-Generierungsmechanismus nicht perfekt ist, da er Duplikate zulässt. Dies reicht jedoch für den Umfang dieses Beispiels aus.

Wir möchten unser Backup- Objekt als JSON in Kafka speichern , daher müssen wir unser SerializationSchema erstellen :

public class BackupSerializationSchema implements SerializationSchema { ObjectMapper objectMapper; Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class); @Override public byte[] serialize(Backup backupMessage) { if(objectMapper == null) { objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()); } try { return objectMapper.writeValueAsString(backupMessage).getBytes(); } catch (com.fasterxml.jackson.core.JsonProcessingException e) { logger.error("Failed to parse JSON", e); } return new byte[0]; } }

9. Timestamping Messages

Since we want to create a backup for all messages of each day, messages need a timestamp.

Flink provides the three different time characteristics EventTime, ProcessingTime, and IngestionTime.

In our case, we need to use the time at which the message has been sent, so we'll use EventTime.

To use EventTimewe need a TimestampAssigner which will extract timestamps from our input data:

public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks { @Override public long extractTimestamp(InputMessage element, long previousElementTimestamp) { ZoneId zoneId = ZoneId.systemDefault(); return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000; } @Nullable @Override public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) { return new Watermark(extractedTimestamp - 1500); } }

We need to transform our LocalDateTime to EpochSecond as this is the format expected by Flink. After assigning timestamps, all time-based operations will use time from sentAt field to operate.

Since Flink expects timestamps to be in milliseconds and toEpochSecond() returns time in seconds we needed to multiply it by 1000, so Flink will create windows correctly.

Flink defines the concept of a Watermark. Watermarks are useful in case of data that don't arrive in the order they were sent. A watermark defines the maximum lateness that is allowed for elements to be processed.

Elements that have timestamps lower than the watermark won't be processed at all.

10. Creating Time Windows

To assure that our backup gathers only messages sent during one day, we can use the timeWindowAll method on the stream, which will split messages into windows.

Wir müssen jedoch weiterhin Nachrichten aus jedem Fenster aggregieren und als Backup zurückgeben .

Dazu benötigen wir eine benutzerdefinierte AggregateFunction :

public class BackupAggregator implements AggregateFunction
    
      { @Override public List createAccumulator() { return new ArrayList(); } @Override public List add( InputMessage inputMessage, List inputMessages) { inputMessages.add(inputMessage); return inputMessages; } @Override public Backup getResult(List inputMessages) { return new Backup(inputMessages, LocalDateTime.now()); } @Override public List merge(List inputMessages, List acc1) { inputMessages.addAll(acc1); return inputMessages; } }
    

11. Aggregieren von Backups

Nachdem wir die richtigen Zeitstempel zugewiesen und unsere AggregateFunction implementiert haben , können wir endlich unsere Kafka-Eingabe nehmen und verarbeiten:

public static void createBackup () throws Exception { String inputTopic = "flink_input"; String outputTopic = "flink_output"; String consumerGroup = "baeldung"; String kafkaAddress = "192.168.99.100:9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup); flinkKafkaConsumer.setStartFromEarliest(); flinkKafkaConsumer.assignTimestampsAndWatermarks( new InputMessageTimestampAssigner()); FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress); DataStream inputMessagesStream = environment.addSource(flinkKafkaConsumer); inputMessagesStream .timeWindowAll(Time.hours(24)) .aggregate(new BackupAggregator()) .addSink(flinkKafkaProducer); environment.execute(); }

12. Schlussfolgerung

In diesem Artikel haben wir vorgestellt, wie Sie mit Apache Flink und Apache Kafka eine einfache Datenpipeline erstellen.

Wie immer ist der Code auf Github zu finden.