#Java #ReaktiveProgrammierung #SpringReactorCore

Reaktive Architekturen werden immer beliebter. Beim Einzug in das Big-Data- und Cloud-Zeitalter entfalten sie ihre Stärke. Neue syntaktische Elemente von JDK8 wie Lambda und Streams erhöhen die Akzeptanz und glätten die Lernkurve aus. Der 1. Teil der 3-teiligen Serie bietet eine praktische Einführung in das reaktive Framework Spring Reactor Core 3.

Die formale Definition aus Wikipedia beschreibt reaktive Programmierung als ein Programmierparadigma, welches sich an Datenflüssen orientiert. Darunter fällt eine Reihe von Implementierungen und Konzepten. Es sind sowohl verteilte Architekturen, wie JMS (Java Message Service) oder SOA (Service-Oriented-Architecture), als auch Einsätze innerhalb einer Anwendung, wie z.B. die Berechnung von Formeln einer Excel-Tabelle.

Das Gemeinsame an den Konzepten ist, dass der Softwareentwickler nur eine eingeschränkte Kontrolle über die ausführenden Threads oder Prozesse hat. Der Programmcode besteht daher aus zustandslosen Funktionen, die miteinander über Data-Streams verbunden sind.

Dieser Artikel beleuchtet praktische Aspekte der aktuellen Implementierung des Standards Reactive-Streams am Beispiel von Spring Reactor 3. Der reaktive Einsatz bietet zahlreiche Vorteile: unter anderem bessere Hardwareausnutzung, Performance und Wartbarkeit des Codes. Hier muss man unbedingt anmerken, dass der reaktive Einsatz keine allgemein bessere Vorgehensweise ist. Es ist lediglich eine andere Art von der Gestaltung des Codes und der Programmlogik. Je nach Use-Case und zu entwickelnder Logik kann der reaktive Einsatz sowohl deutliche Vorteile, als auch große Nachteile mit sich ziehen.

Die allgemeine Vorgehensweise ist am besten durch ein UML-Sequenzdiagramm abbildbar. In (Abb. 1) sind zwei Einsätze nebeneinander dargestellt. In beiden Diagrammen werden zwei Produkttitel gelesen (z.B. aus der Datenbank). Während es im klassischen Einsatz darum geht, eine Methode (in dem Beispiel loadTitles) zu implementieren, schreibt man in dem reaktiven Code einen Publisher. Zum Konsumieren von Elementen benötigt man einen Subscriber.

Java Iterator vs. Reactor. (Abb. 1)

Für beide Klassen bietet der Reactive-Streams-Standard die gleichnamigen Interfaces mit an. Die genannten Interfaces werden selten direkt implementiert. Spring Reactor bietet dafür eine Reihe von Implementierungen und Hilfsklassen. Die wichtigsten davon sind Flux und Mono (Abb. 2).

In dem folgenden Code ist ein einfacher Publisher dargestellt. Er holt die Daten aus einer klassischen, nicht reaktiven Schnittstelle (z.B. JDBC) und leitet sie an einen Subscriber weiter. So ein Publisher bildet einen typischen Baustein einer reaktiven Datenverarbeitung.

(Listing 1)
int[] ids = {1, 2, 3, 4, 5};

Flux<String> titles = Flux.create(s -> {
  try {
    for (String title :
    dao.loadTitles(ids)) {
      s.next(title);
}

      s.complete();
    } catch (SQLException e) {
      s.error(e);
    }
});

Oft geht es auch einfacher. Ein Publisher kann von einer Collection oder einem Array erzeugt werden. Dazu kommen ein paar Sonderfälle, wie ein leerer Publisher oder ein Publisher, der immer einen Fehler weitergibt.

Flux und Mono. (Abb. 2)

(Listing 2)
Flux<String> titles =
   Flux.just(„Print 9×13“, „Book A4“, „Calendar A4“);
Flux<String> noElements = Flux.empty();
Flux<String> alwaysError =
   Flux.error(new IOException(„Host not found“));

Hier und im weiteren Verlauf wird eine Untermenge von Publishers dargestellt. Es sind sogenannte Cold-Publishers. Sie erzeugen die gleiche Kette von Elementen für jeden Subscriber und beenden die Erzeugung mit einem Aufruf der Methode onSuccess oder onError. Ein solcher Publisher erzeugt, genauso wie eine Methode oder eine Funktion, durch die Definition noch keine Verarbeitung. Zum Zug kommt der Code nur, wenn ein Subscriber sich den Publisher abonniert.

