Publié par
Il y a 6 mois · 8 minutes · Back

Introduction aux flux réactifs en Java

Je vous propose de vous présenter différents outils permettant de développer des applications non bloquantes en Java.

Il y a seulement quelques années, une application de grande taille était composée de dizaines de serveurs, avait des temps de réponse de l’ordre de la seconde, plusieurs heures de maintenance et plusieurs gigaoctets de données. Les applications actuelles sont déployées sur des plates-formes variées, allant des simples appareils mobiles jusqu’à des grappes de serveurs distribuant leur charge à des milliers de processeurs multicœurs. Les utilisateurs attendent des temps de réponse de l’ordre de la milliseconde et une disponibilité continue des services, les données sont dorénavant mesurées en pétaoctets.

Notre conviction est qu’il faut une approche cohérente de l’architecture des systèmes modernes et que tous les aspects requis pour cela sont déjà reconnus individuellement: nous parlons de systèmes qui doivent toujours être disponibles pour répondre, être robustes, souples et orientés message. Nous les appelons Systèmes Réactifs.

Extrait du Manifeste Réactif.

Vous l’avez compris, avec l’évolution des systèmes d’information il est aujourd’hui nécessaire de s’orienter vers de nouvelles architectures. C’est là que la programmation réactive intervient en proposant des solutions non bloquantes aux différents traitements.

Présentation

Les flux réactifs sont un projet visant à proposer un standard pour l’implémentation de flux asynchrones sans « contre pression bloquante » (« non-blocking back pressure »). En effet, lorsqu’on manipule un flux, l’envoi a souvent tendance à être beaucoup plus rapide que la consommation, il faut par conséquent être attentif à ne pas envoyer trop de données et ainsi remplir puis bloquer le flux.

Avec l’arrivée de Java 9, Oracle a ajouté de nouvelles interfaces qui s’apparentent fortement à celles proposées par les flux réactifs. Attention cependant, l’API Flow de Java 9 n’émet pas de signal pour gérer les back-pressure.

Les composants qui constituent un flux réactif sont :

  • Publisher, c’est l’entrée de notre flux et ce qui va produire et émettre des signaux vers le(s) Subscriber(s)
  • Subscriber, représente le consommateur du flux afin de traiter un signal dans un Processor
  • Processor, va quant à lui permettre de transformer l’objet publié dans le Publisher en objet consommé par le Subscriber
  • Subscription, décrit l’action de souscrire à un Publisher avec un Subscriber. C’est utilisé à la fois pour signaler le désir de données ainsi que pour annuler une demande.

Backpressure

Le phénomène de « backpressure » survient lorsque le flux est surchargé. Comme le rappelle le Manifeste Réactif, il est inacceptable que le système échoue, bloque ou pire perdre des données lorsqu’il est sous tension. Il est donc nécessaire que le flux réactif signale au Publisher qu’il est soumis à certaines contraintes, et ainsi réduire la charge. Ainsi, le Publisher peut choisir parmi plusieurs stratégies pour réduire la pression et ne pas planter le système.

L’une des solutions pour avoir des backpressures non bloquantes est de s’orienter vers un modèle orienté « pull » (ou contraire du « push »). De cette façon, le Subscriber demande au Publisher un certain nombre de messages, en attendant de nouvelles demandes avant d’en envoyer d’autres.

Marble Diagrams

Dès que vous commencerez à manipuler un flux réactif, vous rencontrerez dans les documentations des librairies qui implémentent les flux réactifs des « marble diagrams ». Ces diagrammes sont la représentation visuelle du flux et des opérations effectuées sur les objets émis par le flux.

Voici une image vous permettant de les comprendre :

legend

Implémentations

Découvrons ensemble deux implémentations de ces flux réactifs : RxJava et Reactor.

RxJava

Probablement l’outil le plus connu, surtout dans la communauté des développeurs Android (qui permet par exemple d’effectuer des appels HTTP de manière asynchrone et ainsi ne pas bloquer l’IHM), cette bibliothèque a tout à fait sa place dans une architecture back-end Java.

Instanciation

Un flux réactif, implémenté grâce à RxJava ressemble simplement à ça :

Flowable<String> publisher = Flowable.fromIterable(Arrays.asList("Hello", "RxJava"));
Consumer<String> subscriber = System.out::println;
publisher.subscribe(subscriber);

Ici, on va émettre les chaines de caractères « Hello » et « RxJava » dans le flux, auquel on souscrit dans le but d’effectuer un System.out.println. Sachez qu’avec un modèle de flux basé sur un modèle « pull », absolument rien ne se passera tant qu’on n’aura pas fait de subscribe.

Opérateurs

Il est aussi possible de chaîner les traitements grâce aux opérateurs que peut proposer RxJava, comme map, flatMap, etc. :

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
 
inventorySource
 .flatMap(inventoryItem -> myReactiveService.getDemandAsync(inventoryItem.getId()))
 .map(demand -> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand))
 .subscribe();

Dans le code ci-dessus, on opère sur le flux inventorySource, afin d’effectuer une demande pour chaque Inventory.

Multithreading

Afin de définir où et quand exécuter les opérations sur les éléments qui sont dans le flux (de manière asynchrone ou non suivant le besoin), on utilise un Scheduler, dont on peut récupérer différentes instances depuis la classe utilitaire Schedulers.

Par exemple :

