Apache Spark: Unterschiede zwischen Dataframes, Datasets und RDDs

1. Übersicht

Apache Spark ist ein schnelles, verteiltes Datenverarbeitungssystem. Es verarbeitet In-Memory-Daten und verwendet In-Memory-Caching und optimierte Ausführung, was zu einer schnellen Leistung führt. Es bietet High-Level-APIs für beliebte Programmiersprachen wie Scala, Python, Java und R.

In diesem kurzen Tutorial werden drei der grundlegenden Spark-Konzepte erläutert: Datenrahmen, Datensätze und RDDs.

2. DataFrame

Spark SQL hat seit Spark 1.3 eine tabellarische Datenabstraktion namens DataFrame eingeführt. Seitdem ist es eines der wichtigsten Features in Spark geworden. Diese API ist nützlich, wenn wir strukturierte und halbstrukturierte verteilte Daten verarbeiten möchten.

In Abschnitt 3 werden Resilient Distributed Datasets (RDD) erläutert. DataFrames speichern Daten effizienter als RDDs. Dies liegt daran, dass sie die unveränderlichen, speicherinternen, ausfallsicheren, verteilten und parallelen Funktionen von RDDs verwenden, aber auch ein Schema auf die Daten anwenden. DataFrames übersetzt auch SQL-Code in optimierte RDD-Operationen auf niedriger Ebene.

Wir können DataFrames auf drei Arten erstellen:

  • Konvertieren vorhandener RDDs
  • Ausführen von SQL-Abfragen
  • Externe Daten laden

Das Spark-Team hat SparkSession in Version 2.0 eingeführt. Es vereint alle unterschiedlichen Kontexte und stellt sicher, dass Entwickler sich nicht um die Erstellung unterschiedlicher Kontexte kümmern müssen:

SparkSession session = SparkSession.builder() .appName("TouristDataFrameExample") .master("local[*]") .getOrCreate(); DataFrameReader dataFrameReader = session.read();

Wir werden die Tourist.csv- Datei analysieren :

Dataset data = dataFrameReader.option("header", "true") .csv("data/Tourist.csv");

Da Spark 2.0 DataFrame zu einem Dataset vom Typ Row wurde , können wir einen DataFrame als Alias ​​für einen Dataset verwenden .

Wir können bestimmte Spalten auswählen, an denen wir interessiert sind. Wir können auch nach einer bestimmten Spalte filtern und gruppieren:

data.select(col("country"), col("year"), col("value")) .show(); data.filter(col("country").equalTo("Mexico")) .show(); data.groupBy(col("country")) .count() .show();

3. Datensätze

Ein Datensatz besteht aus stark typisierten, strukturierten Daten . Sie bieten den bekannten objektorientierten Programmierstil sowie die Vorteile der Typensicherheit, da Datasets die Syntax überprüfen und Fehler beim Kompilieren abfangen können.

Dataset ist eine Erweiterung von DataFrame. Daher können wir einen DataFrame als untypisierte Ansicht eines Datasets betrachten.

Das Spark-Team veröffentlichte die Dataset- API in Spark 1.6 und wie bereits erwähnt: „Das Ziel von Spark-Datasets besteht darin, eine API bereitzustellen, mit der Benutzer Transformationen in Objektdomänen einfach ausdrücken und gleichzeitig die Leistungs- und Robustheitsvorteile der Spark SQL-Ausführung bieten können Motor".

Zuerst müssen wir eine Klasse vom Typ TouristData erstellen :