Die andere Gruppe sind die Hot-Publishers. Es sind meistens Elemente einer verteilten Architektur, z.B eines JMS-Systems. Sie generieren ununterbrochen Elemente im Hintergrund und speisen diese in Subscribers ein, sobald sich welche bei dem Publisher melden. Eine Beschreibung der Hot-Publishers würde allerdings den Rahmen dieses Artikels sprengen.
Analog geht es bei der Verwertung von erzeugten Streams durch einen Publisher. Eine direkte Lösung wäre der Subscriber in dem folgenden Listing.

(Listing 3)
imagePublisher.subscribe(new Subscriber<Image>() {

   public void onSubscribe(Subscription subsc) {
      subsc.request(Long.MAX_VALUE);
   }
   public void onComplete() {
      printerService.printSeparator();
   }
   public void onError(Throwable error) {
      printerService.printErrorPage(error);
   }
   public void onNext(Image image) {
      printerService.print(image);
   }
});

Auch beim Erzeugen eines Subscribers ist eine direkte Implementierung des gleichnamigen Interfaces nicht notwendig. Zahlreiche statische Hilfsmethoden der Spring-Reactor-Bibliothek stehen dem Entwickler zur Verfügung.

(Listing 4)
imagePublisher.subscribe(
   picture->printerService.print(picture),
   error->printerService.printErrorPage(error));

Eine Besonderheit des Spring-Frameworks ist ein spezieller Publisher vom Typ Mono. Es ist ein Publisher, der höchstens ein Element generieren kann. Während ein Flux einem JDK8-Stream ähnlich ist, wäre ein Mono eine Alternative zu Optional. Die Mono-Klasse bietet erwartungsgemäß keine zusätzliche Funktionalität, sorgt aber durch einen anderen Satz von Hilfsmethoden für mehr Übersicht. Der folgende Code lädt zum Beispiel genau ein Bild aus dem Netz herunter.

(Listing 5)
Mono<Integer> mid = Mono.just(imageId);
CompletableFuture<Image> loadingFuture =
   mid.map(id->loadFromInstagram(id)).toFuture();
legacyService.processPicture(loadingFuture);

Die Stärke des reaktiven Konzepts entfaltet sich bei der Nutzung von komplexeren Ausführungsplänen. Zusätzliche Kontrolle der Ausführung durch die Reactor-Bibliothek ermöglicht eine deklarative Multithread-Ausführung. Der Code in (Listing 6) erlaubt das parallele Laden von beliebig vielen Bildern. Der gleiche Code wäre in der klassischen OOP bei weitem nicht so übersichtlich gewesen. Möglichkeiten einer parallelen Ausführung werden im weiteren Verlauf der Artikelserie ausführlicher beleuchtet.

(Listing 6)
Scheduler scheduler = Schedulers.elastic();
Flux<Integer> ids = Flux.just(1, 2, 3, 4, 5);
Function<Integer, Mono<Image>> loadPicture =
   id->Mono.create(s -> {
      s.success(loadFromInstagram(id));
   });
Flux<Picture> pictures =
   ids.flatMap(loadPicture).subscribeOn(scheduler);


Fehlerbehandlung

Die Fehlerbehandlung im reaktiven Code ist nicht so gut strukturiert, wie in klassischem Java-Code. Es gibt zwei Möglichkeiten eine Ausnahmesituation zu melden: eine Exception zu werfen, oder eine Exception-Klasse einer error Methode zu übergeben.

(Listing 7)
Mono<String> publisher = Mono.create(s -> {
   if (somethingWrong1) {
      throw new RuntimeException(„Problem 1“);
   }
   if (somethingWrong2) {
      s.error(new Exception(„Problem 2“));
   } else {
      s.success(„No Problems“);
   }
});

Bei der Fehlerbehandlung muss man sicherstellen, dass die Regeln der Reactive-Streams-Specification nicht verletzt werden. Es ist z.B. nicht zulässig, nach einem Error einen Success zu melden. In den meisten Fällen wird die Spring-Reactor-Bibliothek eine Verletzung der Spezifikation unterbinden. Je nach Situation kann es zur Exception oder zum Ignorieren eines Events führen. Besonders letzteres kann zu schwer zu findenden Unregelmäßigkeiten bei der Ausführung führen.

