Avatar of Jan Seeger

Als Diskussionsgrundlage verwenden wir die Implementierung einer paginierten Datenbankabfrage sowohl via Stream als auch via Iterator, damit wir die grundlegenden Unterschiede sehen können. Bei Paginierung werden die Ergebnisse nicht auf einmal aus der Datenbank geholt, sondern „Stück für Stück“. Wir kennen dieses Vorgehen aus der Google-Suche, auch wenn ich normalerweise alle Ergebnisse nach Seite Eins ignoriere.

Paginierung hat den Vorteil, dass große Ergebnismengen verarbeitet werden können, ohne die Daten komplett in den Arbeitsspeicher zu laden. Nach dem Verarbeiten eines Teilstücks können die verarbeiteten Daten verworfen, und das nächste Teilstück verarbeitet werden. Als Beispiel sei folgende Tabelle angegeben, bei der wir die Summe der Werte bestimmen wollen (und das zum Wohle des Beispiels nicht in der Datenbank tun). Stellvertretend soll dies für kompliziertere Operationen stehen, die nicht einfach in der Datenbank ausgeführt werden können. Die folgende Tabelle zeigt ein paar Zeilen Datenbank für unsere Demozwecke. Wie durch die Punkte impliziert, wird die eigentliche Datenbank natürlich deutlich größer sein.

Tabelle 1: Beispieldatenbank
ID Wert
1 15
2 24
3 45
4 38

Unpaginiert würden wir die gesamte Datenbank abfragen, und die Werte addieren. Dies würde bedeuten, dass wir die komplette lange Liste im Hauptspeicher halten müssen. Bei einer paginierten Abfrage würden wir immer z.B. 10 Werte abfragen, summieren, und dann die nächsten 10 Werte abfragen, und so weiter. Bei einer paginierten Vorgehensweise müssen wir also nie alle Daten vorhalten, und können auch Datenmengen verarbeiten, die nicht in den Hauptspeicher passen. Natürlich geht mit diesem Vorgehen auch ein gewisser Overhead einher, weil ja wiederholt Abfragen an die Datenbank geschickt werden, und somit mehr Zeit für round trips zwischen Server und Datenbank verbraucht wird.

SELECT wert FROM t1 ORDER BY id OFFSET 0 LIMIT 10;
Listing 1

In SQL können wir mit den „LIMIT“ und „OFFSET“-Parametern paginieren. Wir fragen also erst die ersten 10 Werte ab („LIMIT 10 OFFSET 0“), dann die nächsten 10 („LIMIT 10 OFFSET 10“), und so weiter. Listing 1 zeigt das Query für unsere Beispieldatenbank. Wichtig ist hier, dass die Abfrage geordnet ist, da sonst die Reihenfolge der Ergebnisse nicht definiert ist, und wir ohne konsistente Reihenfolge keine korrekte Paginierung erhalten.

Es gibt bessere Wege zur Paginierung, diese sind jedoch komplizierter zu implementieren, und stärker vom verwendeten Datenbanksystem abhängig: Eine Paginierung via Cursor oder Keyset pagination wird meistens vorzuziehen sein, die Implementierung würde jedoch den Rahmen dieses Artikels sprengen. Das generelle Schema der Iteration bleibt jedoch erhalten, und die Implementierung einer paginierten Abfrage wäre ähnlich.

Implementierung der Paginierung

Das Ziel unserer Implementierung ist es, ein Query mit großer Ergebnismenge so auszuführen, dass der Speicherbedarf der Lösung konstant ist, d.h. zu keiner Zeit mehr als x Ergebnisse vorgehalten werden, egal wie groß die gesamte Ergebnismenge ist. Der Benutzer sollte von dieser Eigenschaft nichts merken, und sollte mit dem Ergebnis-Stream bzw. Iterator umgehen können wie mit einem „normalen“ Stream oder Iterator. Natürlich kann man dies von Hand implementieren, aber Java gibt einem mächtige APIs in die Hand, damit man dies nicht tun muss. Ab in den Code also!

Paginierung mit Streams

Zuerst betrachten wir die Streams-Implementierung, in der natürlich völlig unvoreingenommenen Ansicht des Autors die elegantere und kürzere Lösung.

