Ein Leitfaden für Apache Crunch

1. Einleitung

In diesem Tutorial werden wir Apache Crunch anhand einer beispielhaften Datenverarbeitungsanwendung demonstrieren. Wir werden diese Anwendung mit dem MapReduce-Framework ausführen.

Wir werden zunächst einige Apache Crunch-Konzepte kurz behandeln. Dann springen wir in eine Beispiel-App. In dieser App werden wir Textverarbeitung durchführen:

  • Zunächst lesen wir die Zeilen aus einer Textdatei
  • Später werden wir sie in Wörter aufteilen und einige gebräuchliche Wörter entfernen
  • Anschließend gruppieren wir die verbleibenden Wörter, um eine Liste der eindeutigen Wörter und ihrer Anzahl zu erhalten
  • Schließlich schreiben wir diese Liste in eine Textdatei

2. Was ist Crunch?

MapReduce ist ein verteiltes, paralleles Programmierframework für die Verarbeitung großer Datenmengen auf einem Servercluster. Software-Frameworks wie Hadoop und Spark implementieren MapReduce.

Crunch bietet ein Framework zum Schreiben, Testen und Ausführen von MapReduce-Pipelines in Java. Hier schreiben wir die MapReduce-Jobs nicht direkt. Vielmehr definieren wir die Datenpipeline (dh die Operationen zum Ausführen von Eingabe-, Verarbeitungs- und Ausgabeschritten) mithilfe der Crunch-APIs. Crunch Planner ordnet sie den MapReduce-Jobs zu und führt sie bei Bedarf aus.

Daher wird jede Crunch-Datenpipeline von einer Instanz der Pipeline- Schnittstelle koordiniert . Diese Schnittstelle definiert auch Methoden zum Einlesen von Daten in eine Pipeline über Quellinstanzen und zum Ausschreiben von Daten aus einer Pipeline in Zielinstanzen .

Wir haben 3 Schnittstellen zur Darstellung von Daten:

  1. PCollection - eine unveränderliche, verteilte Sammlung von Elementen
  2. PTable , V > - eine unveränderliche, verteilte, ungeordnete Mehrfachzuordnung von Schlüsseln und Werten
  3. PGroupedTable , V > - eine verteilte, sortierte Zuordnung von Schlüsseln vom Typ K zu einem iterierbaren V, die genau einmal wiederholt werden kann

DoFn ist die Basisklasse für alle Datenverarbeitungsfunktionen . Es entspricht den Mapper- , Reducer- und Combiner- Klassen in MapReduce. Wir verbringen den größten Teil der Entwicklungszeit damit, logische Berechnungen damit zu schreiben und zu testen .

Nachdem wir Crunch besser kennen, können Sie damit die Beispielanwendung erstellen.

3. Einrichten eines Crunch-Projekts

Lassen Sie uns zunächst ein Crunch-Projekt mit Maven einrichten. Wir können dies auf zwei Arten tun:

  1. Fügen Sie die erforderlichen Abhängigkeiten in die Datei pom.xml eines vorhandenen Projekts ein
  2. Verwenden Sie einen Archetyp, um ein Starterprojekt zu generieren

Lassen Sie uns einen kurzen Blick auf beide Ansätze werfen.

3.1. Maven-Abhängigkeiten

Um Crunch zu einem vorhandenen Projekt hinzuzufügen, fügen wir die erforderlichen Abhängigkeiten in die Datei pom.xml ein .

Fügen wir zunächst die Crunch-Core- Bibliothek hinzu:

 org.apache.crunch crunch-core 0.15.0 

Als nächstes fügen wir die Hadoop-Client- Bibliothek hinzu, um mit Hadoop zu kommunizieren. Wir verwenden die Version, die der Hadoop-Installation entspricht:

 org.apache.hadoop hadoop-client 2.2.0 provided 

Wir können in Maven Central nach den neuesten Versionen der Crunch-Core- und Hadoop-Client-Bibliotheken suchen.

3.2. Maven Archetyp

Ein anderer Ansatz besteht darin, schnell ein Starterprojekt mit dem von Crunch bereitgestellten Maven-Archetyp zu generieren :

mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype 

Wenn Sie durch den obigen Befehl dazu aufgefordert werden, geben wir die Crunch-Version und die Projektartefaktdetails an.

4. Crunch Pipeline Setup

Nach dem Einrichten des Projekts müssen wir ein Pipeline- Objekt erstellen . Crunch hat 3 Pipeline- Implementierungen :

  • MRPipeline - wird in Hadoop MapReduce ausgeführt
  • SparkPipeline - Wird als eine Reihe von Spark-Pipelines ausgeführt
  • MemPipeline - führt In-Memory auf dem Client aus und ist nützlich für Unit-Tests

Normalerweise entwickeln und testen wir mit einer Instanz von MemPipeline . Später verwenden wir eine Instanz von MRPipeline oder SparkPipeline für die tatsächliche Ausführung.

Wenn wir eine In-Memory-Pipeline benötigen, können wir die statische Methode getInstance verwenden , um die MemPipeline- Instanz abzurufen :