Bei der Ausnahmebehandlung ist die reaktive Programmierung ein gewisser Rückschritt gegenüber dem klassischen Java-Modell. Es besteht keine Möglichkeit, Exceptions durch separate Catch-Blöcke zu behandeln. Die Compiler-Kontrolle über die Behandlung von Checked-Exceptions geht hier leider ebenfalls verloren. Es erfordert mehr Aufmerksamkeit vom Entwickler: man muss dafür sorgen, dass die nach der Businesslogik vorgesehenen Ausnahmefälle richtig behandelt werden.

Logging

Bei einer komplexeren Verkettung von Publishern ist es oft notwendig, den Datenfluss zu verfolgen. In einem reaktiven Code erweist sich dies oft als umständlich. Insbesondere trifft es zu, wenn kurze Lambda-Ausdrücke und Hilfsfunktionen von Spring benutzt werden. Diese Arbeit kann durch einen loggenden Zwischen-Publisher erleichtert werden, was mithilfe der Log-Methode erfolgt.

(Listing 8)
Flux<String> titles =
   Flux.just(„Photobook“, „Calendar“).log();
Flux<String> format = titles.flatMap(
   t->Flux.just(t + “ A5″, t + “ A4″)).log();
format.subscribe(System.err::println);

— AUSGABE (verkürzt) —
[INFO] (main) | onSubscribe(FluxArray)
[INFO] (main) onSubscribe(FluxFlatMap)
[INFO] (main) | onNext(Photobook)
[INFO] (main) onNext(Photobook A5)
[INFO] (main) onNext(Photobook A4)
[INFO] (main) | onNext(Calendar)
[INFO] (main) onNext(Calendar A5)
[INFO] (main) onNext(Calendar A4)
[INFO] (main) | onComplete()
[INFO] (main) onComplete()

Durch eine Anbindung der SLF4J-Bibliothek werden alle gängigen Log-Formate unterstützt.

Publisher-Operationen

Wie in der funktionalen Welt üblich, haben Flux und Mono viele Operationen, die deren Werte manipulieren können. Fast alle diese Operationen liefern wiederum einen neuen Publisher (Flux oder Mono) zurück. Somit ist das Prinzip einer nicht blockierenden Anwendung gegeben, und es kann auf die Elemente zugegriffen werden, sobald diese verfügbar sind.

Operationen kann man grob in folgende Gruppen zusammenfassen:

• Transformationen (map, flatMap, scan …),
• Kombinationen (Verknüpfungen) (zip, zipWith, merge,mergeWith, concat, concatWith …),
• Filterung (filter, first, firstEmmiting, last, skip, take …),
• Mathematische Operationen (count, average, max …),
• Boolesche Ausdrücke (all, every, some, includes …).

Transformationen

Um die Werte eines Publishers mit einer synchronen Funktion (Lambda-Ausdruck) zu transformieren, kann die map Operation benutzt werden (Abb. 3). In dem folgenden Beispiel wird der Titel eines Produktes zu Großbuchstaben umgewandelt.

Map Operation für Flux. (Abb.3)

 

(Listing 9)
Mono.just(„Print 9×13“).
   map(p -> p.toUpperCase()).
   subscribe(System.out::print);
— AUSGABE —
PRINT 9X13

Für Flux kann man die gleiche Vorgehensweise wählen, mit dem Unterschied, dass die Transformation für jedes Element nacheinander
angewendet wird:

(Listing 10)
Flux.just(„Print 9×13“, „Photobook A4“).
   map(p -> p.toUpperCase()).
   subscribe(System.out::print);
— AUSGABE —
PRINT 9X13PHOTOBOOK A4

Soll für die Transformation eine Methode verwendet werden, die eine unbestimmte Zeit für die Bearbeitung in Anspruch nimmt, zum Beispiel ein Aufruf über Remote-API, ist die obere Vorgehensweise nicht optimal. Die Methode flatMap bietet die Möglichkeit an, die Verarbeitung in einen anderen Publisher auszulagern. Der zweite Publisher (siehe Beispiel mit Typ Mono) kann die Arbeit asynchron erledigen:

(Listing 11)
Flux.just(„Print 9×13“, „Photobook A4“).
   flatMap(p -> asyncCapitalise(p)).
subscribe(System.out::print);

Mono<String> asyncCapitalise(String x) {
   Mono<String> mono = Mono.create(s->{
      s.success(title.toUpperCase());
   });
   return mono.subscribeOn(Schedulers.elastic());
}
— AUSGABE —
PHOTOBOOK A4PRINT 9X13

Ein weiterer Vorteil der flatMap Operation ist, dass die transformierende Methode zu einer Eingabe keine oder mehrere Ausgaben liefern darf (Abb. 4):