List<Result> runQuery(int offset) {...}; // Dummy-Implementierung

Stream.iterate(0, (skip) -> skip + 10)
    .map(
         (offset) ->
         runQuery(offset))
    .takeWhile((result) -> result.size() > 0)
    .flatMap(Collection::stream)
    .mapToInt((result) -> (int)result.get("wert"))
    .sum();
Listing 2: Paginierung via Streams-API

Was tun wir? Zuerst erzeugen wir in Zeile 1 einen unendlichen Stream von Zahlen, die mögliche Offsets angeben. Dies tun wir, indem wir die Funktion „x + 10“ iterativ anwenden. Das erste Ergebnis ist 0 (das erste Argument von iterate), wir addieren 10 drauf, das Ergebnis ist 10, wir addieren 10, das Ergebnis ist 20, und so weiter, und so fort. Was machen wir nun mit diesen Offsets?

Wir mappen unser Query über die Offsets und geben die Ergebnisse zurück. Dies bedeutet, dass wir unser Query mit jedem Offset einmal ausführen — natürlich nicht sofort, sonst hätten wir eine unendliche Schleife. takeWhile() macht unseren Stream jetzt verwendbar: Der Integer-Stream ist unendlich lang, und der gemappte Stream ist natürlich auch unendlich lang. Unterbrechen tun wir die Unendlichkeit jetzt mit takeWhile(), das solange Elemente aus dem Stream entnimmt, wie es die Bedingung sagt. Unsere Bedingung ist „query hat mehr als 0 Ergebnisse“. Wenn ein Query keine Ergebnisse mehr hat, sind im Moment keine Daten verfügbar, die wir zurückgeben können. Wir können also aufhören, unseren Stream zu lesen, sobald das erste Mal eine leere Ergebnisliste zurückkommt. Wir haben jetzt eine Liste von Ergebnislisten, die machen wir mit flatMap() zu einer Liste von Ergebnissen, operieren mit mapToInt() unseren Wert aus dem Ergebnis, und summieren ihn mit sum().

Paginierung mit Iteratoren

List<Result> runQuery(int offset) {...}; // Dummy-Implementierung

var iterator = new Iterator<Result>() {
        private Iterator<Result> subIter = runQuery(0).iterator();
        private int offset = 0;
        @Override
        public boolean hasNext() {
            if (subIter.hasNext())
                return true;
            offset += 10;
            subIter = runQuery(offset).iterator();
            return subIter.hasNext();
        }
        @Override
        public Integer next() {
            return subIter.next();
        }
    };

var sum = 0;
while (iterator.hasNext())
    sum += iterator.next().getValue("wert");
Listing 3: Paginierung vermittelst Iteratoren

Listing 3 zeigt zum Vergleich die Paginierung mittels Iterator-API. Was tun wir mit diesem vielen Code? Wir erzeugen einen neuen Iterator, der unser Query paginiert. Voraussetzung (wie beim Streams-Beispiel oben) ist natürlich, dass das Query einen Offset-Parameter unterstützt und eine Liste an Ergebnissen zurückliefert. Wir bauen uns dann einen neuen Iterator, der das Query paginiert ausführt. Iteratoren benötigen zwei Methoden, hasNext() und next(). hasNext() gibt genau dann wahr zurück, wenn der Iterator noch Elemente hat, next() gibt ein Element zurück, wenn es noch eins gibt, ansonsten wirft es eine NoSuchElementException. Wir implementieren hasNext() jetzt einfach über einen unterliegenden Iterator auf unserer Query-Liste. Wenn diese Liste noch Ergebnisse enthält, können wir einfach auf diesem Iterator next() aufrufen. Wenn die Liste keine Ergebnisse mehr enthält, müssen wir uns eine neue Liste holen. Wir geben dann hasNext() der neuen Liste zurück. Wenn die neue Liste leer ist, ist unsere Iteration zu Ende, wenn noch Elemente enthalten sind, können wir weitermachen. Die next()-Implementierung ist entsprechend einfach, weil hasNext() sich um den unterliegenden Iterator kümmert, und nur true zurückgibt, wenn dieser weiter iterierbar ist.

Ergonomie

