Reaktive Programmierung – Teil 2

#Java #ReaktiveProgrammierung #SpringReactorCore

Reaktive Architekturen werden immer beliebter. Beim Einzug in das Big-Data- und Cloud-Zeitalter entfalten sie Ihre Stärke. Neue syntaktische Elemente von JDK8 wie Lambda und Streams erhören die Akzeptanz und glätten die Lernkurve aus. Der zweite Teil dieser Serie bietet eine Übersicht über parallele Verarbeitung von Daten, Testen von Publisher und Subscriber, Konzept von Back-Pressure und Zukunft der reaktiven APIs.

Parallele Verarbeitung

Zusätzlich zu den Publisher-Operationen besteht im Spring Reactor 3 die Möglichkeit, ähnlich wie bei der Stream-API, die Verarbeitung parallel auszuführen, um so die Operation auf mehrere Maschinen-Kerne (CPUs) zu verteilen. Dafür wird nach der Operation die Methode parallel aufgerufen, die einen Publisher vom Typ ParallelFlux liefert (Abb. 1).

Operation “parallel” (Abb. 1).

Danach wird mit der Methode runOn angegeben, mit welchem Scheduler die Operation ausgeführt werden soll. In den meisten Fällen reicht ein mit Hilfe von Schedulers.parallel() erzeugter Scheduler aus. Der Scheduler verwaltet einen festen Pool von Single-Threaded ExecutorService basierten Workers und ist somit für die parallele Arbeit geeignet. Es gibt auch Anwendungsfälle, wo ein mit Hilfe von Javas Executor-Framework selbst erzeugter Scheduler zum Einsatz kommt.

Anschließend soll man den Reduce-Schritt manuell durchführen. Dafür wird die Methode sequential eingesetzt, die die Ergebnisse wieder zusammenführt. Aufbauend auf unserem Beispiel mit der zip-Operation aus Teil 1 sieht der Code für parallele Bearbeitung wie folgt aus.

Flux<String> titles = Flux.just(„Print 9×13“, „Photobook A4“);
Flux<Double> prices = Flux.just(0.09, 29.99);
Flux<Tuple2<String, Double>> zipped = Flux
  .zip(titles, prices)
  .parallel() // optional with number of CPUs
  .runOn(Schedulers.parallel())
  .sequential();
zipped.subscribe(System.out::print);

— AUSGABE —
[Print 9×13,0.09][Photobook A4,29.99]

Zusammenführen von Cold-Streams

Eine gewisse Herausforderung stellt die Aufgabe dar, mehrere Streams miteinander zu kombinieren. Eine Variante dieser Aufgabe wäre auch eine Umwandlung eines Streams, während weitere Datenquellen angezapft werden müssen. Ein klassisches Use-Case wäre das Nachladen von Daten aus einer anderen Quelle. Z.B. das Laden von Metadaten aus der Datenbank anhand von vorhandenen IDs. Die einfachste Lösung wäre ein synchrones Laden mittels Operation map.

public static MetaData loadingMetadata(int id) {
  MetaData result = dao.loadFromDatabase(id);
  return result;
}

Flux<Integer> ids = Flux.just(1001, 1002, 1003);
Flux<MetaData> metadata = ids.map(id->loadMetadata(id));

Wollen wir von der parallelen Verarbeitung profitieren, muss die Operation flatMap in einem neuen Mono-Publisher stattfinden. Im Falle eines separaten Publishers kann mithilfe der subscribeOn
Methode ein Scheduler zugeschaltet werden.

public static Mono<MetaData> loadMetadata(int id) {
  Mono<MetaData> result = Mono.create(s -> {
    s.next(dao.loadFromDatabase(id));
    s.complete();
  });

return result.subscribeOn(scheduler);

}

Flux<Integer> ids = Flux.just(1001, 1002, 1003);
Flux<MetaData> metadata = ids.flatMap(id->loadMetadata(id));