(Listing 12)
Flux.just(„Print 9×13“, „Photobook A4“).
   flatMap(p -> capitaliseAndUnderline(p)).
   subscribe(System.out::print);

Flux<String> capitaliseAndUnderline(String x) {
   return Flux.just(x.toUpperCase(), x.replace(“ „, „_“));
}
— AUSGABE —
PRINT 9X13Print_9x13PHOTOBOOK A4Photobook_A4

FlatMap Operation. (Abb. 4)

scan verändert die Werte eines Publishers mit einer Accumulator-Funktion. Dabei werden die Ergebnisse als Eingabeparameter für die weiteren Werte-Veränderungen benutzt (Abb. 5):

(Listing 13)
Flux.just(1,2,3,4,5).
   scan((x, y) -> x + y).
   subscribe(System.out::print);
— AUSGABE (mit Leerzeichen getrennt) —
1 3 6 10 15

Scan Operation. (Abb. 5)

Man kann die Vorgehensweise mit folgenden Ausdrücken beschreiben:

(Listing 14)
result[0] = source[0]
result[1] = accumulator(result[0], source[1])
result[2] = accumulator(result[1], source[2])
result[3] = accumulator(result[2], source[3])

Kombinationen

Mit der statischen zip bzw. zipWith Operation des Publishers ist es möglich, Werte eines Publishers mit den Werten eines anderen Publishers zusammenzuführen. Dabei wird das erste Element des ersten Publishers mit dem ersten Element des zweiten Publishers, das zweite Element mit dem zweiten Element und so weiter zusammen gruppiert. Als Rückgabewert bekommt man einen Publisher mit dem Tupel von zwei Elementen zurück. (Abb. 6)

Diese Operation ist hilfreich, wenn die Elemente aus unterschiedlichen Quellen stammen, zum Beispiel aus unterschiedlichen Microservices geliefert werden. So können die in Gruppen zusammengefasst werden, sobald die Ergebnisse ankommen. Falls ein Publisher mehr Elemente als ein anderer Publisher besitzt, werden die überflüssigen Elemente nicht berücksichtigt:

(Listing 15)
Flux<String> titles =
   Flux.just(„Print 9×13“, „Photobook A4“);
Flux<Double> prices =
   Flux.just(0.09, 29.99, 39.99);
Flux<Tuple2<String, Double>> zipped =
   Flux.zip(titles, prices);
zipped.subscribe(System.out::print);
— AUSGABE —
[Print 9×13,0.09][Photobook A4,29.99]

Zip Operation. (Abb. 6)

Die statische merge bzw. mergeWith Operation des Publishers fügt Elemente aus zwei Publisher in einem neuen Flux zusammen. Die Werte kommen dabei in der Reihenfolge an, in der sie verfügbar werden (Abb. 7):

(Listing 16)
Flux<String> flux1 = Flux.just(„a“, „b“, „c“, „d“);
// Flux emits item each 500ms:
Flux<String> flux1Delayed =
   Flux.interval(Duration.ofMillis(500)).
   zipWith(flux1, (i, string) -> string);
Flux<String> flux2 = Flux.just(„1“, „2“, „3“);
// Flux emits item each 700ms:
Flux<String> flux2Delayed =
   Flux.interval(Duration.ofMillis(700)).
   zipWith(flux2, (i, string) -> string);
Flux.merge(flux1Delayed, flux2Delayed).
   subscribe(System.out::print);
— AUSGABE —
a1b2cd3

 

Merge Operation. (Abb. 7)

Im Unterschied zu merge wartet die concat bzw. concatWith Operation, bis alle Werte des ersten Publishers angekommen sind. Anschließend fängt sie mit dem Hinzufügen der Werte aus dem zweiten Publisher an (Abb. 8):

(Listing 17)
Flux.concat(flux1Delayed, flux2Delayed).
subscribe(System.out::print);
— AUSGABE —
abcd123

Concat Operation. (Abb. 8)

Filterung

Mit der filter Operation besteht die Möglichkeit, Publisher-Werte zu filtern, in dem der Methode ein Kriterium in Form eines Lambda-Ausdruckes übergeben wird (Abb. 9):

(Listing 18)
Flux<String> titles =
   Flux.just(„Print 9×13“, „Photobook“, „Calendar“);
Flux<String> titlesStartingWithP =
   titles.filter(title -> title.startsWith(„P“));
titlesStartingWithP.subscribe(System.out::print);
— AUSGABE —
Print 9x13Photobook A4

Filter Operation. (Abb. 9)

