Spring Cloud Datenfluss mit Apache Spark

1. Einleitung

Spring Cloud Data Flow ist ein Toolkit zum Erstellen von Datenintegrations- und Echtzeit-Datenverarbeitungs-Pipelines.

Pipelines sind in diesem Fall Spring Boot-Anwendungen, die mithilfe von Spring Cloud Stream- oder Spring Cloud Task-Frameworks erstellt wurden.

In diesem Tutorial zeigen wir, wie Spring Cloud Data Flow mit Apache Spark verwendet wird.

2. Datenfluss Lokaler Server

Zunächst müssen wir den Datenflussserver ausführen, um unsere Jobs bereitstellen zu können.

Um den Datenflussserver lokal auszuführen, müssen Sie ein neues Projekt mit der lokalen Abhängigkeit von Spring-Cloud-Starter-Datenflussserver- lokal erstellen :

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 1.7.4.RELEASE 

Danach müssen wir die Hauptklasse auf dem Server mit @EnableDataFlowServer kommentieren :

@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } }

Sobald wir diese Anwendung ausführen, haben wir einen lokalen Datenflussserver an Port 9393.

3. Erstellen eines Projekts

Wir erstellen einen Spark-Job als eigenständige lokale Anwendung, sodass wir keinen Cluster benötigen, um ihn auszuführen.

3.1. Abhängigkeiten

Zuerst fügen wir die Spark-Abhängigkeit hinzu:

 org.apache.spark spark-core_2.10 2.4.0  

3.2. Einen Job erstellen

Und für unseren Job, approximieren wir pi:

public class PiApproximation { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation"); JavaSparkContext context = new JavaSparkContext(conf); int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2; int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices; List xs = IntStream.rangeClosed(0, n) .mapToObj(element -> Integer.valueOf(element)) .collect(Collectors.toList()); JavaRDD dataSet = context.parallelize(xs, slices); JavaRDD pointsInsideTheCircle = dataSet.map(integer -> { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; return (x * x + y * y )  integer + integer2); System.out.println("The pi was estimated as:" + count / n); context.stop(); } }

4. Datenfluss-Shell

Data Flow Shell ist eine Anwendung, mit der wir mit dem Server interagieren können . Shell verwendet die DSL-Befehle, um Datenflüsse zu beschreiben.

Um die Datenfluss-Shell verwenden zu können, müssen Sie ein Projekt erstellen, mit dem wir es ausführen können. Zunächst benötigen wir die Spring-Cloud-Dataflow-Shell- Abhängigkeit:

 org.springframework.cloud spring-cloud-dataflow-shell 1.7.4.RELEASE 

Nach dem Hinzufügen der Abhängigkeit können wir die Klasse erstellen, in der unsere Datenfluss-Shell ausgeführt wird:

@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } }

5. Bereitstellen des Projekts

Für die Bereitstellung unseres Projekts verwenden wir den sogenannten Task Runner, der für Apache Spark in drei Versionen verfügbar ist: Cluster , Garn und Client . Wir werden mit der lokalen Client- Version fortfahren .

Der Task Runner ist das, was unseren Spark-Job ausführt.

Dazu müssen wir zuerst unsere Aufgabe mithilfe der Datenfluss-Shell registrieren :

app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT 

Mit dieser Aufgabe können wir mehrere verschiedene Parameter angeben, von denen einige optional sind. Einige Parameter sind jedoch erforderlich, um den Spark-Job ordnungsgemäß bereitzustellen:

  • spark.app-class , die Hauptklasse unseres eingereichten Jobs
  • spark.app-jar , ein Weg zu dem fetten Glas, das unseren Job enthält
  • spark.app- name , der Name, der für unseren Job verwendet wird
  • spark.app-args , die Argumente, die an den Job übergeben werden

Wir können den registrierten Task- Spark-Client verwenden , um unseren Job einzureichen, wobei wir uns daran erinnern, die erforderlichen Parameter anzugeben:

task create spark1 --definition "spark-client \ --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \ --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"

Beachten Sie, dass spark.app- jar bei unserem Job der Weg zum Fat-jar ist.

Nach erfolgreicher Erstellung der Aufgabe können wir sie mit dem folgenden Befehl ausführen:

task launch spark1

Dies wird die Ausführung unserer Aufgabe aufrufen.

6. Zusammenfassung

In diesem Tutorial haben wir gezeigt, wie das Spring Cloud Data Flow-Framework zum Verarbeiten von Daten mit Apache Spark verwendet wird. Weitere Informationen zum Spring Cloud Data Flow Framework finden Sie in der Dokumentation.

Alle Codebeispiele finden Sie auf GitHub.