Einführung in Apache Flink mit Java

1. Übersicht

Apache Flink ist ein Big Data-Verarbeitungsframework, mit dem Programmierer die große Datenmenge auf sehr effiziente und skalierbare Weise verarbeiten können.

In diesem Artikel werden einige der wichtigsten API-Konzepte und Standard-Datentransformationen vorgestellt, die in der Apache Flink Java-API verfügbar sind . Der fließende Stil dieser API erleichtert die Arbeit mit dem zentralen Konstrukt von Flink - der verteilten Sammlung.

Zunächst werden wir uns die DataSet- API-Transformationen von Flink ansehen und sie zur Implementierung eines Wortzählprogramms verwenden. Anschließend werfen wir einen kurzen Blick auf die DataStream- API von Flink , mit der Sie Ereignisströme in Echtzeit verarbeiten können.

2. Maven-Abhängigkeit

Um zu beginnen, müssen wir Maven-Abhängigkeiten zu den Bibliotheken flink-java und flink-test-utils hinzufügen :

 org.apache.flink flink-java 1.2.0   org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Kern-API-Konzepte

Wenn wir mit Flink arbeiten, müssen wir einige Dinge wissen, die mit seiner API zusammenhängen:

  • Jedes Flink-Programm führt Transformationen für verteilte Datensammlungen durch. Es stehen verschiedene Funktionen zum Transformieren von Daten zur Verfügung, darunter Filtern, Zuordnen, Verbinden, Gruppieren und Aggregieren
  • Eine Senkenoperation in Flink löst die Ausführung eines Streams aus, um das gewünschte Ergebnis des Programms zu erzielen , z. B. das Speichern im Dateisystem oder das Drucken in der Standardausgabe
  • Flink-Transformationen sind faul, was bedeutet, dass sie erst ausgeführt werden, wenn eine Senkenoperation aufgerufen wird
  • Die Apache Flink-API unterstützt zwei Betriebsmodi: Batch und Echtzeit. Wenn Sie mit einer begrenzten Datenquelle arbeiten, die im Stapelmodus verarbeitet werden kann, verwenden Sie die DataSet- API. Wenn Sie unbegrenzte Datenströme in Echtzeit verarbeiten möchten, müssen Sie die DataStream- API verwenden

4. DataSet-API-Transformationen

Der Einstiegspunkt in das Flink-Programm ist eine Instanz der ExecutionEnvironment- Klasse - dies definiert den Kontext, in dem ein Programm ausgeführt wird.

Erstellen wir eine ExecutionEnvironment , um unsere Verarbeitung zu starten:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Beachten Sie, dass beim Starten der Anwendung auf dem lokalen Computer die Verarbeitung auf der lokalen JVM ausgeführt wird. Wenn Sie die Verarbeitung auf einem Cluster von Computern starten möchten, müssen Sie Apache Flink auf diesen Computern installieren und die ExecutionEnvironment entsprechend konfigurieren .

4.1. Erstellen eines DataSet

Um Datentransformationen durchführen zu können, müssen wir unser Programm mit den Daten versorgen.

Erstellen wir eine Instanz der DataSet- Klasse mit unserer ExecutionEnvironement :

DataSet amounts = env.fromElements(1, 29, 40, 50);

Sie können ein DataSet aus mehreren Quellen erstellen , z. B. aus Apache Kafka, einer CSV-Datei, einer Datei oder praktisch jeder anderen Datenquelle.

4.2. Filtern und reduzieren

Sobald Sie eine Instanz der DataSet- Klasse erstellt haben, können Sie Transformationen darauf anwenden.

Angenommen, Sie möchten Zahlen filtern, die über einem bestimmten Schwellenwert liegen, und als nächstes alle summieren . Sie können die Transformationen filter () und redu () verwenden , um dies zu erreichen:

int threshold = 30; List collect = amounts .filter(a -> a > threshold) .reduce((integer, t1) -> integer + t1) .collect(); assertThat(collect.get(0)).isEqualTo(90); 