Pipeline pipeline = MemPipeline.getInstance();

Erstellen wir zunächst eine Instanz von MRPipeline , um die Anwendung mit Hadoop auszuführen :

Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

5. Eingabedaten lesen

Nach dem Erstellen des Pipeline-Objekts möchten wir die Eingabedaten lesen. Die Pipeline- Schnittstelle bietet eine bequeme Methode zum Lesen von Eingaben aus einer Textdatei , readTextFile (pathName).

Rufen wir diese Methode auf, um die Eingabetextdatei zu lesen:

PCollection lines = pipeline.readTextFile(inputPath);

Der obige Code liest die Textdatei als eine Sammlung von Zeichenfolgen .

Schreiben wir als nächsten Schritt einen Testfall zum Lesen der Eingabe:

@Test public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() { Pipeline pipeline = MemPipeline.getInstance(); PCollection lines = pipeline.readTextFile(INPUT_FILE_PATH); assertEquals(21, lines.asCollection() .getValue() .size()); }

In diesem Test überprüfen wir, ob wir beim Lesen einer Textdatei die erwartete Anzahl von Zeilen erhalten.

6. Datenverarbeitungsschritte

Nachdem wir die Eingabedaten gelesen haben, müssen wir sie verarbeiten. Die Crunch-API enthält eine Reihe von DoFn- Unterklassen für allgemeine Datenverarbeitungsszenarien :

  • FilterFn - filtert Mitglieder einer Sammlung basierend auf einer booleschen Bedingung
  • MapFn - Ordnet jeden Eingabedatensatz genau einem Ausgabedatensatz zu
  • CombineFn - kombiniert eine Reihe von Werten zu einem einzigen Wert
  • JoinFn - führt Verknüpfungen wie innere Verknüpfung, linke äußere Verknüpfung, rechte äußere Verknüpfung und vollständige äußere Verknüpfung aus

Implementieren wir die folgende Datenverarbeitungslogik mithilfe dieser Klassen:

  1. Teilen Sie jede Zeile in der Eingabedatei in Wörter auf
  2. Entfernen Sie die Stoppwörter
  3. Zähle die einzigartigen Wörter

6.1. Teilen Sie eine Textzeile in Wörter auf

Lassen Sie uns zunächst die Tokenizer- Klasse erstellen , um eine Zeile in Wörter aufzuteilen.

Wir werden die DoFn- Klasse erweitern. Diese Klasse hat eine abstrakte Methode namens process . Diese Methode verarbeitet die Eingabedatensätze aus einer PC-Sammlung und sendet die Ausgabe an einen Emitter.

Wir müssen die Aufteilungslogik in dieser Methode implementieren:

public class Tokenizer extends DoFn { private static final Splitter SPLITTER = Splitter .onPattern("\\s+") .omitEmptyStrings(); @Override public void process(String line, Emitter emitter) { for (String word : SPLITTER.split(line)) { emitter.emit(word); } } } 

In der obigen Implementierung haben wir die Splitter- Klasse aus der Guava-Bibliothek verwendet, um Wörter aus einer Zeile zu extrahieren.

Als nächstes schreiben wir einen Komponententest für die Tokenizer- Klasse:

@RunWith(MockitoJUnitRunner.class) public class TokenizerUnitTest { @Mock private Emitter emitter; @Test public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() { Tokenizer splitter = new Tokenizer(); splitter.process(" hello world ", emitter); verify(emitter).emit("hello"); verify(emitter).emit("world"); verifyNoMoreInteractions(emitter); } }

Der obige Test bestätigt, dass die richtigen Wörter zurückgegeben werden.

Lassen Sie uns abschließend die mit dieser Klasse aus der Eingabetextdatei gelesenen Zeilen aufteilen.

Die parallelDo- Methode der PCollection- Schnittstelle wendet das angegebene DoFn auf alle Elemente an und gibt eine neue PCollection zurück .

Rufen Sie diese Methode in der Zeilenauflistung auf und übergeben Sie eine Instanz von Tokenizer :

PCollection words = lines.parallelDo(new Tokenizer(), Writables.strings()); 

Als Ergebnis erhalten wir die Liste der Wörter in der Eingabetextdatei. Wir werden die Stoppwörter im nächsten Schritt entfernen.

6.2. Stoppwörter entfernen

Erstellen Sie ähnlich wie im vorherigen Schritt eine StopWordFilter- Klasse, um Stoppwörter herauszufiltern.

However, we'll extend FilterFn instead of DoFn. FilterFn has an abstract method called accept. We need to implement the filtering logic in this method:

public class StopWordFilter extends FilterFn { // English stop words, borrowed from Lucene. private static final Set STOP_WORDS = ImmutableSet .copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by", "for", "if", "in", "into", "is", "it", "no", "not", "of", "on", "or", "s", "such", "t", "that", "the", "their", "then", "there", "these", "they", "this", "to", "was", "will", "with" }); @Override public boolean accept(String word) { return !STOP_WORDS.contains(word); } }

Next, let's write the unit test for StopWordFilter class:

public class StopWordFilterUnitTest { @Test public void givenFilter_whenStopWordPassed_thenFalseReturned() { FilterFn filter = new StopWordFilter(); assertFalse(filter.accept("the")); assertFalse(filter.accept("a")); } @Test public void givenFilter_whenNonStopWordPassed_thenTrueReturned() { FilterFn filter = new StopWordFilter(); assertTrue(filter.accept("Hello")); assertTrue(filter.accept("World")); } @Test public void givenWordCollection_whenFiltered_thenStopWordsRemoved() { PCollection words = MemPipeline .collectionOf("This", "is", "a", "test", "sentence"); PCollection noStopWords = words.filter(new StopWordFilter()); assertEquals(ImmutableList.of("This", "test", "sentence"), Lists.newArrayList(noStopWords.materialize())); } }

This test verifies that the filtering logic is performed correctly.

Finally, let's use StopWordFilter to filter the list of words generated in the previous step. The filter method of PCollection interface applies the given FilterFn to all the elements and returns a new PCollection.

Let's call this method on the words collection and pass an instance of StopWordFilter:

PCollection noStopWords = words.filter(new StopWordFilter());

As a result, we get the filtered collection of words.

6.3. Count Unique Words

After getting the filtered collection of words, we want to count how often each word occurs. PCollection interface has a number of methods to perform common aggregations:

  • min – returns the minimum element of the collection
  • max – returns the maximum element of the collection
  • length – returns the number of elements in the collection
  • count – returns a PTable that contains the count of each unique element of the collection

Let's use the count method to get the unique words along with their counts:

// The count method applies a series of Crunch primitives and returns // a map of the unique words in the input PCollection to their counts. PTable counts = noStopWords.count();

7. Specify Output

As a result of the previous steps, we have a table of words and their counts. We want to write this result to a text file. The Pipeline interface provides convenience methods to write output:

void write(PCollection collection, Target target); void write(PCollection collection, Target target, Target.WriteMode writeMode); void writeTextFile(PCollection collection, String pathName);

Therefore, let's call the writeTextFile method:

pipeline.writeTextFile(counts, outputPath); 

8. Manage Pipeline Execution

All the steps so far have just defined the data pipeline. No input has been read or processed. This is because Crunch uses lazy execution model.

It doesn't run the MapReduce jobs until a method that controls job planning and execution is invoked on the Pipeline interface:

  • run – prepares an execution plan to create the required outputs and then executes it synchronously
  • done – runs any remaining jobs required to generate outputs and then cleans up any intermediate data files created
  • runAsync – similar to run method, but executes in a non-blocking fashion

Therefore, let's call the done method to execute the pipeline as MapReduce jobs:

PipelineResult result = pipeline.done(); 

The above statement runs the MapReduce jobs to read input, process them and write the result to the output directory.

9. Putting the Pipeline Together

So far we have developed and unit tested the logic to read input data, process it and write to the output file.

Next, let's put them together to build the entire data pipeline:

public int run(String[] args) throws Exception { String inputPath = args[0]; String outputPath = args[1]; // Create an object to coordinate pipeline creation and execution. Pipeline pipeline = new MRPipeline(WordCount.class, getConf()); // Reference a given text file as a collection of Strings. PCollection lines = pipeline.readTextFile(inputPath); // Define a function that splits each line in a PCollection of Strings into // a PCollection made up of the individual words in the file. // The second argument sets the serialization format. PCollection words = lines.parallelDo(new Tokenizer(), Writables.strings()); // Take the collection of words and remove known stop words. PCollection noStopWords = words.filter(new StopWordFilter()); // The count method applies a series of Crunch primitives and returns // a map of the unique words in the input PCollection to their counts. PTable counts = noStopWords.count(); // Instruct the pipeline to write the resulting counts to a text file. pipeline.writeTextFile(counts, outputPath); // Execute the pipeline as a MapReduce. PipelineResult result = pipeline.done(); return result.succeeded() ? 0 : 1; }

10. Hadoop Launch Configuration

The data pipeline is thus ready.

However, we need the code to launch it. Therefore, let's write the main method to launch the application:

public class WordCount extends Configured implements Tool { public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new WordCount(), args); }

ToolRunner.run parses the Hadoop configuration from the command line and executes the MapReduce job.

11. Run Application

The complete application is now ready. Let's run the following command to build it:

mvn package 

As a result of the above command, we get the packaged application and a special job jar in the target directory.

Let's use this job jar to execute the application on Hadoop:

hadoop jar target/crunch-1.0-SNAPSHOT-job.jar 

The application reads the input file and writes the result to the output file. The output file contains unique words along with their counts similar to the following:

[Add,1] [Added,1] [Admiration,1] [Admitting,1] [Allowance,1]

In addition to Hadoop, we can run the application within IDE, as a stand-alone application or as unit tests.

12. Conclusion

In this tutorial, we created a data processing application running on MapReduce. Apache Crunch makes it easy to write, test and execute MapReduce pipelines in Java.

As usual, the full source code can be found over on Github.