public class TouristData { private String region; private String country; private String year; private String series; private Double value; private String footnotes; private String source; // ... getters and setters }

Um jeden unserer Datensätze dem angegebenen Typ zuzuordnen, benötigen wir einen Encoder. Encoder übersetzen zwischen Java-Objekten und dem internen Binärformat von Spark :

// SparkSession initialization and data load Dataset responseWithSelectedColumns = data.select(col("region"), col("country"), col("year"), col("series"), col("value").cast("double"), col("footnotes"), col("source")); Dataset typedDataset = responseWithSelectedColumns .as(Encoders.bean(TouristData.class));

Wie bei DataFrame können wir nach bestimmten Spalten filtern und gruppieren:

typedDataset.filter((FilterFunction) record -> record.getCountry() .equals("Norway")) .show(); typedDataset.groupBy(typedDataset.col("country")) .count() .show();

Wir können auch Operationen wie Filtern nach Spalte ausführen, die einem bestimmten Bereich entsprechen, oder die Summe einer bestimmten Spalte berechnen, um den Gesamtwert davon zu erhalten:

typedDataset.filter((FilterFunction) record -> record.getYear() != null && (Long.valueOf(record.getYear()) > 2010 && Long.valueOf(record.getYear())  record.getValue() != null && record.getSeries() .contains("expenditure")) .groupBy("country") .agg(sum("value")) .show();

4. RDDs

Das Resilient Distributed Dataset oder RDD ist die primäre Programmierabstraktion von Spark. Es stellt eine Sammlung von Elementen dar, die unveränderlich, belastbar und verteilt sind .

Eine RDD kapselt einen großen Datensatz. Spark verteilt die in RDDs enthaltenen Daten automatisch auf unseren Cluster und parallelisiert die Operationen, die wir an ihnen ausführen .

Wir können RDDs nur durch Operationen von Daten in stabilem Speicher oder durch Operationen auf anderen RDDs erstellen.

Fehlertoleranz ist wichtig, wenn wir mit großen Datenmengen arbeiten und die Daten auf Cluster-Computern verteilt werden. RDDs sind aufgrund der in Spark integrierten Fehlerbehebungsmechanik belastbar. Spark stützt sich auf die Tatsache, dass RDDs sich merken, wie sie erstellt wurden, damit wir die Linie leicht zurückverfolgen können, um die Partition wiederherzustellen .

Es gibt zwei Arten von Operationen, die wir für RDDs ausführen können: Transformationen und Aktionen .

4.1. Transformationen

Wir können Transformationen auf eine RDD anwenden, um ihre Daten zu bearbeiten. Nachdem diese Manipulation durchgeführt wurde, erhalten wir eine brandneue RDD, da RDDs unveränderliche Objekte sind .

Wir werden überprüfen, wie Map und Filter implementiert werden, zwei der häufigsten Transformationen.

Zuerst müssen wir eine erstellen JavaSparkContext und laden die Daten als RDD von der Tourist.csv - Datei:

SparkConf conf = new SparkConf().setAppName("uppercaseCountries") .setMaster("local[*]"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD tourists = sc.textFile("data/Tourist.csv");

Als nächstes wenden wir die Kartenfunktion an, um den Namen des Landes aus jedem Datensatz abzurufen und den Namen in Großbuchstaben umzuwandeln. Wir können diesen neu generierten Datensatz als Textdatei auf der Festplatte speichern:

JavaRDD upperCaseCountries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1].toUpperCase(); }).distinct(); upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

If we want to select only a specific country, we can apply the filter function on our original tourists RDD:

JavaRDD touristsInMexico = tourists .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico")); touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

4.2. Actions

Actions will return a final value or save the results to disc, after doing some computation on the data.

Two of the recurrently used actions in Spark are Count and Reduce.

Let's count the total countries on our CSV file:

// Spark Context initialization and data load JavaRDD countries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1]; }).distinct(); Long numberOfCountries = countries.count();

Now, we'll calculate the total expenditure by country. We'll need to filter the records containing expenditure in their description.

Instead of using a JavaRDD, we'll use a JavaPairRDD. A pair of RDD is a type of RDD that can store key-value pairs. Let's check it next:

JavaRDD touristsExpenditure = tourists .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure")); JavaPairRDD expenditurePairRdd = touristsExpenditure .mapToPair(line -> { String[] columns = line.split(COMMA_DELIMITER); return new Tuple2(columns[1], Double.valueOf(columns[6])); }); List
    
      totalByCountry = expenditurePairRdd .reduceByKey((x, y) -> x + y) .collect();
    

5. Conclusion

Zusammenfassend sollten wir DataFrames oder Datasets verwenden, wenn wir domänenspezifische APIs benötigen. Wir benötigen allgemeine Ausdrücke wie Aggregation, Summe oder SQL-Abfragen. Oder wenn wir zur Kompilierungszeit Typensicherheit wünschen.

Auf der anderen Seite sollten wir RDDs verwenden, wenn Daten unstrukturiert sind und wir kein bestimmtes Schema implementieren müssen oder wenn wir Transformationen und Aktionen auf niedriger Ebene benötigen.

Wie immer sind alle Codebeispiele auf GitHub verfügbar.