In beiden Fällen hat man das Problem, dass die Dateien einzeln geholt werden. Diese Vorgehensweise ist aufgrund der höheren Netzwerklast selten optimal. Eine Abhilfe schafft die Operation buffer. Im folgenden Listing erfolgt das Laden in Gruppen je 10 Stück.

public static Flux<MetaData> loadMetadata(Collection<Integer>ids) {
  Flux<MetaData> result = Flux.create(s -> {
    for (Metadata m: loadMetadata(ids)) {
     s.next(m);
    }
    s.complete();
  });
  return result.subscribeOn(scheduler);
}

Flux<Integer> ids = Flux.just(1001, 1002, 1003);
Flux<MetaData> metadata = ids.buffer(10).flatMap(id -> loadMetadata(id));

Bis jetzt erfolgte das Laden der Metadaten nur beim Antreffen von IDs. Eine besonders interessante Operation erlaubt das Zusammenfügen von unterschiedlichen Quellen, deren Laden unabhängig voneinander über mehrere Publishers erfolgt. Im folgenden Listing führen wir die geladenen Metadaten eines Bildes (Dimensions, Format, Geo-Information, etc.) und die eigentliche Bilddatei zusammen.

Flux<Metadata> metadata = … // geladen aus der Datenbank
Flux<File> files = …        // geladen vom File-Server

Flux<Long> wait = Flux.never();

Flux<Pair<MetaData, File>> pairs = metadata
  .join(files, (m)->wait, (f)->wait, Pair::of)
  .filter(p->p.left.id == p.right.id);

Operation join, deren Diagramm man in (Abb. 2) findet, fügt unabhängige Quellen zusammen. Im Gegensatz zur Operation zip, erzeugt join in unserem Beispiel alle möglichen Kombinationen.
Etwa so, wie es auch der SQL-Befehl INNER JOIN tun würde.

Operation „join“ (Abb. 2).

Genauso wie beim SQL ergibt ein blindes Zusammenführen von zwei Quellen wenig Sinn, deswegen muss ein solcher Stream nachträglich gefiltert werden.
In der klassischen Programmierung würde das Erstellen von allen Kombinationen einen enormen Speicherverbrauch bedeuten: Wenn man zum Beispiel zwei Streams je 1000 Elemente hat, werden dabei 1.000.000 Objekte erstellt und gleichzeitig im Speicher gehalten. Reaktive Programmierung ist an der Stelle viel sparsamer: Objekte werden zwar auch erzeugt, werden aber durch Filter-Operationen sofort wieder aussortiert und durch den Garbage-Collector entsorgt.

Zusammenführen von Hot-Streams

Die join Operation muss die Daten für das Zusammenführen offensichtlich im Speicher vorhalten. Bei einer Leseoperation von einer beschränkten Menge an Daten, ist es meistens kein Problem. Im Falle eines Hot-Streams bedient die join Operation ein ganz anderes Use-Case.
Als Beispiel dienen zwei Streams für Temperatur und Windgeschwindigkeit unterschiedlicher Messstationen. Alle Stationen schicken die Daten regelmäßig (etwa einmal pro Stunde) aber nicht geordnet. Die Operation join mit dem anschließenden Filtern hilft, die Daten einer Messstation zusammenzuführen.

Flux<Wind> wind = …
Flux<Temperature> temperature = …

Flux<Long> wait = Flux.interval(Duration.ofMinutes(90));

Flux<Pair<Wind, Temperature>> allCombinations = wind
  .join(temperature, (m)->wait, (f)->wait, Pair::of);

Flux<Pair<Wind, Temperature>> stations = allCombinations
  .filter(p->p.left.stationId.equals(p.right.stationId));

 

Hier ist die Rolle eines zusätzlichen Publishers wait sichtlich. Es wird die Dauer des Zwischenspeichers von Events angegeben. Wenn z.B. Event Wind für Bonn bereits angekommen ist, wird ein
Sammel-Event mit dem Wind und der vorhandenen Temperatur abgefeuert (falls vorhanden). Anschließend wird es 90 Minuten auf den nächsten Temperatur- oder Wind-Wert gewartet.

