Einführung in Netflix Mantis

1. Übersicht

In diesem Artikel werfen wir einen Blick auf die von Netflix entwickelte Mantis-Plattform.

Wir werden die wichtigsten Mantis-Konzepte untersuchen, indem wir einen Stream-Verarbeitungsjob erstellen, ausführen und untersuchen.

2. Was ist Mantis?

Mantis ist eine Plattform zum Erstellen von Stream-Processing-Anwendungen (Jobs). Es bietet eine einfache Möglichkeit , die Bereitstellung und den Lebenszyklus von Jobs zu verwalten. Darüber hinaus erleichtert es die Ressourcenzuweisung, Ermittlung und Kommunikation zwischen diesen Jobs.

Daher können sich Entwickler auf die eigentliche Geschäftslogik konzentrieren und gleichzeitig die Unterstützung einer robusten und skalierbaren Plattform erhalten, um ihre nicht blockierenden Anwendungen mit hohem Volumen und geringer Latenz auszuführen.

Ein Mantis-Job besteht aus drei Teilen:

  • die Quelle , die für das Abrufen der Daten von einer externen Quelle verantwortlich ist
  • eine oder mehrere Stufen , die für die Verarbeitung der eingehenden Ereignisströme verantwortlich sind
  • und eine Senke , die die verarbeiteten Daten sammelt

Lassen Sie uns nun jeden von ihnen untersuchen.

3. Setup und Abhängigkeiten

Beginnen wir mit dem Hinzufügen der Abhängigkeiten von mantis- runtime und jackson-databaseind :

 io.mantisrx mantis-runtime   com.fasterxml.jackson.core jackson-databind 

Um die Datenquelle unseres Jobs einzurichten, implementieren wir nun die Mantis Source- Schnittstelle:

public class RandomLogSource implements Source { @Override public Observable
    
      call(Context context, Index index) { return Observable.just( Observable .interval(250, TimeUnit.MILLISECONDS) .map(this::createRandomLogEvent)); } private String createRandomLogEvent(Long tick) { // generate a random log entry string ... } }
    

Wie wir sehen können, werden einfach mehrmals pro Sekunde zufällige Protokolleinträge generiert.

4. Unser erster Job

Erstellen wir jetzt einen Mantis-Job, der einfach Protokollereignisse aus unserer RandomLogSource sammelt . Später werden wir Gruppen- und Aggregationstransformationen hinzufügen, um ein komplexeres und interessanteres Ergebnis zu erzielen.

Lassen Sie uns zunächst eine LogEvent- Entität erstellen :

public class LogEvent implements JsonType { private Long index; private String level; private String message; // ... }

Fügen wir dann unsere TransformLogStage hinzu.

Es ist eine einfache Phase, in der die ScalarComputation-Schnittstelle implementiert und ein Protokolleintrag aufgeteilt wird, um ein LogEvent zu erstellen . Außerdem werden falsch formatierte Zeichenfolgen herausgefiltert:

public class TransformLogStage implements ScalarComputation { @Override public Observable call(Context context, Observable logEntry) { return logEntry .map(log -> log.split("#")) .filter(parts -> parts.length == 3) .map(LogEvent::new); } }

4.1. Ausführen des Jobs

Zu diesem Zeitpunkt haben wir genug Bausteine, um unseren Mantis-Job zusammenzustellen:

public class LogCollectingJob extends MantisJobProvider { @Override public Job getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), new ScalarToScalar.Config()) .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString))) .metadata(new Metadata.Builder().build()) .create(); } }

Schauen wir uns unseren Job genauer an.

Wie wir sehen können, erweitert es MantisJobProvider. Zunächst werden Daten aus unserer RandomLogSource abgerufen und die TransformLogStage auf die abgerufenen Daten angewendet . Schließlich sendet es die verarbeiteten Daten an die eingebaute Senke, die eifrig Daten über SSE abonniert und liefert.

Jetzt konfigurieren wir unseren Job so, dass er beim Start lokal ausgeführt wird:

@SpringBootApplication public class MantisApplication implements CommandLineRunner { // ... @Override public void run(String... args) { LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance()); } }

Lassen Sie uns die Anwendung ausführen. Wir sehen eine Protokollnachricht wie:

... Serving modern HTTP SSE server sink on port: 86XX

Lassen Sie uns nun mit Curl eine Verbindung zum Waschbecken herstellen :

$ curl localhost:86XX data: {"index":86,"level":"WARN","message":"login attempt"} data: {"index":87,"level":"ERROR","message":"user created"} data: {"index":88,"level":"INFO","message":"user created"} data: {"index":89,"level":"INFO","message":"login attempt"} data: {"index":90,"level":"INFO","message":"user created"} data: {"index":91,"level":"ERROR","message":"user created"} data: {"index":92,"level":"WARN","message":"login attempt"} data: {"index":93,"level":"INFO","message":"user created"} ...

4.2. Waschbecken konfigurieren

Bisher haben wir die eingebaute Spüle zum Sammeln unserer verarbeiteten Daten verwendet. Mal sehen, ob wir unserem Szenario mehr Flexibilität verleihen können, indem wir eine benutzerdefinierte Senke bereitstellen.

Was ist, wenn wir beispielsweise Protokolle nach Nachrichten filtern möchten ?

Erstellen wir einen LogSink , der die Sink- Schnittstelle implementiert :

public class LogSink implements Sink { @Override public void call(Context context, PortRequest portRequest, Observable logEventObservable) { SelfDocumentingSink sink = new ServerSentEventsSink.Builder() .withEncoder(LogEvent::toJsonString) .withPredicate(filterByLogMessage()) .build(); logEventObservable.subscribe(); sink.call(context, portRequest, logEventObservable); } private Predicate filterByLogMessage() { return new Predicate("filter by message", parameters -> { if (parameters != null && parameters.containsKey("filter")) { return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0)); } return logEvent -> true; }); } }

In dieser Senkenimplementierung haben wir ein Prädikat konfiguriert, das den Filterparameter verwendet , um nur Protokolle abzurufen, die den im Filterparameter festgelegten Text enthalten :

$ curl localhost:8874?filter=login data: {"index":93,"level":"ERROR","message":"login attempt"} data: {"index":95,"level":"INFO","message":"login attempt"} data: {"index":97,"level":"ERROR","message":"login attempt"} ...

Hinweis Mantis bietet auch eine leistungsstarke Abfragesprache, MQL, die zum Abfragen, Transformieren und Analysieren von Stream-Daten auf SQL-Weise verwendet werden kann.

5. Bühnenverkettung

Lassen Sie uns nun an , wir wissen , wie viele interessiert sind ERROR , WARN oder INFO - Protokolleinträge haben wir in einem gegebenen Zeitintervall. Dazu fügen wir unserem Job zwei weitere Stufen hinzu und verketten sie.

5.1. Gruppierung

Zunächst erstellen wir eine GroupLogStage.

This stage is a ToGroupComputation implementation that receives a LogEvent stream data from the existing TransformLogStage. After that, it groups entries by logging level and sends them to the next stage:

public class GroupLogStage implements ToGroupComputation { @Override public Observable
    
      call(Context context, Observable logEvent) { return logEvent.map(log -> new MantisGroup(log.getLevel(), log)); } public static ScalarToGroup.Config config(){ return new ScalarToGroup.Config() .description("Group event data by level") .codec(JacksonCodecs.pojo(LogEvent.class)) .concurrentInput(); } }
    

We've also created a custom stage config by providing a description, the codec to use for serializing the output, and allowed this stage's call method to run concurrently by using concurrentInput().

One thing to note is that this stage is horizontally scalable. Meaning we can run as many instances of this stage as needed. Also worth mentioning, when deployed in a Mantis cluster, this stage sends data to the next stage so that all events belonging to a particular group will land on the same worker of the next stage.

5.2. Aggregating

Before we move on and create the next stage, let's first add a LogAggregate entity:

public class LogAggregate implements JsonType { private final Integer count; private final String level; }

Now, let's create the last stage in the chain.

This stage implements GroupToScalarComputation and transforms a stream of log groups to a scalar LogAggregate. It does this by counting how many times each type of log appears in the stream. In addition, it also has a LogAggregationDuration parameter, which can be used to control the size of the aggregation window:

public class CountLogStage implements GroupToScalarComputation { private int duration; @Override public void init(Context context) { duration = (int)context.getParameters().get("LogAggregationDuration", 1000); } @Override public Observable call(Context context, Observable
    
      mantisGroup) { return mantisGroup .window(duration, TimeUnit.MILLISECONDS) .flatMap(o -> o.groupBy(MantisGroup::getKeyValue) .flatMap(group -> group.reduce(0, (count, value) -> count = count + 1) .map((count) -> new LogAggregate(count, group.getKey())) )); } public static GroupToScalar.Config config(){ return new GroupToScalar.Config() .description("sum events for a log level") .codec(JacksonCodecs.pojo(LogAggregate.class)) .withParameters(getParameters()); } public static List
     
       getParameters() { List
      
        params = new ArrayList(); params.add(new IntParameter() .name("LogAggregationDuration") .description("window size for aggregation in milliseconds") .validator(Validators.range(100, 10000)) .defaultValue(5000) .build()); return params; } }
      
     
    

5.3. Configure and Run the Job

The only thing left to do now is to configure our job:

public class LogAggregationJob extends MantisJobProvider { @Override public Job getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), TransformLogStage.stageConfig()) .stage(new GroupLogStage(), GroupLogStage.config()) .stage(new CountLogStage(), CountLogStage.config()) .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString))) .metadata(new Metadata.Builder().build()) .create(); } }

As soon as we run the application and execute our new job, we can see the log counts being retrieved every few seconds:

$ curl localhost:8133 data: {"count":3,"level":"ERROR"} data: {"count":13,"level":"INFO"} data: {"count":4,"level":"WARN"} data: {"count":8,"level":"ERROR"} data: {"count":5,"level":"INFO"} data: {"count":7,"level":"WARN"} ...

6. Conclusion

To sum up, in this article, we've seen what Netflix Mantis is and what it can be used for. Furthermore, we looked at the main concepts, used them to build jobs, and explored custom configurations for different scenarios.

Wie immer ist der vollständige Code auf GitHub verfügbar.