An diesem Beispiel kann man meiner Meinung nach die „ästhetischen“ Unterschiede zwischen Iteratoren und Streams sehen: Der Stream-Code ist sehr viel „flüssiger“, und erlaubt es mir, über die Daten und Operationen nachzudenken. Beim Iteratoren-Beispiel verschwindet die eigentliche Logik hinter einem Wald aus Boilerplate, anonymen Klassen, while-Schleifen und anderem.

Ergonomie also: Streams: 1, Iteratoren 0!

Man muss natürlich anmerken, dass das Beispiel klein und recht gut auf Streams zugeschnitten ist. Mit externen Bibliotheken wie z.B. Google Guava kann man auch Iteratoren besser handhabbar machen, sodass das Beispiel dann fast gleich ausschaut.

Ein anderer Gesichtspunkt ist die Kontrolle über die Evaluation: Streams sind lazy, d.h. die „intermediate“-Operationen eines Streams werden erst dann ausgeführt, wenn eine „terminale“ Operation auf dem Stream aufgerufen wird. Wir haben dies im Beispiel oben gesehen, bei dem die Map über den Stream erst aufgerufen wird, wenn sum() ausgeführt wird. Bei Iteratoren kann man meistens davon ausgehen, dass „teurer“ Code im next()-Aufruf ausgeführt wird. Bei Streams weiß man dies nicht so genau, und man sollte sich genau überlegen, wann man den Stream konsumiert. Meine persönliche Meinung ist, dass ich Streams für einfache Transformationen bevorzuge, wenn funktionale Unterschiede keine Rolle spielen.

Es gibt natürlich auch noch andere Unterschiede zwischen Streams und Iteratoren. Über diese lässt sich zwar nicht so trefflich diskutieren wie über die Code-Schönheit, relevant sind sie jedoch allemal. Wir gehen deshalb auch noch auf die Unterschiede bei Mutabilität und Parallelisierung von Streams und Iteratoren ein.

Parallelisierung

Im Gegensatz zu Iteratoren sind Streams einfach parallelisierbar. Um einen Iterator zu parallelisieren, muss man sich selber mit thread pools und Nebenläufigkeit herumschlagen. Bei Streams kann ich statt stream() einfach parallelStream() aufrufen, und erhalte eine parallelisierbare Variante meiner Transformation.

Hierbei muss man allerdings gut darauf achten, welche Operationen man auf dem Stream ausführt, und welche Reihenfolge der Stream-Elemente man erwartet. Ein Stream kann geordnet oder ungeordnet sein, je nach Quelle des Streams. So gibt List.of(1, 2, 3, 4).stream().forEachOrdered(System.out::println) stets „1, 2, 3, 4“ aus, Set.of(1, 2, 3, 4).stream().forEachOrdered(System.out::println) gibt die Elemente in einer zufälligen Reihenfolge aus. Parallelismus ändert die Sortierung des Streams nicht, solange ich nicht unordered() aufrufe. Es ändert jedoch die Reihenfolge, in der Operationen angewendet werden.

ForEach ist hier ein Sonderfall, da es bei parallelen Streams explizit die Sortierung des Streams ignoriert, egal ob ordered oder unordered. List.of(1,2,3).stream().parallel().forEach(System.out::println) kann also die Zahlen in einer beliebigen Reihenfolge ausgeben.

Ich muss also darauf achten, dass die Lambdas, die ich in meiner Pipeline verwende, den Anforderungen der API gerecht werden. So produziert zum Beispiel der Code in Listing 5 auf parallelen und nicht-parallelen Streams unterschiedliche Ergebnisse.

List.of(1, 2, 3).stream().reduce(0, (x, y) -> x - y)
    == (((0 - 1) - 2) - 3)
    == -6;

List.of(1, 2, 3).parallelStream().reduce(0, (x, y) -> x - y)
    == (0 - (1 - (2 - 3)))
    == -2;
Listing 4: Parallelisierung und ihre Tücken