Testen

Bei der reaktiven Programmierung stellt sich die Frage, wie die Publisher-Subscriber-Kommunikation getestet werden kann. Dafür wird im Rahmen des Spring-Reactor-Projektes reactor-test
Abhängigkeit bereitgestellt. Mit Hilfe von StepVerifier Abstraktion können die Tests geschrieben werden.

Flux<String> titles = Flux.just(„Print 9×13“,
   „Photobook A4“, „Calendar A4“);
Duration verificationDuration = StepVerifier.create(titles).
   expectNextMatches(title -> title.equals(„Print 9×13“)).
   expectNextMatches(title -> title.equals(„Photobook A4“)).
   expectNextMatches(title -> title.equals(„Calendar A4“)).
   expectComplete().
   verify();

Zuerst wird eine StepVerifier Instanz mit dem zu prüfenden Publisher erzeugt. Dann werden die Events mit Hilfe von aufeinanderfolgenden Aufrufen der expectNextMatches Methode in der zu erwartenden Reihenfolge angegeben. Der Aufruf expectComplete folgt, wenn keine Events mehr zu erwarten sind. Der abschließende Aufruf von verify dient zur Überprüfung. Es gibt 2  mögliche Ergebnisse des Testlaufs: Sollte der Test erfolgreich sein, wird die Dauer der Testausführung als java.time.Duration Objekt zurückgeliefert. Falls beim Testlauf Fehler festgestellt werden, endet der mit dem AssertionError mit dem Hinweis, was genau schiefgelaufen ist.

Flux<String> titles = Flux.just(„Print 9×13“,
   „Photobook A4“, „Calendar A4“);
StepVerifier.create(titles).
   expectNextMatches(title -> title.equals(„Print 9×13“)).
   expectNextMatches(title -> title.equals(„Photobook A4“)).
   expectComplete().
   verify();

— AUSGABE —
Exception in thread „main“ java.lang.AssertionError:
expectation „expectComplete“ failed (expected: onComplete();
actual: onNext(Calendar A4))

Es besteht mit Spring-Reactor die Möglichkeit, Publisher-Elemente verzögert auszugeben (Methode delayElements).StepVerifier enthält ein für das Testen solcher Fälle nützliches Feature, nämlich die virtuelle Uhrzeit (Methode StepVerifier.withVirtualTime).

Flux<String> titles = Flux.just(„Print 9×13“,
   „Photobook A4“, „Calendar A4“);
StepVerifier.withVirtualTime(titles).
   delayElements(Duration.ofSeconds(1))).
   expectSubscription().
   expectNoEvent(Duration.ofSeconds(1)).
   expectNext(„Print 9×13“).
   expectNoEvent(Duration.ofSeconds(1)).
   expectNext(„Photobook A4“).
   expectNoEvent(Duration.ofSeconds(1)).
   expectNext(„Calendar A4“).
   expectComplete().
   verify();

Zeitgesteuerte Anwendungen sind nur sehr schwer mit eigens geschriebenen Tests zu prüfen. Mit dem Konzept der virtuellen Uhr beim StepVerifier ist es hingegen ohne Probleme möglich. Back-Pressure. Ein Konzept, welches von JDK8 vorgeschrieben ist und deshalb des Öfteren erwähnt wurde, ist Back-Pressure. Ein Use-Case dafür wäre, wenn der Publisher die Elemente schneller generieren kann als sie vom Consumer verarbeitet werden. Bei der Kommunikation ohne Back-Pressure werden die Daten zwischengepuffert. Es führt zum höheren Speicherverbrauch. In vielen Fällen es ist absolut in Ordnung und kann ohne Bauchschmerzen in Kauf genommen werden. Wenn der Speicherverbrauch keine Option ist, kommt Back-Pressure zum Einsatz.

In diesem Fall fragt der Consumer die Daten anders ab. Beim Request wird angegeben, wie viele Elemente in dem ersten Schritt erwünscht werden. Auf der (Abb. 3) ist dieser Einsatz neben der klassischen OOP dargestellt.

