Einführung in die Spark-Graph-Verarbeitung mit GraphFrames

1. Einleitung

Die Grafikverarbeitung ist für viele Anwendungen von sozialen Netzwerken bis hin zu Werbung nützlich. In einem Big-Data-Szenario benötigen wir ein Tool, um diese Verarbeitungslast zu verteilen.

In diesem Tutorial werden wir Grafikmöglichkeiten mit Apache Spark in Java laden und untersuchen. Um komplexe Strukturen zu vermeiden, verwenden wir eine einfache und übergeordnete Apache Spark-Diagramm-API: die GraphFrames-API.

2. Grafiken

Definieren wir zunächst ein Diagramm und seine Komponenten. Ein Graph ist eine Datenstruktur mit Kanten und Eckpunkten. Die Kanten enthalten Informationen , die Beziehungen zwischen den Scheitelpunkten darstellen.

Die Scheitelpunkte sind Punkte in einem n- dimensionalen Raum, und Kanten verbinden die Scheitelpunkte gemäß ihren Beziehungen:

Im obigen Bild haben wir ein Beispiel für ein soziales Netzwerk. Wir können die durch Buchstaben dargestellten Eckpunkte und die Kanten sehen, die die Art der Beziehung zwischen den Eckpunkten tragen.

3. Maven Setup

Beginnen wir nun mit dem Projekt, indem wir die Maven-Konfiguration einrichten.

Fügen wir spark-graphx 2.11, graphframes und spark-sql 2.11 hinzu :

 org.apache.spark spark-graphx_2.11 2.4.4   graphframes graphframes 0.7.0-spark2.4-s_2.11   org.apache.spark spark-sql_2.11 2.4.4 

Diese Artefaktversionen unterstützen Scala 2.11.

Es kommt auch vor, dass sich GraphFrames nicht in Maven Central befindet. Fügen wir also auch das benötigte Maven-Repository hinzu:

  SparkPackagesRepo //dl.bintray.com/spark-packages/maven  

4. Spark-Konfiguration

Um mit GraphFrames arbeiten zu können, müssen wir Hadoop herunterladen und die Umgebungsvariable HADOOP_HOME definieren.

Im Fall von Windows als Betriebssystem laden wir auch die entsprechende winutils.exe in den Ordner HADOOP_HOME / bin herunter .

Als nächstes beginnen wir unseren Code mit der Erstellung der Grundkonfiguration:

SparkConf sparkConf = new SparkConf() .setAppName("SparkGraphFrames") .setMaster("local[*]"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

Wir müssen auch eine SparkSession erstellen :

SparkSession session = SparkSession.builder() .appName("SparkGraphFrameSample") .config("spark.sql.warehouse.dir", "/file:C:/temp") .sparkContext(javaSparkContext.sc()) .master("local[*]") .getOrCreate();

5. Graphkonstruktion

Jetzt können wir mit unserem Hauptcode beginnen. Definieren wir also die Entitäten für unsere Scheitelpunkte und Kanten und erstellen die GraphFrame- Instanz.

Wir werden an den Beziehungen zwischen Benutzern aus einem hypothetischen sozialen Netzwerk arbeiten.

5.1. Daten

In diesem Beispiel definieren wir zunächst beide Entitäten als Benutzer und Beziehung :

public class User { private Long id; private String name; // constructor, getters and setters } public class Relationship implements Serializable { private String type; private String src; private String dst; private UUID id; public Relationship(String type, String src, String dst) { this.type = type; this.src = src; this.dst = dst; this.id = UUID.randomUUID(); } // getters and setters }

Als Nächstes definieren wir einige Benutzer- und Beziehungsinstanzen :

List users = new ArrayList(); users.add(new User(1L, "John")); users.add(new User(2L, "Martin")); users.add(new User(3L, "Peter")); users.add(new User(4L, "Alicia")); List relationships = new ArrayList(); relationships.add(new Relationship("Friend", "1", "2")); relationships.add(new Relationship("Following", "1", "4")); relationships.add(new Relationship("Friend", "2", "4")); relationships.add(new Relationship("Relative", "3", "1")); relationships.add(new Relationship("Relative", "3", "4"));

5.2. GraphFrame- Instanz

Um nun unser Beziehungsdiagramm zu erstellen und zu bearbeiten , erstellen wir eine Instanz von GraphFrame . Der GraphFrame- Konstruktor erwartet zwei Dataset- Instanzen, von denen die erste die Eckpunkte und die zweite die Kanten darstellt:

Dataset userDataset = session.createDataFrame(users, User.class); Dataset relationshipDataset = session.createDataFrame(relationships, Relation.class); GraphFrame graph = new GraphFrame(userDataframe, relationshipDataframe);

Zuletzt werden wir unsere Eckpunkte und Kanten in der Konsole protokollieren, um zu sehen, wie es aussieht:

graph.vertices().show(); graph.edges().show();
+---+------+ | id| name| +---+------+ | 1| John| | 2|Martin| | 3| Peter| | 4|Alicia| +---+------+ +---+--------------------+---+---------+ |dst| id|src| type| +---+--------------------+---+---------+ | 2|622da83f-fb18-484...| 1| Friend| | 4|c6dde409-c89d-490...| 1|Following| | 4|360d06e1-4e9b-4ec...| 2| Friend| | 1|de5e738e-c958-4e0...| 3| Relative| | 4|d96b045a-6320-4a6...| 3| Relative| +---+--------------------+---+---------+

6. Grafikoperatoren

Nachdem wir nun eine GraphFrame- Instanz haben, wollen wir sehen, was wir damit machen können.

6.1. Filter

Mit GraphFrames können wir Kanten und Scheitelpunkte nach einer Abfrage filtern.

Als nächstes filtern wir die Scheitelpunkte nach der Eigenschaft name in User :

graph.vertices().filter("name = 'Martin'").show();

An der Konsole sehen wir das Ergebnis:

+---+------+ | id| name| +---+------+ | 2|Martin| +---+------+

Wir können auch direkt im Diagramm filtern, indem wir filterEdges oder filterVertices aufrufen :

graph.filterEdges("type = 'Friend'") .dropIsolatedVertices().vertices().show();

Da wir nun die Kanten gefiltert haben, haben wir möglicherweise noch einige isolierte Eckpunkte. Also rufen wir dropIsolatedVertices () auf.

Als Ergebnis haben wir einen Untergraphen, immer noch eine GraphFrame- Instanz, mit nur den Beziehungen, die den Status "Freund" haben:

+---+------+ | id| name| +---+------+ | 1| John| | 2|Martin| | 4|Alicia| +---+------+

6.2. Grad

Ein weiterer interessanter Funktionsumfang ist der Gradsatz der Operationen. Diese Operationen geben die Anzahl der Kanten zurück, die auf jeden Scheitelpunkt fallen.

The degrees operation just returns the count of all edges of each vertex. On the other hand, inDegrees counts only incoming edges, and outDegrees counts only outgoing edges.

Let's count the incoming degrees of all vertices in our graph:

graph.inDegrees().show();

As a result, we have a GraphFrame that shows the number of incoming edges to each vertex, excluding those with none:

+---+--------+ | id|inDegree| +---+--------+ | 1| 1| | 4| 3| | 2| 1| +---+--------+

7. Graph Algorithms

GraphFrames also provides popular algorithms ready to use — let's take a look at some of them.

7.1. Page Rank

The Page Rank algorithm weighs the incoming edges to a vertex and transforms it into a score.

The idea is that each incoming edge represents an endorsement and makes the vertex more relevant in the given graph.

For example, in a social network, if a person is followed by various people, he or she will be ranked highly.

Running the page rank algorithm is quite straightforward:

graph.pageRank() .maxIter(20) .resetProbability(0.15) .run() .vertices() .show();

To configure this algorithm, we just need to provide:

  • maxIter – the number of iterations of page rank to run – 20 is recommended, too few will decrease the quality, and too many will degrade the performance
  • resetProbability – the random reset probability (alpha) – the lower it is, the bigger the score spread between the winners and losers will be – valid ranges are from 0 to 1. Usually, 0.15 is a good score

The response is a similar GraphFrame, though this time we see an additional column giving the page rank of each vertex:

+---+------+------------------+ | id| name| pagerank| +---+------+------------------+ | 4|Alicia|1.9393230468864597| | 3| Peter|0.4848822786454427| | 1| John|0.7272991738542318| | 2|Martin| 0.848495500613866| +---+------+------------------+

In our graph, Alicia is the most relevant vertex, followed by Martin and John.

7.2. Connected Components

The connected components algorithm finds isolated clusters or isolated sub-graphs. These clusters are sets of connected vertices in a graph where each vertex is reachable from any other vertex in the same set.

We can call the algorithm without any parameters via the connectedComponents() method:

graph.connectedComponents().run().show();

The algorithm returns a GraphFrame containing each vertex and the component to which each is connected:

+---+------+------------+ | id| name| component| +---+------+------------+ | 1| John|154618822656| | 2|Martin|154618822656| | 3| Peter|154618822656| | 4|Alicia|154618822656| +---+------+------------+

Our graph has only one component — this means that we do not have isolated sub-graphs. The component has an auto-generated id, which is 154618822656, in our case.

Although we have one more column here – the component id – our graph is still the same.

7.3. Triangle Counting

Triangle counting is commonly used as community detection and counting in a social network graph. A triangle is a set of three vertices, where each vertex has a relationship to the other two vertices in the triangle.

In a social network community, it's easy to find a considerable number of triangles connected to each other.

We can easily perform a triangle counting directly from our GraphFrame instance:

graph.triangleCount().run().show();

The algorithm also returns a GraphFrame with the number of triangles passing through each vertex.

+-----+---+------+ |count| id| name| +-----+---+------+ | 1| 3| Peter| | 2| 1| John| | 2| 4|Alicia| | 1| 2|Martin| +-----+---+------+

8. Conclusion

Apache Spark is a great tool for computing a relevant amount of data in an optimized and distributed way. And, the GraphFrames library allows us to easily distribute graph operations over Spark.

Wie immer ist der vollständige Quellcode für das Beispiel auf GitHub verfügbar.