Mathematische Operationen

Um zu ermitteln, wie viele Elemente ein Publisher liefert, wird die count Methode verwendet. Weil die Methode nur einen Wert zurückliefert, ist der Rückgabetyp ein Mono<Long>. Damit erhält man eine Anzahl der Elemente als Long zurück, sobald der Publisher ein onComplete Event sendet:

(Listing 19)
Mono<Long> count =
   Flux.just(„Print 9×13“, „Photobook A4“).
   count();
count.subscribe(System.out::print);
— AUSGABE —
2

Boolesche Ausdrücke

Für die Prüfung, ob alle Elemente eine Bedingung erfüllen, wird die Methode all eingesetzt. Als Parameter übergibt man ein Kriterium in Form eines Lambda-Ausdruckes. Als Rückgabewert wird ein Mono<Boolean> geliefert (Abb. 10):

(Listing 20)
Mono<Boolean> titlesBiggerThen5 =
   Flux.just(„Print 9×13“, „Photobook A4“).
   all(title -> title.length() > 5);
titlesBiggerThen5.subscribe(System.out::print);
— AUSGABE —
true

All Operation. (Abb. 10)

 

Kompatibilität mit Java 9 Flow-API

Spring-Reactor-Core-3-Projekt implementiert die vier Interfaces aus dem Projekt Reactive-Streams. Die gleichen Interfaces hat Oracle in Java 9 als Flow-API in das Package java.util.concurrent übernommen. Wenn im Projekt Java 9 zum Einsatz kommt, kann man die Flow-API-Interfaces anstatt Interfaces von Reactive-Streams verwenden. Dafür gibt es im Spring-Reactor-Core-3-Projekt die JdkFlowAdapter Klasse. Diese stellt zu diesem Zweck die Konvertierungsmethoden flowPublisherToFlux und publisherToFlowPublisher bereit. Da Java zur Zeit keine eigene Implementierung der Reactive-Streams bereitstellt, sind die Entwickler eher dazu gezwungen, eine der bereits existierenden Implementierungen aus der Open-Source-Welt zu verwenden. Alleine wegen der vier genannten Interfaces auf Java 9 umzusteigen, bringt keine nennenswerten Vorteile. Es wird nur dann in Frage kommen, falls Java in den kommenden Releases eine eigene konkurrenzfähige Implementierung bereitstellt.

Fazit und Ausblick

Die Standardisierung unterschiedlicher Einsätze durch Reactive-Streams-Initiative ermöglicht in der absehbaren Zukunft die nahtlose Integration von externen Bibliotheken und Schnittstellen. Durch Einbinden der reaktiven Komponenten für Web-Client, Datenbank und View bekommt die Software einen neuen Schwung. In der nächsten JAVAPRO Ausgabe geht es im zweiten Teil dieser Serie um die fortgeschrittenen Themen und Konzepte rund um das Spring-Reactor-Core-3-Framework, wie Cold- und Hot-Publisher, Backpressure, parallele Verarbeitung, blockierende Operationen und Testen von Publishern. Außerdem widmen wir uns der Zukunft der reaktiven APIs und dem Vergleich zwischen reaktiven und streaming APIs.

Vadym Kazulkin ist Chief Software Architekt bei ip.labs GmbH, einer 100% Tochter der FUJIFILM Gruppe mit Sitz in Bonn. Ip.labs ist das weltweit führende White Label E-Commerce Unternehmen im Bereich Software-Anwendungen für den digitalen Fotoservice, mit einem modernen und motivierenden Arbeitsumfeld, das technologisch immer einen Schritt voraus ist. Vadyms Schwerpunkte sind derzeit die AWS-Cloud und Konzeption und Implementierung der hochskalierbaren und hochverfügbaren Anwendungen.

Rodion Alukhanov ist Senior Software-Entwickler bei ip.labs GmbH mit Sitz in Bonn. Seine mehrjährige Erfahrung streckt sich von MS-DOS über die Hausautomation bis in die AWS-Cloud. Seine Schwerpunkte sind Cloud-Migration, Big Data und Integration Tests.

Alexander Peters ist Senior Software-Entwickler bei ip.labs GmbH mit Sitz in Bonn. Er hat mehr als 10 Jahre Berufserfahrung in der webbasierten Software-Entwicklung mit Java-EE und Spring Framework. Seine Schwerpunkte liegen bei der serverseitigen Anwendungsentwicklung sowie bei der Definition und Implementierung von externen und internen Schnittstellen.

Carolyn Molski


Leave a Reply