Beachten Sie, dass die collect () -Methode eine Senkenoperation ist, die die eigentlichen Datentransformationen auslöst.

4.3. Karte

Angenommen, Sie haben ein DataSet mit Personenobjekten :

private static class Person { private int age; private String name; // standard constructors/getters/setters }

Als Nächstes erstellen wir ein DataSet dieser Objekte:

DataSet personDataSource = env.fromCollection( Arrays.asList( new Person(23, "Tom"), new Person(75, "Michael")));

Nehmen wir an, dass Sie nur das entnehmende Alter Feld von jedem Objekt der Sammlung. Mit der map () -Transformation können Sie nur ein bestimmtes Feld der Person- Klasse abrufen :

List ages = personDataSource .map(p -> p.age) .collect(); assertThat(ages).hasSize(2); assertThat(ages).contains(23, 75);

4.4. Beitreten

Wenn Sie zwei Datensätze haben, möchten Sie diese möglicherweise in einem ID- Feld verknüpfen . Hierfür können Sie die join () -Transformation verwenden.

Erstellen wir Sammlungen von Transaktionen und Adressen eines Benutzers:

Tuple3 address = new Tuple3(1, "5th Avenue", "London"); DataSet
    
      addresses = env.fromElements(address); Tuple2 firstTransaction = new Tuple2(1, "Transaction_1"); DataSet
     
       transactions = env.fromElements(firstTransaction, new Tuple2(12, "Transaction_2")); 
     
    

Das erste Feld in beiden Tupeln ist vom Typ Integer , und dies ist ein ID- Feld, in dem beide Datensätze verknüpft werden sollen.

Um die eigentliche Verbindungslogik auszuführen, müssen wir eine KeySelector- Schnittstelle für Adresse und Transaktion implementieren :

private static class IdKeySelectorTransaction implements KeySelector
    
      { @Override public Integer getKey(Tuple2 value) { return value.f0; } } private static class IdKeySelectorAddress implements KeySelector
     
       { @Override public Integer getKey(Tuple3 value) { return value.f0; } }
     
    

Jeder Selektor gibt nur das Feld zurück, für das der Join ausgeführt werden soll.

Leider ist es hier nicht möglich, Lambda-Ausdrücke zu verwenden, da Flink generische Typinformationen benötigt.

Als nächstes implementieren wir die Zusammenführungslogik mit diesen Selektoren:

List
    
     > joined = transactions.join(addresses) .where(new IdKeySelectorTransaction()) .equalTo(new IdKeySelectorAddress()) .collect(); assertThat(joined).hasSize(1); assertThat(joined).contains(new Tuple2(firstTransaction, address)); 
    

4.5. Sortieren

Angenommen , Sie haben die folgende Sammlung von Tuple2:

Tuple2 secondPerson = new Tuple2(4, "Tom"); Tuple2 thirdPerson = new Tuple2(5, "Scott"); Tuple2 fourthPerson = new Tuple2(200, "Michael"); Tuple2 firstPerson = new Tuple2(1, "Jack"); DataSet
    
      transactions = env.fromElements( fourthPerson, secondPerson, thirdPerson, firstPerson); 
    

Wenn Sie diese Sammlung nach dem ersten Feld des Tupels sortieren möchten, können Sie die Transformation sortPartitions () verwenden :

List
    
      sorted = transactions .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING) .collect(); assertThat(sorted) .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
    

5. Wortzahl

Das Problem der Wortanzahl wird häufig verwendet, um die Funktionen von Big Data-Verarbeitungsframeworks zu demonstrieren. Die grundlegende Lösung besteht darin, das Auftreten von Wörtern in einer Texteingabe zu zählen. Verwenden wir Flink, um eine Lösung für dieses Problem zu implementieren.

Als ersten Schritt unserer Lösung erstellen wir eine LineSplitter- Klasse, die unsere Eingabe in Token (Wörter) aufteilt und für jedes Token ein Tupel2 von Schlüssel-Wert-Paaren sammelt . In jedem dieser Tupel ist der Schlüssel ein im Text gefundenes Wort, und der Wert ist die Ganzzahl (1).

