Einführung in Hazelcast Jet

1. Einleitung

In diesem Tutorial lernen wir Hazelcast Jet kennen. Es handelt sich um eine verteilte Datenverarbeitungs-Engine, die von Hazelcast, Inc. bereitgestellt wird und auf Hazelcast IMDG basiert.

Wenn Sie mehr über Hazelcast IMDG erfahren möchten, finden Sie hier einen Artikel für den Einstieg.

2. Was ist Hazelcast Jet?

Hazelcast Jet ist eine verteilte Datenverarbeitungs-Engine, die Daten als Streams behandelt. Es kann Daten verarbeiten, die in einer Datenbank oder in Dateien gespeichert sind, sowie Daten, die von einem Kafka-Server gestreamt werden.

Darüber hinaus kann es Aggregatfunktionen über unendliche Datenströme ausführen, indem die Streams in Teilmengen unterteilt und eine Aggregation über jede Teilmenge angewendet wird. Dieses Konzept wird in der Jet-Terminologie als Fensterung bezeichnet.

Wir können Jet in einem Cluster von Computern bereitstellen und dann unsere Datenverarbeitungsaufträge an ihn senden. Jet lässt alle Mitglieder des Clusters die Daten automatisch verarbeiten. Jedes Mitglied des Clusters verbraucht einen Teil der Daten, und das macht es einfach, auf jede Durchsatzstufe zu skalieren.

Hier sind die typischen Anwendungsfälle für Hazelcast Jet:

  • Echtzeit-Stream-Verarbeitung
  • Schnelle Stapelverarbeitung
  • Verteilte Verarbeitung von Java 8-Streams
  • Datenverarbeitung in Microservices

3. Setup

Um Hazelcast Jet in unserer Umgebung einzurichten, müssen wir unserer pom.xml nur eine einzige Maven-Abhängigkeit hinzufügen .

So machen wir das:

 com.hazelcast.jet hazelcast-jet 4.2 

Durch das Einbeziehen dieser Abhängigkeit wird eine 10-MB-JAR-Datei heruntergeladen, die uns die gesamte Infrastruktur bietet, die wir zum Erstellen einer verteilten Datenverarbeitungspipeline benötigen.

Die neueste Version für Hazelcast Jet finden Sie hier.

4. Beispielanwendung

Um mehr über Hazelcast Jet zu erfahren, erstellen wir eine Beispielanwendung, die eine Eingabe von Sätzen und ein Wort verwendet, um diese Sätze zu finden, und die Anzahl der angegebenen Wörter in diesen Sätzen zurückgibt.

4.1. Die Pipeline

Eine Pipeline bildet das Grundkonstrukt für eine Jet-Anwendung. Die Verarbeitung innerhalb einer Pipeline erfolgt folgendermaßen:

  • Daten aus einer Quelle lesen
  • transformiere die Daten
  • Daten in eine Senke schreiben

Für unsere Anwendung liest die Pipeline aus einer verteilten Liste , wendet die Transformation von Gruppierung und Aggregation an und schreibt schließlich in eine verteilte Karte .

So schreiben wir unsere Pipeline:

private Pipeline createPipeLine() { Pipeline p = Pipeline.create(); p.readFrom(Sources.list(LIST_NAME)) .flatMap(word -> traverseArray(word.toLowerCase().split("\\W+"))) .filter(word -> !word.isEmpty()) .groupingKey(wholeItem()) .aggregate(counting()) .writeTo(Sinks.map(MAP_NAME)); return p; }

Sobald wir aus der Quelle gelesen haben, durchlaufen wir die Daten und teilen sie mit einem regulären Ausdruck im Raum auf. Danach filtern wir die Leerzeichen heraus.

Schließlich gruppieren wir die Wörter, aggregieren sie und schreiben die Ergebnisse auf eine Karte.

4.2. Die Arbeit

Nachdem unsere Pipeline definiert ist, erstellen wir einen Job zum Ausführen der Pipeline.

So schreiben wir eine countWord- Funktion, die Parameter akzeptiert und die Anzahl zurückgibt:

public Long countWord(List sentences, String word) { long count = 0; JetInstance jet = Jet.newJetInstance(); try { List textList = jet.getList(LIST_NAME); textList.addAll(sentences); Pipeline p = createPipeLine(); jet.newJob(p).join(); Map counts = jet.getMap(MAP_NAME); count = counts.get(word); } finally { Jet.shutdownAll(); } return count; }

Wir erstellen zuerst eine Jet-Instanz, um unseren Job zu erstellen und die Pipeline zu verwenden. Als nächstes kopieren wir die Eingangsliste auf eine verteilte Liste , so dass es über alle Instanzen zur Verfügung steht.

Anschließend senden wir einen Job mithilfe der oben erstellten Pipeline. Die Methode newJob () gibt einen ausführbaren Job zurück, der von Jet asynchron gestartet wird. Die Join- Methode wartet auf den Abschluss des Jobs und löst eine Ausnahme aus, wenn der Job mit einem Fehler abgeschlossen wird.

Wenn der Job abgeschlossen ist, werden die Ergebnisse in einer verteilten Map abgerufen , wie wir in unserer Pipeline definiert haben. Wir erhalten also die Karte von der Jet-Instanz und erhalten die Anzahl der Wörter dagegen.

Zuletzt haben wir die Jet-Instanz heruntergefahren. Es ist wichtig, es nach Beendigung unserer Ausführung herunterzufahren, da die Jet-Instanz ihre eigenen Threads startet . Andernfalls bleibt unser Java-Prozess auch nach dem Beenden unserer Methode noch aktiv.

Hier ist ein Komponententest, der den Code testet, den wir für Jet geschrieben haben:

@Test public void whenGivenSentencesAndWord_ThenReturnCountOfWord() { List sentences = new ArrayList(); sentences.add("The first second was alright, but the second second was tough."); WordCounter wordCounter = new WordCounter(); long countSecond = wordCounter.countWord(sentences, "second"); assertEquals(3, countSecond); }

5. Schlussfolgerung

In diesem Artikel haben wir etwas über Hazelcast Jet erfahren. Weitere Informationen und Funktionen finden Sie im Handbuch.

Wie üblich finden Sie den Code für die in diesem Artikel verwendeten Beispiele auf Github.