Flowable.fromCallable(() -> {
 Thread.sleep(1000); // Simule un traitement long, comme un appel HTTP...
 return "Done";
})
 .subscribeOn(Schedulers.newThread())
 .observeOn(Schedulers.single())
 .subscribe(System.out::println, Throwable::printStackTrace);

L’appel à la méthode subscribeOn permet de souscrire au Publisher de manière asynchrone. Quant à la méthode observeOn, elle modifie le Publisher pour qu’il émette les données via le Scheduler passé en paramètre. Ainsi, le Thread.sleep(1000) va être executé dans un autre thread que celui du System.out.println.

Par ailleurs, notez l’utilisation de Throwable::printStackTrace, qui va être appelé en cas d’erreur durant le System.out::println.

Project Reactor

Reactor est l’implémentation des flux réactifs par Spring. Project Reactor est d’ailleurs intégré dans Spring 5, avec les WebFlux.

Instanciation

L’instanciation d’un flux réactif avec Reactor est identique à RxJava :

Flux.just("Hello world").subscribe(System.out::println);

Contrairement à RxJava, Reactor permet en plus de manipuler un Mono, qui va lui émettre un seul élément, au maximum. Concrètement, il s’agit simplement d’une quesion de sémantique :  par exemple, une requête HTTP ne va produire qu’une réponse, il n’y a donc que très peu d’intérêt à exprimer le résultat dans un Flux car un Mono n’offre que des opérateurs pertinents pour un contexte de 0 à 1 élément.

Mono.fromCallable(() -> HttpClients.createDefault().execute(new HttpGet("https://xebia.fr/")))
 .subscribe(httpResponse -> {
 try {
 System.out.println(IOUtils.toString(httpResponse.getEntity().getContent()));
 } catch (IOException e) {
 e.printStackTrace();
 }
});

Opérateurs

Reactor propose aussi tout un tas d’opérateurs pour ces Flux et Mono, comme flatMap, map, distinct etc :

Flux.fromIterable(getSomeLongList())
 .mergeWith(Flux.interval(100))
 .doOnNext(serviceA::someObserver)
 .map(d -&amp;amp;gt; d * 2)
 .take(3)
 .onErrorResumeWith(errorHandler::fallback)
 .doAfterTerminate(serviceM::incrementTerminate)
 .subscribe(System.out::println);

Multithreading

Il est aussi possible d’utiliser des Scheduler, comparable à RxJava :

Flux.range(1, 10000).publishOn(Schedulers.parallel()).subscribe(System.out::println);

Addons

De plus, cette bibliothèque propose un Adapter qui permet de convertir un Flowable RxJava en Flux Reactor.

Cas d’utilisations

La question que vous devez maintenant vous poser est « où est-ce que je pourrai placer mon premier flux réactif au sein de mon projet ? ». Pour vous aider, vous trouverez ci-dessous deux cas d’utilisations des flux réactifs :

  • Appels HTTP – Aujourd’hui, énormément d’applications backend sont REST et utilisent le protocole HTTP, qui est bloquant et synchrone. Et généralement, l’appel vers un service en entraîne aussi un autre, et ainsi de suite. Si vous devez attendre la fin de tous vos appels avant de pouvoir construire une réponse, votre client va très vite laisser tomber. Ce type d’appels en chaine est un excellent candidat aux flux réactifs. C’est d’ailleurs ce que Spring va proposer dans sa version 5 avec WebFlux.
  • Consommation de messages – Les flux réactifs ont entièrement leur place dans la consommation de messages, ou d’événements. Attention cependant, les différentes implémentations de flux réactifs ont tendance à se vendre en montrant combien de messages elles arrivent à consommer à la seconde : ne vous y fiez pas ! Mais s’il y a un moyen de consommer davantage de messages plus rapidement, vous devriez vous y intéresser.

Conclusion

Nous avons donc découvert ensemble deux implémentations des flux réactifs : RxJava et Project Reactor.

Avant de foncer tête baissée et d’ajouter l’une de ces librairies dans vos dépendances, rappelez vous qu’il n’existe pas de solution miracle (« No Silver Bullet »). Et la programmation réactive n’échappe pas à cette règle.

Project Reactor et RxJava sont deux outils très semblables, qui vont très probablement devenir de plus en plus courant. Lequel choisir ? À mon humble avis, en tant que développeur back, je serai davantage partant à l’utilisation de Reactor. En effet, Project Reactor est directement embarqué avec Spring 5, et la bibliothèque est écrite en Java 8. Tandis que RxJava se doit de rester compatible avec Android et est écrit en Java 6. De plus, Reactor propose des addons pour adapter les classes de RxJava 1 et 2 si cette bibliothèque est déjà présente dans votre projet.

One thought on “Introduction aux flux réactifs en Java”

  1. Publié par Colas, Il y a 2 mois

    Bonjour, merci pour cet article très intéressant. J’ai une question concernant les transactions gérées par spring ainsi que les rollbacks en cas d’erreur lors d’un appel d’un WS ‘reactif’: ya-t’il un intérêt de rendre un WS rest ‘reactif’ si ce dernier ne fait qu’appeller un service qui lui exécutera une requête en base de données?
    Autre question, peut être totalement hors sujet: j’imagine que tant que la requête en base de données n’est pas encore terminée, le thread java sera libéré pour traiter un second appel WS ‘reactif’, mais si la requête sql du premier appel WS génère une exception, ya-t’il moyen d’exécuter un rollback sachant que le thread java est occupé à construire le flux pour le second appel?

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *