Anleitung zu Stream.reduce ()

1. Übersicht

Die Stream-API bietet ein umfangreiches Repertoire an Zwischen-, Reduktions- und Terminalfunktionen, die auch die Parallelisierung unterstützen.

Insbesondere ermöglichen es Reduktionsstromoperationen, ein einzelnes Ergebnis aus einer Folge von Elementen zu erzeugen , indem wiederholt eine Kombinationsoperation auf die Elemente in der Folge angewendet wird.

In diesem Tutorial sehen wir uns die allgemeine Operation Stream.reduce () an und sehen sie in einigen konkreten Anwendungsfällen.

2. Die Schlüsselkonzepte: Identität, Akkumulator und Kombinierer

Bevor wir uns eingehender mit der Verwendung der Operation Stream.reduce () befassen, teilen wir die Teilnehmerelemente der Operation in separate Blöcke auf. Auf diese Weise werden wir die Rolle, die jeder spielt, leichter verstehen:

  • Identität - Ein Element, das der Anfangswert der Reduktionsoperation und das Standardergebnis ist, wenn der Stream leer ist
  • Akkumulator - eine Funktion, die zwei Parameter akzeptiert: ein Teilergebnis der Reduktionsoperation und das nächste Element des Streams
  • Kombinierer - Eine Funktion, die verwendet wird, um das Teilergebnis der Reduktionsoperation zu kombinieren, wenn die Reduktion parallelisiert ist oder wenn eine Nichtübereinstimmung zwischen den Typen der Akkumulatorargumente und den Typen der Akkumulatorimplementierung besteht

3. Verwenden von Stream.reduce ()

Schauen wir uns einige grundlegende Beispiele an, um die Funktionalität der Identitäts-, Akkumulator- und Kombiniererelemente besser zu verstehen:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int result = numbers .stream() .reduce(0, (subtotal, element) -> subtotal + element); assertThat(result).isEqualTo(21);

In diesem Fall ist der Integer- Wert 0 die Identität. Es speichert den Anfangswert der Reduktionsoperation und auch das Standardergebnis, wenn der Strom von Ganzzahlwerten leer ist.

Ebenso der Lambda-Ausdruck :

subtotal, element -> subtotal + element

ist der Akkumulator , da er die Teilsumme der Integer- Werte und das nächste Element im Stream verwendet.

Um den Code noch präziser zu gestalten, können wir anstelle eines Lambda-Ausdrucks eine Methodenreferenz verwenden:

int result = numbers.stream().reduce(0, Integer::sum); assertThat(result).isEqualTo(21);

Natürlich können wir eine redu () - Operation für Streams verwenden, die andere Arten von Elementen enthalten.

Zum Beispiel können wir redu () für ein Array von String- Elementen verwenden und sie zu einem einzigen Ergebnis zusammenfügen:

List letters = Arrays.asList("a", "b", "c", "d", "e"); String result = letters .stream() .reduce("", (partialString, element) -> partialString + element); assertThat(result).isEqualTo("abcde");

Ebenso können wir zu der Version wechseln, die eine Methodenreferenz verwendet:

String result = letters.stream().reduce("", String::concat); assertThat(result).isEqualTo("abcde");

Verwenden Sie die Operation redu () , um die Großbuchstaben des Buchstabenarrays zu verbinden :

String result = letters .stream() .reduce( "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase()); assertThat(result).isEqualTo("ABCDE");

Darüber hinaus können wir redu () in einem parallelisierten Stream verwenden (dazu später mehr):

List ages = Arrays.asList(25, 30, 45, 28, 32); int computedAges = ages.parallelStream().reduce(0, a, b -> a + b, Integer::sum);

Wenn ein Stream parallel ausgeführt wird, teilt die Java-Laufzeit den Stream in mehrere Teilströme auf. In solchen Fällen müssen wir eine Funktion verwenden, um die Ergebnisse der Teilströme zu einem einzigen zu kombinieren . Dies ist die Rolle des Kombinierers - im obigen Snippet ist es die Integer :: sum- Methodenreferenz.

Komischerweise wird dieser Code nicht kompiliert:

List users = Arrays.asList(new User("John", 30), new User("Julie", 35)); int computedAges = users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge()); 

In diesem Fall haben wir einen Stream von Benutzerobjekten , und die Typen der Akkumulatorargumente sind Integer und User. Die Akkumulatorimplementierung ist jedoch eine Summe von Ganzzahlen, sodass der Compiler den Typ des Benutzerparameters einfach nicht ableiten kann.

Wir können dieses Problem mithilfe eines Kombinierers beheben:

int result = users.stream() .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); assertThat(result).isEqualTo(65);

Um es einfach auszudrücken: Wenn wir sequentielle Streams verwenden und die Typen der Akkumulatorargumente und die Typen ihrer Implementierung übereinstimmen, müssen wir keinen Kombinierer verwenden .

4. Parallel reduzieren

Wie wir zuvor gelernt haben, können wir reduct () für parallelisierte Streams verwenden.

Wenn wir parallelisierte Streams verwenden, sollten wir sicherstellen, dass redu () oder andere aggregierte Operationen, die für die Streams ausgeführt werden, folgende sind:

  • assoziativ : Das Ergebnis wird durch die Reihenfolge der Operanden nicht beeinflusst
  • Nicht störend : Der Vorgang wirkt sich nicht auf die Datenquelle aus
  • zustandslos und deterministisch : Die Operation hat keinen Status und erzeugt für eine bestimmte Eingabe dieselbe Ausgabe