This class implements the FlatMapFunction interface that takes String as an input and produces a Tuple2:

public class LineSplitter implements FlatMapFunction
    
      { @Override public void flatMap(String value, Collector
     
       out) { Stream.of(value.toLowerCase().split("\\W+")) .filter(t -> t.length() > 0) .forEach(token -> out.collect(new Tuple2(token, 1))); } }
     
    

We call the collect() method on the Collector class to push data forward in the processing pipeline.

Our next and final step is to group the tuples by their first elements (words) and then perform a sum aggregate on the second elements to produce a count of the word occurrences:

public static DataSet
    
      startWordCount( ExecutionEnvironment env, List lines) throws Exception { DataSet text = env.fromCollection(lines); return text.flatMap(new LineSplitter()) .groupBy(0) .aggregate(Aggregations.SUM, 1); }
    

We are using three types of the Flink transformations: flatMap(), groupBy(), and aggregate().

Let's write a test to assert that the word count implementation is working as expected:

List lines = Arrays.asList( "This is a first sentence", "This is a second sentence with a one word"); DataSet
    
      result = WordCount.startWordCount(env, lines); List
     
       collect = result.collect(); assertThat(collect).containsExactlyInAnyOrder( new Tuple2("a", 3), new Tuple2("sentence", 2), new Tuple2("word", 1), new Tuple2("is", 2), new Tuple2("this", 2), new Tuple2("second", 1), new Tuple2("first", 1), new Tuple2("with", 1), new Tuple2("one", 1));
     
    

6. DataStream API

6.1. Creating a DataStream

Apache Flink also supports the processing of streams of events through its DataStream API. If we want to start consuming events, we first need to use the StreamExecutionEnvironment class:

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

Next, we can create a stream of events using the executionEnvironment from a variety of sources. It could be some message bus like Apache Kafka, but in this example, we will simply create a source from a couple of string elements:

DataStream dataStream = executionEnvironment.fromElements( "This is a first sentence", "This is a second sentence with a one word");

We can apply transformations to every element of the DataStream like in the normal DataSet class:

SingleOutputStreamOperator upperCase = text.map(String::toUpperCase);

To trigger the execution, we need to invoke a sink operation such as print() that will just print the result of transformations to the standard output, following with the execute() method on the StreamExecutionEnvironment class:

upperCase.print(); env.execute();

It will produce the following output:

1> THIS IS A FIRST SENTENCE 2> THIS IS A SECOND SENTENCE WITH A ONE WORD

6.2. Windowing of Events

When processing a stream of events in real time, you may sometimes need to group events together and apply some computation on a window of those events.

Suppose we have a stream of events, where each event is a pair consisting of the event number and the timestamp when the event was sent to our system, and that we can tolerate events that are out-of-order but only if they are no more than twenty seconds late.

For this example, let's first create a stream simulating two events that are several minutes apart and define a timestamp extractor that specifies our lateness threshold:

SingleOutputStreamOperator
    
      windowed = env.fromElements( new Tuple2(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()), new Tuple2(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond())) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor 
     
      (Time.seconds(20)) { @Override public long extractTimestamp(Tuple2 element) { return element.f1 * 1000; } });
     
    

Next, let's define a window operation to group our events into five-second windows and apply a transformation on those events:

SingleOutputStreamOperator
    
      reduced = windowed .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .maxBy(0, true); reduced.print();
    

It will get the last element of every five-second window, so it prints out:

1> (15,1491221519)

Note that we do not see the second event because it arrived later than the specified lateness threshold.

7. Conclusion

In this article, we introduced the Apache Flink framework and looked at some of the transformations supplied with its API.

We implemented a word count program using Flink's fluent and functional DataSet API. Then we looked at the DataStream API and implemented a simple real-time transformation on a stream of events.

Die Implementierung all dieser Beispiele und Codefragmente finden Sie auf GitHub - dies ist ein Maven-Projekt, daher sollte es einfach zu importieren und auszuführen sein.