Back-Pressure (Abb. 3).

Um die Vorgehensweise etwas übersichtlicher zu gestalten, wird eine Hilfsmethode aus dem Spring-Reactor-Lieferumfang genutzt. Die Methode log eines Publishers erlaubt es, interne Abläufe des Frameworks zu sehen. Nachfolgend wird erstmal der klassische Ablauf ohne Back-Pressure dargestellt.

Flux<String> products = Flux.just(„Print 9×13“,
„Photobook A4“, „Calendar A4“);
products.log().subscribe(System.out::println);

— AUSGABE —
INFO reactor.Flux.Array.2 – | onSubscribe()
INFO reactor.Flux.Array.2 – | request(unbounded)
INFO reactor.Flux.Array.2 – | onNext(Print 9×13)
Print 9×13
INFO reactor.Flux.Array.2 – | onNext(Photobook A4)
Photobook A4
INFO reactor.Flux.Array.2 – | onNext(Calendar A4)
Calendar A4
INFO reactor.Flux.Array.2 – | onComplete()

Die Methode request wird dabei ohne Angabe der Menge aufgerufen (unbounded). Der Publisher feuert daraufhin alle drei ihm zur Verfügung stehenden Elemente nacheinander. Anschließend wird eine etwas komplexere Implementierung mit Back-Pressure verwendet. Hier wird das Subscriber-Interface direkt implementiert. Eine vereinfachte Implementierung über JDK8 Lambda ist hier nicht mehr möglich.

Flux<String> products = Flux.just(„Print 9×13“,
   „Photobook A4“, „Calendar A4“);

products .subscribe(new Subscriber<String>() {
   private long count = 0;
   private Subscription subscription;

public void onSubscribe(Subscription subscription) {

     this.subscription = subscription;
     subscription.request(2);
   }
   public void onNext(String t) {
     count++;
     if (count>=2) {
     count = 0;
     subscription.request(2);
     }
   }

}

— AUSGABE —
INFO reactor.Flux.Array.2 – | onSubscribe()
INFO reactor.Flux.Array.2 – | request(2)
INFO reactor.Flux.Array.2 – | onNext(Print 9×13)
Print 9×13
INFO reactor.Flux.Array.2 – | onNext(Photobook A4)
Photobook A4
INFO reactor.Flux.Array.2 – | request(2)
INFO reactor.Flux.Array.2 – | onNext(Calendar A4)
Calendar A4
INFO reactor.Flux.Array.2 – | onComplete()

Hier kommt der request Aufruf mehrmals zum Einsatz. Es ist auch kein durch die Bibliothek “versteckter” Aufruf. Wir tun es explizit selbst im Code, wenn die bereits bestellten Events abgearbeitet
sind.
Im Gegensatz zu RxJava2, wo zwei unterschiedliche Implementierungen des Publishers existieren, liefert Spring-Core die Back-Pressure-Funktionalität in jedem Publisher mit. Ob man Back-Pressure-Funktionalität von der Seite des Consumers nutzt, oder den Publisher in dem “unbound”-Modus betreibt, ist die Frage der Implementierung. Das Aktivieren von Back-Pressure alleine macht den Consumer nicht “besser”. Unterstützung von Back-Pressure erhöht die Komplexität der Anwendung erheblich und sollte nur dort eingesetzt werden, wo es auch notwendig ist.

Blockierende Operationen

Spring-Reactor-Core-3-API bietet die Möglichkeit, reaktive und somit nicht blockierende Programmierung zu verlassen und in den blockierenden Modus zu wechseln. Für diesen Zweck sind die Methoden toIterable und toStream für Flux und die Methoden block und toFuture für Mono vorgesehen.

Flux<String> titles = Flux.just(„Print 9×13“,
   „Photobook A4“, „Calendar A4“);
Iterable<String> iterableTitles=titles.toIterable();
Stream<String> streamOfTitles = titles.toStream();
Mono<String> title= Mono.just(„Print 9×13“);
String blockingTitle = title.block();
CompletableFuture completableFutureTitle = title.toFuture();

Um zu verstehen, wieso der Wechsel in den blockierenden Modus generell mit Risiken verbunden ist, lohnt sich der Blick hinter die Kulissen von Spring-Reactor-Core 3. Dort wird statt des Thread-Pools eine angepasste Variante des Ring-Buffer-Konzepts genutzt, welche aus dem LMAX-Disruptor-Projekt stammt. LMAX ist eine Java-basierte Trading-Plattform, die 6 Millionen
Transaktionen pro Sekunde pro Thread verarbeiten kann. Das Konzept hat viele Gemeinsamkeiten mit dem Event-Loop-Konzept, welches z.B. bei Node.js zum Einsatz kommt. Dabei geht
es im Grunde darum, eine Struktur zu nutzen, wo viele Publisher ihre Events publizieren und viele Subscriber diese Events verarbeiten können. Beides geschieht höchst konkurrierend (Abb. 4).

Ring-Buffer-Konzept vom LMAX-Disruptor (Abb. 4).

Dabei wird das Konzept der mechanischen Sympathie genutzt, der im Blog von Martin Thompson detailliert beschrieben ist. Es wird Gebrauch von solchen Regeln der Java-Memory-Model gemacht, die eine lockfreie Programmierung ermöglichen. Dabei werden auch Eigenschaften der darunterliegenden Hardware optimal ausgenutzt. Dadurch werden Daten in den prozessornahen
L-Caches abgelegt. Im Gegensatz dazu führt der Einsatz des Java-Synchronisationsmittels (wie ein implizites oder explizites Locking oder volatile Variable) zum Schreiben und Lesen der Daten in bzw. aus dem Hauptspeicher. Zugriffszeiten auf L-Caches sind deutlich kürzer, als die auf den Hauptspeicher.. Disruptor arbeitet nur mit ganz wenigen Threads. Bereits eine Blockierung an einer Stelle führt dazu, dass die ganze Anwendung zum Stehen gebracht werden kann. Das Ziel sollte es sein, die Anwendung durch alle Schichten hindurch komplett reaktiv und somit nicht blockierend zu schreiben.

Kompatibilität zwischen Spring-Reactor-3 und Java-9-Flow-API

Spring-Reactor-3-Projekt implementiert vier Interfaces aus dem Projekt Reactive-Streams5. Die gleichen Interfaces hat Oracle in die Java 9 Flow-API in das Package java.util.concurrent übernommen. Wenn in einem Projekt Java 9 zum Einsatz kommt, kann man Flow-API-Interfaces anstatt Interfaces von Reactive-Streams verwenden. Zu diesem Zweck gibt es im Spring-Reactor-3-Projekt die JdkFlowAdaptor Klasse. Diese stellt die Konvertierungsmethoden flowPublisherToFlux und publisherToFlowPublisher bereit. Da Java zur Zeit keine eigene Implementierung der Reactive-Streams bereitstellt, sind die Entwickler eher dazu gezwungen, eine der bereits existierenden Implementierungen aus der Open-Source-Welt zu verwenden. Alleine wegen der vier genannten Interfaces auf Java 9 umzusteigen, bringt keine nennenswerten Vorteile. Ein Umstieg wird nur dann in Frage kommen, wenn Java in den kommenden Releases eine eigene konkurrenzfähige Implementierung bereitstellt.

Spring-Reactor-3 vs RxJava 2

David Karnok, der ungarische Forscher, Projekt-Lead von RxJava2 und der Commiter für Spring-Reactor-Core-3-Projekt, empfiehlt RxJava 2 für Projekte, wo Java ab Version 6 zum Einsatz kommt oder die Checked-Exceptions behandelt werden müssen. Wenn das letzte nicht gegeben ist und Java ab Version 8 eingesetzt wird, empfiehlt er Spring-Reactor-3 zu verwenden. Ansonsten sind beide genannten Implementierungen der Reactive-Streams für den produktiven Einsatz reif.

Vergleich zwischen den reaktiven und Streaming-Implementierungen

David Karnok hat die Performanz der verschiedenen Reaktiv- und Streaming-APIs verglichen.. Die Performanz von Spring-Reactor-3 und RxJava 2 war beim Testlauf vergleichbar, die der Streaming-APIs (wie Java 8 Stream-API) im Vergleich zu reaktiven APIs war 20 Prozent besser. Das Resultat ist dadurch zu erklären, dass die Streaming-APIs vor allem dann glänzen, wenn es um das Ausführen von nicht latenzsensitiven I/O-Operationen geht. Reaktive APIs glänzen hingegen im Falle der latenzsensitiven I/O-Operationen mit der potenziell unbegrenzten Anzahl an zu verarbeiteten Events.

Zukunft der reaktiven APIs

Laut der gängigen Klassifizierung sind wir bei der vierten Generation der reaktiven APIs angekommen. Alles hat seinen Anfang mit java.util.Observable und Callback-basierten APIs wie addXXXListener und removeXXXListener z.B. in Swing, AWT und Android genommen, die keine allgemeine Abstraktion für alle möglichen Anwendungsfälle darstellte. Diese Abstraktion wurde
mit der RxNET und RxJava 1 zwar geschaffen, jedoch hatten diese APIs viele Schwächen, wie zum Beispiel das fehlende Back-Pressure-Konzept. Außerdem konnten viele Operationen wie take,
zum Holen der bestimmten Anzahl an Elementen nicht unterbrochen werden. Diese Schwächen wurden erst in den nächsten Versionen dieser Bibliotheken adressiert und behoben. Die letzten großen architektonischen Umbaumaßnahmen betrafen die Anwendungsfälle, wo mehrere Operationen aneinander gekettet waren. Es ging in erster Linie darum, die Performanz in solchen Fällen zu optimieren. Es ist z.B unnötig bei einer Serie von concatWith Aufrufen mit einem abschließenden last Aufruf die ganzen concatWith Aufrufe tatsächlich durchzuführen. Das letzte Event kann auf die geschicktere Art und Weise ermittelt werden, in dem nur Events aus dem letzten concatWith Aufruf berücksichtigt werden. In diesem Gebiet gibt es noch viel Optimierungspotenzial.
Die fünfte Generation der reaktiven APIs wird sich wahrscheinlich genau diesem Thema widmen.

Vadym Kazulkin ist Chief Software Architekt bei ip.labs GmbH, einer 100% Tochter der FUJIFILM Gruppe mit Sitz in Bonn. Ip.labs ist das weltweit führende White Label E-Commerce Unternehmen im Bereich Software-Anwendungen für den digitalen Fotoservice, mit einem modernen und motivierenden Arbeitsumfeld, das technologisch immer einen Schritt voraus ist. Vadyms Schwerpunkte sind derzeit die AWS-Cloud und Konzeption und Implementierung der hochskalierbaren und hochverfügbaren Anwendungen.

Rodion Alukhanov ist Senior Software-Entwickler bei ip.labs GmbH mit Sitz in Bonn. Seine mehrjährige Erfahrung streckt sich von MS-DOS über die Hausautomation bis in die AWS-Cloud. Seine Schwerpunkte sind Cloud-Migration, Big Data und Integration Tests.

Alexander Peters ist Senior Software-Entwickler bei ip.labs GmbH mit Sitz in Bonn. Er hat mehr als 10 Jahre Berufserfahrung in der webbasierten Software-Entwicklung mit Java-EE und Spring Framework. Seine Schwerpunkte liegen bei der serverseitigen Anwendungsentwicklung sowie bei der Definition und Implementierung von externen und internen Schnittstellen.

Carolyn Molski


Leave a Reply

2 comments

  1. Hallo,

    danke für den Artikel.
    in welcher Zeitschrift ist dieser zweite Teil zu finden?

    • Carolyn Molski

      Hallo Steven,

      Teil 1 findest du in der Ausgabe 1/2018 und Teil 2 wurde in Ausgabe 2/2018 veröffentlicht.