Beide Ergebnisse sind Zufall: reduce gibt keine Reihenfolge vor, in der Operationen ausgeführt werden, und hier kann man diesen Unterschied sehen. In Listing 5 kommt das davon, dass 0 keine Identität von - ist (0 - x ≠ x), und dass - nicht assoziativ ist ((x - y) - z ≠ x -(y - z)). Beide Bedingungen sind in der Dokumentation von reduce() angegeben. Wenn man diese Seitenbedingungen nicht erfüllt, kann es bei der Parallelisierung zu interessanten Ergebnissen kommen, es lohnt sich also stets, die Anforderungen von (Terminal)-Operationen zu kennen. Bei Iteratoren hat man dieses Problem nicht, muss sich jedoch selber um die Feinheiten der Parallelisierung kümmern, welches meiner Meinung nach viel größere Sorgfalt voraussetzt.

Die Streams-API ist inhärent für Parallelität designt, weshalb man bei Nicht-Nutzung von Parallelitäts-Features vereinzelt auf Eigenschaften stößt, die sich nicht unbedingt erschließen. So kann man man z.B. nicht direkt einen Stream aus einem Iterator erzeugen, sondern muss erst aus dem Iterator einen Spliterator machen, der dann mittels StreamSupport.stream zu einem Stream wird. Man kann die Parallelität eines Streams zwar mit isParallel() abfragen, diese Eigenschaften des Streams sind jedoch nicht im Typsystem hinterlegt, weshalb alle Operationen des Streams potentiell parallel ausgeführt werden können müssen.

Neben Parallelität ist der Umgang mit veränderlichen Daten ein anderer wichtiger Unterschied zwischen Streams und Iteratoren.

Mutabilität

Im Gegensatz zu Streams kann man über den Iterator die unterliegende Quelle während der Iteration strukturell verändern, indem man Elemente entfernt. Der (sinnfreie) Code in Listing 5 ist also kein Problem.

var list = new ArrayList<>(List.of(1, 2, 3));
var iter = list.iterator();
while (iter.hasNext()) {
    System.out.println(iter.next());
    iter.remove();
}
System.out.println(list.size()); // 0
Listing 5: Veränderung während der Iteration

Nach der Iteration ist die Liste leer, und der Iterator ist durchgelaufen. Dies ist ein Sonderfall, den die Iterator-API anbietet. Modifikation von list (z.B. über „add“) lösen weiterhin eine ConcurrentModificationException aus.

Das Problem kann jedoch auch bei sinnvollem Code auftreten: Listing 6 versucht, diesen Umstand zu zeigen. Bei der Stream-Variante wird im Hintergrund eine zweite Liste erzeugt, um das Ergebnis aufzubewahren. Am Ende der Schleife ist diese Liste halb so groß wie die Original-Liste. Ein ausreichend schlauer Compiler kann dies zwar unter Umständen wegoptimieren, darauf verlassen würde ich mich jedoch nicht.

Die Iterator-Variante hingegen benötigt maximal so viel Platz wie die Original-Liste, da Elemente sich immer nur in einer der beiden Listen langelisteoder result befinden. Der Stream-Verteidiger wird natürlich argumentieren, dass langeliste selbst in einem Stream vorgehalten werden sollte, in diesem Beispiel ist die Liste allerdings von der API vorgegeben, ein Fall, der in echtem Code häufig auftreten wird.

var langeliste = new ArrayLis<Integer>(...);
langeliste = langeliste.stream()
    .filter((x) -> x % 2 == 0)
    .collect(Collectors.toList());
var langeliste = new ArrayList<Integer>(...);
var result = new ArrayList<Integer>();
var iter = langeliste.iterator();
while (iter.hasNext()) {
    var value = iter.next();
    iter.remove();
    if (value % 2 == 0)
        result.add(value);
}
Listing 6: Platz sparen mit Iteratoren

Wenn also Performance-Optimierungen fällig sind, lohnt es sich unter Umständen, Iteratoren zu benutzen, da man hier die Kontrolle über die Evaluation hat, und auch Modifikationen möglich sind. Natürlich nur, wenn man über veränderliche Listen iteriert.

Fazit

Das andere Ziel der Streams-API ist es natürlich, performante Datenverarbeitung im großen Stil zu ermöglichen. Hier ist mir mangels Erfahrung noch nicht klar, ob die in diesem Artikel beschriebenen Einschränkungen von Streams relevanter werden. Meine Vermutung ist, dass bei Beachtung einiger Seitenbedingungen die Streams-API genauso schön ist wie bei kleinen Problemen, belegen kann ich dies jedoch noch nicht.

Sprechen Sie mit uns

TAGS