Wir sollten alle diese Bedingungen erfüllen, um unvorhersehbare Ergebnisse zu vermeiden.

Wie erwartet werden Operationen, die an parallelisierten Streams ausgeführt werden, einschließlich redu (), parallel ausgeführt, wodurch Mehrkern-Hardwarearchitekturen genutzt werden.

Aus offensichtlichen Gründen sind parallelisierte Streams viel leistungsfähiger als die sequentiellen Gegenstücke . Trotzdem können sie übertrieben sein, wenn die auf den Stream angewendeten Vorgänge nicht teuer sind oder die Anzahl der Elemente im Stream gering ist.

Parallelisierte Streams sind natürlich der richtige Weg, wenn wir mit großen Streams arbeiten und teure Aggregatoperationen ausführen müssen.

Erstellen wir einen einfachen JMH-Benchmark-Test (Java Microbenchmark Harness) und vergleichen Sie die jeweiligen Ausführungszeiten, wenn Sie die Operation redu () für einen sequentiellen und einen parallelisierten Stream verwenden:

@State(Scope.Thread) private final List userList = createUsers(); @Benchmark public Integer executeReduceOnParallelizedStream() { return this.userList .parallelStream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } @Benchmark public Integer executeReduceOnSequentialStream() { return this.userList .stream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } 

In the above JMH benchmark, we compare execution average times. We simply create a List containing a large number of User objects. Next, we call reduce() on a sequential and a parallelized stream and check that the latter performs faster than the former (in seconds-per-operation).

These are our benchmark results:

Benchmark Mode Cnt Score Error Units JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op

5. Throwing and Handling Exceptions While Reducing

In the above examples, the reduce() operation doesn't throw any exceptions. But it might, of course.

For instance, say that we need to divide all the elements of a stream by a supplied factor and then sum them:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int divider = 2; int result = numbers.stream().reduce(0, a / divider + b / divider); 

This will work, as long as the divider variable is not zero. But if it is zero, reduce() will throw an ArithmeticException exception: divide by zero.

We can easily catch the exception and do something useful with it, such as logging it, recovering from it and so forth, depending on the use case, by using a try/catch block:

public static int divideListElements(List values, int divider) { return values.stream() .reduce(0, (a, b) -> { try { return a / divider + b / divider; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return 0; }); }

While this approach will work, we polluted the lambda expression with the try/catch block. We no longer have the clean one-liner that we had before.

To fix this issue, we can use the extract function refactoring technique, and extract the try/catch block into a separate method:

private static int divide(int value, int factor) { int result = 0; try { result = value / factor; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return result } 

Now, the implementation of the divideListElements() method is again clean and streamlined:

public static int divideListElements(List values, int divider) { return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider)); } 

Assuming that divideListElements() is a utility method implemented by an abstract NumberUtils class, we can create a unit test to check the behavior of the divideListElements() method:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Let's also test the divideListElements() method, when the supplied List of Integer values contains a 0:

List numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21); 

Finally, let's test the method implementation when the divider is 0, too:

List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);

6. Complex Custom Objects

We can also use Stream.reduce() with custom objects that contain non-primitive fields. To do so, we need to provide a relevant identity, accumulator, and combiner for the data type.

Suppose our User is part of a review website. Each of our Users can possess one Rating, which is averaged over many Reviews.

First, let's start with our Review object. Each Review should contain a simple comment and score:

public class Review { private int points; private String review; // constructor, getters and setters }

Next, we need to define our Rating, which will hold our reviews alongside a points field. As we add more reviews, this field will increase or decrease accordingly:

public class Rating { double points; List reviews = new ArrayList(); public void add(Review review) { reviews.add(review); computeRating(); } private double computeRating() { double totalPoints = reviews.stream().map(Review::getPoints).reduce(0, Integer::sum); this.points = totalPoints / reviews.size(); return this.points; } public static Rating average(Rating r1, Rating r2) { Rating combined = new Rating(); combined.reviews = new ArrayList(r1.reviews); combined.reviews.addAll(r2.reviews); combined.computeRating(); return combined; } }

We have also added an average function to compute an average based on the two input Ratings. This will work nicely for our combiner and accumulator components.

Next, let's define a list of Users, each with their own sets of reviews.

User john = new User("John", 30); john.getRating().add(new Review(5, "")); john.getRating().add(new Review(3, "not bad")); User julie = new User("Julie", 35); john.getRating().add(new Review(4, "great!")); john.getRating().add(new Review(2, "terrible experience")); john.getRating().add(new Review(4, "")); List users = Arrays.asList(john, julie); 

Nachdem John und Julie berücksichtigt wurden, verwenden wir Stream.reduce () , um eine durchschnittliche Bewertung für beide Benutzer zu berechnen. Lassen Sie uns als Identität eine neue Bewertung zurückgeben, wenn unsere Eingabeliste leer ist :

Rating averageRating = users.stream() .reduce(new Rating(), (rating, user) -> Rating.average(rating, user.getRating()), Rating::average);

Wenn wir rechnen, sollten wir feststellen, dass die durchschnittliche Punktzahl 3,6 beträgt:

assertThat(averageRating.getPoints()).isEqualTo(3.6);

7. Fazit

In diesem Tutorial haben wir gelernt, wie man die Operation Stream.reduce () verwendet. Darüber hinaus haben wir gelernt, wie Reduzierungen für sequentielle und parallelisierte Streams durchgeführt werden und wie Ausnahmen beim Reduzieren behandelt werden .

Wie üblich sind alle in diesem Tutorial gezeigten Codebeispiele auf GitHub verfügbar.