Einführung in Apache Storm

1. Übersicht

Dieses Tutorial ist eine Einführung in Apache Storm, ein verteiltes Echtzeit-Rechensystem.

Wir konzentrieren uns auf und behandeln:

  • Was genau ist Apache Storm und welche Probleme löst es?
  • Seine Architektur und
  • Wie man es in einem Projekt verwendet

2. Was ist Apache Storm?

Apache Storm ist ein kostenloses und Open Source verteiltes System für Echtzeitberechnungen.

Es bietet Fehlertoleranz, Skalierbarkeit und garantiert Datenverarbeitung und eignet sich besonders gut für die Verarbeitung unbegrenzter Datenströme.

Einige gute Anwendungsfälle für Storm können die Verarbeitung von Kreditkartenvorgängen zur Betrugserkennung oder die Verarbeitung von Daten aus Smart Homes zur Erkennung fehlerhafter Sensoren sein.

Storm ermöglicht die Integration in verschiedene auf dem Markt verfügbare Datenbanken und Warteschlangensysteme.

3. Maven-Abhängigkeit

Bevor wir Apache Storm verwenden, müssen wir die Storm-Core-Abhängigkeit in unser Projekt aufnehmen:

 org.apache.storm storm-core 1.2.2 provided 

Wir sollten den bereitgestellten Bereich nur verwenden, wenn wir beabsichtigen, unsere Anwendung auf dem Storm-Cluster auszuführen.

Um die Anwendung lokal auszuführen, können wir einen sogenannten lokalen Modus verwenden, der den Storm-Cluster in einem lokalen Prozess simuliert. In diesem Fall sollten wir den bereitgestellten entfernen .

4. Datenmodell

Das Datenmodell von Apache Storm besteht aus zwei Elementen: Tupeln und Streams.

4.1. Tupel

Ein Tupel ist eine geordnete Liste benannter Felder mit dynamischen Typen. Dies bedeutet, dass wir die Feldtypen nicht explizit deklarieren müssen.

Storm muss wissen, wie alle in einem Tupel verwendeten Werte serialisiert werden. Standardmäßig können primitive Typen, Strings und Byte- Arrays bereits serialisiert werden .

Und da Storm die Kryo-Serialisierung verwendet, müssen wir den Serializer mithilfe von Config registrieren , um die benutzerdefinierten Typen verwenden zu können. Wir können dies auf zwei Arten tun:

Zunächst können wir die zu serialisierende Klasse mit ihrem vollständigen Namen registrieren:

Config config = new Config(); config.registerSerialization(User.class);

In diesem Fall serialisiert Kryo die Klasse mit FieldSerializer. Standardmäßig werden alle nicht vorübergehenden Felder der Klasse, sowohl private als auch öffentliche, serialisiert.

Stattdessen können wir sowohl die zu serialisierende Klasse als auch den Serializer bereitstellen, den Storm für diese Klasse verwenden soll:

Config config = new Config(); config.registerSerialization(User.class, UserSerializer.class);

Um den benutzerdefinierten Serializer zu erstellen, müssen wir die generische Klasse Serializer erweitern , die über zwei Methoden zum Schreiben und Lesen verfügt.

4.2. Strom

Ein Stream ist die Kernabstraktion im Storm-Ökosystem. Der Stream ist eine unbegrenzte Folge von Tupeln.

Storms ermöglicht die parallele Verarbeitung mehrerer Streams.

Jeder Stream hat eine ID, die während der Deklaration bereitgestellt und zugewiesen wird.

5. Topologie

Die Logik der Echtzeit-Storm-Anwendung ist in die Topologie gepackt. Die Topologie besteht aus Ausgüssen und Schrauben .

5.1. Tülle

Ausläufe sind die Quellen der Ströme. Sie senden Tupel an die Topologie.

Tupel können von verschiedenen externen Systemen wie Kafka, Kestrel oder ActiveMQ gelesen werden.

Ausläufe können zuverlässig oder unzuverlässig sein . Zuverlässig bedeutet, dass der Auslauf antworten kann, dass das Tupel, das nicht von Storm verarbeitet wurde. Unzuverlässig bedeutet, dass der Auslauf nicht antwortet, da er einen Feuer-und-Vergessen-Mechanismus verwendet, um die Tupel auszugeben.

Um den benutzerdefinierten Auslauf zu erstellen, müssen Sie die IRichSpout- Schnittstelle implementieren oder eine Klasse erweitern, die die Schnittstelle bereits implementiert, z. B. eine abstrakte BaseRichSpout- Klasse.

Erstellen wir einen unzuverlässigen Auslauf:

public class RandomIntSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector outputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); outputCollector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis())); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp")); } }

Unser benutzerdefinierter RandomIntSpout generiert jede Sekunde eine zufällige Ganzzahl und einen Zeitstempel.

5.2. Bolzen

Bolzen verarbeiten Tupel im Stream. Sie können verschiedene Vorgänge wie Filtern, Aggregationen oder benutzerdefinierte Funktionen ausführen.

Einige Operationen erfordern mehrere Schritte, und daher müssen wir in solchen Fällen mehrere Schrauben verwenden.

Um den benutzerdefinierten Bolzen zu erstellen , müssen wir IRichBolt oder für einfachere Operationen die IBasicBolt- Schnittstelle implementieren.

Es stehen auch mehrere Hilfsklassen zur Implementierung von Bolt zur Verfügung. In diesem Fall verwenden wir BaseBasicBolt :

public class PrintingBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { System.out.println(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }

Dieser benutzerdefinierte PrintingBolt druckt einfach alle Tupel auf die Konsole.

6. Erstellen einer einfachen Topologie

Lassen Sie uns diese Ideen in einer einfachen Topologie zusammenfassen. Unsere Topologie wird einen Auslauf und drei Schrauben haben.

6.1. RandomNumberSpout

Am Anfang erstellen wir einen unzuverlässigen Auslauf. Jede Sekunde werden zufällige Ganzzahlen aus dem Bereich (0,100) generiert:

public class RandomNumberSpout extends BaseRichSpout { private Random random; private SpoutOutputCollector collector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { random = new Random(); collector = spoutOutputCollector; } @Override public void nextTuple() { Utils.sleep(1000); int operation = random.nextInt(101); long timestamp = System.currentTimeMillis(); Values values = new Values(operation, timestamp); collector.emit(values); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }

6.2. FilteringBolt

Als Nächstes erstellen wir eine Schraube, die alle Elemente mit einer Operation von 0 herausfiltert :

public class FilteringBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { int operation = tuple.getIntegerByField("operation"); if (operation > 0) { basicOutputCollector.emit(tuple.getValues()); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("operation", "timestamp")); } }

6.3. AggregatingBolt

Als nächstes erstellen wir einen komplizierteren Bolzen , der alle positiven Operationen von jedem Tag zusammenfasst.

Zu diesem Zweck verwenden wir eine bestimmte Klasse, die speziell für die Implementierung von Schrauben erstellt wurde, die unter Windows anstatt unter einzelnen Tupeln ausgeführt werden: BaseWindowedBolt .

Windows are an essential concept in stream processing, splitting the infinite streams into finite chunks. We can then apply computations to each chunk. There are generally two types of windows:

Time windows are used to group elements from a given time period using timestamps. Time windows may have a different number of elements.

Count windows are used to create windows with a defined size. In such a case, all windows will have the same size and the window will not be emitted if there are fewer elements than the defined size.

Our AggregatingBolt will generate the sum of all positive operations from a time window along with its beginning and end timestamps:

public class AggregatingBolt extends BaseWindowedBolt { private OutputCollector outputCollector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.outputCollector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp")); } @Override public void execute(TupleWindow tupleWindow) { List tuples = tupleWindow.get(); tuples.sort(Comparator.comparing(this::getTimestamp)); int sumOfOperations = tuples.stream() .mapToInt(tuple -> tuple.getIntegerByField("operation")) .sum(); Long beginningTimestamp = getTimestamp(tuples.get(0)); Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1)); Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp); outputCollector.emit(values); } private Long getTimestamp(Tuple tuple) { return tuple.getLongByField("timestamp"); } }

Note that, in this case, getting the first element of the list directly is safe. That's because each window is calculated using the timestamp field of the Tuple, so there has to be at least one element in each window.

6.4. FileWritingBolt

Finally, we'll create a bolt that will take all elements with sumOfOperations greater than 2000, serialize them and write them to the file:

public class FileWritingBolt extends BaseRichBolt { public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class); private BufferedWriter writer; private String filePath; private ObjectMapper objectMapper; @Override public void cleanup() { try { writer.close(); } catch (IOException e) { logger.error("Failed to close writer!"); } } @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); try { writer = new BufferedWriter(new FileWriter(filePath)); } catch (IOException e) { logger.error("Failed to open a file for writing.", e); } } @Override public void execute(Tuple tuple) { int sumOfOperations = tuple.getIntegerByField("sumOfOperations"); long beginningTimestamp = tuple.getLongByField("beginningTimestamp"); long endTimestamp = tuple.getLongByField("endTimestamp"); if (sumOfOperations > 2000) { AggregatedWindow aggregatedWindow = new AggregatedWindow( sumOfOperations, beginningTimestamp, endTimestamp); try { writer.write(objectMapper.writeValueAsString(aggregatedWindow)); writer.newLine(); writer.flush(); } catch (IOException e) { logger.error("Failed to write data to file.", e); } } } // public constructor and other methods }

Note that we don't need to declare the output as this will be the last bolt in our topology

6.5. Running the Topology

Finally, we can pull everything together and run our topology:

public static void runTopology() { TopologyBuilder builder = new TopologyBuilder(); Spout random = new RandomNumberSpout(); builder.setSpout("randomNumberSpout"); Bolt filtering = new FilteringBolt(); builder.setBolt("filteringBolt", filtering) .shuffleGrouping("randomNumberSpout"); Bolt aggregating = new AggregatingBolt() .withTimestampField("timestamp") .withLag(BaseWindowedBolt.Duration.seconds(1)) .withWindow(BaseWindowedBolt.Duration.seconds(5)); builder.setBolt("aggregatingBolt", aggregating) .shuffleGrouping("filteringBolt");  String filePath = "./src/main/resources/data.txt"; Bolt file = new FileWritingBolt(filePath); builder.setBolt("fileBolt", file) .shuffleGrouping("aggregatingBolt"); Config config = new Config(); config.setDebug(false); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Test", config, builder.createTopology()); }

To make the data flow through each piece in the topology, we need to indicate how to connect them. shuffleGroup allows us to state that data for filteringBolt will be coming from randomNumberSpout.

For each Bolt, we need to add shuffleGroup which defines the source of elements for this bolt. The source of elements may be a Spout or another Bolt. And if we set the same source for more than one bolt, the source will emit all elements to each of them.

In this case, our topology will use the LocalCluster to run the job locally.

7. Conclusion

In diesem Tutorial haben wir Apache Storm vorgestellt, ein verteiltes Echtzeit-Rechensystem. Wir haben einen Auslauf und einige Schrauben erstellt und diese zu einer vollständigen Topologie zusammengezogen.

Und wie immer finden Sie alle Codebeispiele auf GitHub.