Publié par
Il y a 2 années · 12 minutes · Back

Faites couler une rivière Java 8 depuis une source OpenCSV

Aujourd’hui, on commence à avoir l’habitude de l’API Stream de Java 8. Vous l’utilisez probablement pour lire des fichiers : l’API Java IO a été revue pour ajouter une petite méthode stream() magique sur les lecteurs de flot pour lesquels c’était applicable. Vous l’utilisez peut-être pour lire des fichiers au format CSV. Ou pas… Vous avez peut-être constaté comme moi qu’OpenCSV, la populaire API de lecture/écriture de données aux format CSV, n’avait pas de méthode magique ! Nous allons donc voir comment créer un générateur de Stream pour bénéficier à la fois des services d’OpenCSV et à la fois du confort des Streams. Cela nous offrira également une occasion de comprendre certains de leurs mécanismes.

Un outil pour nous aider : StreamSupport

Le point de départ pour générer un Stream sera la classe utilitaire java.util.stream.StreamSupport.

Comme on le voit dans la javadoc, cette classe possède des méthodes pour générer des Stream, pour itérer des types de base (int, long, double) plus des méthodes pour générer un Stream typé de manière générique. Nous souhaitons adapter un lecteur de flux CSV. Or, on associe souvent un enregistrement à un type d’objet. Nous somme donc particulièrement intéressés par les méthodes qui permettent de générer un Stream de type générique. Nous pourrons ainsi itérer directement sur des objets « métier ».

Le contrat de StreamSupport est de fournir un Stream si on lui fournit un Spliterator : nous devons donc nous intéresser à cette classe pour aller plus loin.

L’implémentation personnalisable grâce à Spliterator

Les caractéristiques du Spliterator

Nous constatons que les méthodes de StreamSupport existent en deux versions : celle qui accepte un java.util.Spliterator et celle qui accepte un fournisseur de Spliterator plus ses caractéristiques. Ces dernières sont très importantes car elle définissent comment nos objets vont être itérés, et quelles sont les optimisations possibles. De ces caractéristiques, vont découler des possibilités et des limites. Si nous commettons une erreur dans la liste de caractéristiques, nous risquons de provoquer des comportements imprévisibles et/ou incohérents.

Je souhaiterais faire ici une petite parenthèse critique :

  • Nous constatons que les caractéristiques sont spécifiées par un int et non pas une énumération de type sûr (Enum en Java). Les concepteurs ont préféré simplifier la combinaison des caractéristiques qui se fera par des | binaires.
  • Il y a une incohérence qui peut générer des erreurs (comme dit explicitement dans la javadoc de StreamSupport.stream(Supplier, int, boolean)) : si les caractéristiques fournies à l’appel de stream() ne sont pas les mêmes que celles retournée par la méthode characteristics() de Spliterator, le comportement peut être erroné. Ce risque de se tromper est le prix à payer pour éviter le get() du Supplier une seule fois et à la fin. Or StreamSupport.stream() va avoir besoin des caractéristiques avant…

Ces remarques faites, penchons-nous sur les caractéristiques du Spliterator :

CONCURRENT

Caractéristique des flots dont la source peut être modifiée de manière concurrente car cette modification est gérée de manière convenable, et ne nécessite pas de synchronisation explicite de la part de l’utilisateur de notre Stream (la javadoc donne l’exemple de ConcurrentHashMap).

IMMUTABLE

Caractéristique des flots dont la source est immuable (la javadoc donne l’exemple de CopyOnWriteArrayList). Elle est importante dans le sens où elle garantit que la source ne pourra pas être modifiée après la création du Spliterator.

DISTINCT

Caractéristique des flots dont tous les éléments sont uniques, comme dans les Set. Ce sont les méthodes usuelles equals/hashcode qui définissent l’égalité ou la différence entre éléments.

NONNULL

Caractéristique indiquant qu’un élément du flot de données ne sera pas nul.

ORDERED

Caractéristique des flots dont l’ordre des éléments est connu – mais par forcément trié. Les éléments sont donc supposés arriver dans un ordre invariant d’un parcours sur l’autre.

SORTED

Caractéristique des flots dont les éléments sont triés. On peut donc la considérer comme supplémentaire à ORDERED. Pour que cette caractéristique puisse être utilisée, il faut au choix :

SIZED

Caractéristique des flots dont on connaît la taille, comme les collections. On pourrait traduire par « à nombre fini d’éléments ». Lorsque cette caractéristique est affichée, on s’attend à ce que la méthode  estimateSize() du Spliterator retourne la taille exacte du flot de données (contrat endurci).

SUBSIZED

Caractéristique qu’on peut considérer comme supplémentaire à SIZED, et spécifique aux flots SIZED dont le partitionnement donnera deux flots également SIZED et SUBSIZED. Si un flot est SUBSIZED, les flots résultant de son partitionnement seront également SIZED… Et SUBSIZED ! Si nous déclarons un flot SUBSIZED, nous devons donc garantir un flot SIZED « récursivement » sur toutes sous-parties du flot découpé, ainsi que les sous-sous-parties issues du découpage des sous-parties, etc.

De manière générale, la taille estimée du Spliterator de départ (obtenue par appel à estimateSize() ) doit toujours être supérieure à chacune des tailles estimées des deux Spliterator obtenus par partitionnement (méthode trySplit()). Mais dans ce cas, nous devons observer une exigence supplémentaire : la taille estimée du Spliterator de départ étant exacte, elle doit également être la somme des tailles « estimées » des deux Spliterators obtenus par partitionnement.

Late-binding et fail-fast, quèsaco ?

La javadoc nous indique également que les sources qui ne sont ni IMMUTABLE ni CONCURRENT doivent préciser dans la documentation le moment où les modifications de la sources sont permises, et celui où la modification concurrente est détectée.

Si une modification de la source est permise entre la création du Spliterator et sa première utilisation, alors on dit qu’il est « late-binding » (qui s’attache tardivement à la source).

Si une modification est détectée dès la première utilisation, le Spliterator est dit « fail-fast » (qui notifie une erreur de modification concurrente au plus tôt).

Nous devinons donc que « late-binding » et « fail-fast » sont des propriétés qui donnent des marges de manœuvre dans l’utilisation du Spliterator, et peuvent donc permettre de meilleures performances.  Le premier réduit la fenêtre pendant laquelle des interférences sont possibles, et le second permet de détecter les erreurs de concurrence au plus tôt.

Les méthodes essentielles du Spliterator

tryAdvance et forEachRemaining

Ces méthodes sont les bases pour la partie itération du Spliterator.

tryAdvance(Consumer<? super T>) est la méthode qui permet un pas d’itération. L’implémentation consistera à faire ce qui est nécessaire pour obtenir un élément, et appeler le consommateur passé en paramètre. Dans ce cas, il faut retourner true pour signaler qu’il reste des éléments et qu’on a appelé le consommateur ; si l’implémentation ne peut plus fournir d’élément supplémentaire, elle n’appelle pas le consommateur et retourne false.

La méthode forEachRemaining(Consumer<? super T>)  est un tryAdvance de masse. Elle est utilisée par le framework Stream lorsqu’il est plus opportun de lancer l’itération sur tous les éléments plutôt que d’appeler tryAdvance pour chaque élément. L’implémentation par défaut est d’appeler tryAdvance en boucle jusqu’à ce que cette dernière méthode retourne false. Il est recommandé de la redéfinir si possible pour améliorer les performances. Un exemple simple d’optimisation pour éviter d’appeler tryAdvance pour chaque élément est d’intégrer dans forEachRemaining une boucle qui répète l’action unitaire de l’implémentation de tryAdvance.

forEachRemaining permet donc deux niveaux d’optimisation :

  1. basique : éviter un appel de méthode en codant une boucle qui en son sein fait le traitement unitaire de tryAdvance
  2. élaboré : coder un algorithme qui est capable de traiter plus rapidement N éléments que N appels à tryAdvance.

trySplit

C’est la méthode qui est appelée par le framework Stream lorsqu’il découvre qu’on lui demande de paralléliser l’itération et que les caractéristiques du Spliterator le permettent. Retourner null signifie qu’il n’est pas (ou plus) possible de partitionner le jeu de données, sinon il faut retourner un nouveau Spliterator qui va permettre d’itérér sur une partie des éléments. Attention, « l’ancien » Spliterator (le this de la méthode trySplit) ne doit plus pouvoir itérer sur les éléments qui seront parcourus par le nouveau ! Il a tout simplement délégué une partie de son travail.

Influence des caractéristiques

Les caractéristiques ci-dessus peuvent être d’une importance capitale pour la stabilité du comportement de notre Stream ainsi que pour la cohérence des résultats. En effet, suivant les caractéristiques, des optimisations peuvent être décidées. Celles-ci ont également une influence sur les performances, notamment lors de la parallélisation.

Une implémentation personnalisée pour OpenCSV

Afin de pouvoir créer un Stream à partir d’un flux d’entrée OpenCSV, nous allons implémenter un Spliterator adapté.

package org.joemojo.util.stream;

import au.com.bytecode.opencsv.CSVReader;

import java.io.IOException;
import java.util.Spliterator;
import java.util.function.Consumer;

/**
 * Ce Spliterator va permettre de créer des Stream&amp;lt;String[]&amp;gt;.
 * Chaque élément du Stream généré sera donc un tableau de String issu du découpage d'un enregistrement du fichier d'origine par OpenCSV.
 */
public class CSVLineSpliterator implements Spliterator&amp;lt;String[]&amp;gt; {
   CSVReader source;
   //Le Spliterator est créé à partir d'un CSVReader d'OpenCSV
   public CSVLineSpliterator(CSVReader source){
      this.source = source;
   }

   @Override
   public boolean tryAdvance(Consumer&amp;lt;? super String[]&amp;gt; action) {
      //tryAdvance sera appelé pour produire chaque élément du Stream. Il suffit donc de lire une enregistrement à chaque fois
      try{
         String [] line = this.source.readNext();
         //Rien : on retourne false pour signaler la fin du Stream
         if(line == null) return false;
         //Un enregistrement trouvé : (1/2) il faut notifier le consommateur
         action.accept(line);
         // (2/2) ... et signaler qu'a priori, on peut continuer à produire des éléments
         return true;
      }catch (IOException ex){
         return false; //IOException : simple fin prématurée. Dans un code de production, une log ne serait pas de trop
      }
   }

   @Override
   public Spliterator&amp;lt;String[]&amp;gt; trySplit() {
      //On ne sait pas partitionner un CSVReader, donc on doit retourner null
      return null;
   }

   @Override
   public long estimateSize() {
      //Un CSVReader ne nous permet pas de prévoir la taille du fichier d'origine, donc on retourne la valeur qui sera comprise comme &amp;quot;estimation impossible&amp;quot;
      return Long.MAX_VALUE;
   }

   @Override
   public int characteristics() {
      //IMMUTABLE car ici, pour simplifier, on considère que personne ne pourra perturber le Stream en nous forçant à &amp;quot;sauter des lignes&amp;quot; en lisant dans notre Stream
      //NONNULL car on pourra toujours produire un String[], même s'il ne contient que des chaînes vides.
      //ORDERED car les lignes sont rencontrées dans l'ordre du fichier, l'ordre est donc fixé à l'avance.
      return IMMUTABLE | NONNULL | ORDERED;
      //On notera qu'on ne sait pas gérer les accès concurrents sur un CSVReader, qu'on ne connait pas le nombre d'enregistrements qu'on va rencontrer, que les enregistrements ne sont pas triés et que ces derniers ne sont pas garantis tous différents ; les conditions pour les autres caractéristiques ne sont donc pas remplies
   }
}

On notera que dans le code ci-dessus, la caractéristique IMMUTABLE n’est vraie que dans l’exemple, mais fausse en général. Un Spliterator défini comme ci-dessus ne peut garantir que personne n’interfèrera dans la lecture en modifiant la source car il ne l’a pas lui-même créé. Ici, « modifier » la source signifie demander au CSVReader de lire des lignes depuis l’extérieur, ce qui change le résultat final car il manquera des éléments dans notre Stream.

Nous n’avons pas redéfini forEachRemaining, l’implémentation par défaut appellera donc tryAdvance pour chaque élément ; aucune optimisation n’est donc à espérer de ce côté.

Que pourrions-nous faire pour améliorer ce premier jet ?

Améliorations possibles

La première amélioration, serait de faire remonter l’IOException du tryAdvance. En effet, elle serait remontée à l’appelant et le développeur aurait la main sur le traitement à faire en cas d’arrêt non souhaité du flot. Sans la remontée d’exception, une erreur de lecture sera vue comme un arrêt normal du flot !

Si nous prenons un InputStream ou un Reader plus générique en paramètre pour masquer OpenCSV à l’utilisateur (ce qui n’est pas forcément le but), on n’a pas gagné l’immuabilité car la référence étrangère ne nous permet pas de nous assurer que seul notre Spliterator y accèdera.

Si nous prenons un chemin en paramètre, on peut alors maîtriser complètement la source. Cependant cela nécessite de fournir à l’utilisateur les mêmes options que le constructeur de CSVReader.

L’avantage d’avoir un Spliterator IMMUTABLE ou CONCURRENT est la mise en place d’un mécanisme de partitionnement, et ainsi donc accéder aux avantages de la parallélisation. Ici cela ne sera pas fait car le coût pour partitionner correctement est un parcours d’au moins 50% du fichier CSV. Il ne suffit pas de compter les lignes, il faut parser les enregistrements (qui peuvent être sur plusieurs lignes) et s’arrêter après le premier enregistrement qui permet de dépasser les 50% de la taille physique du fichier.

Enfin, nous pourrions voir s’il est possible de redéfinir forEachRemaining de manière à aller plus vide qu’en appelant tryAdvance pour chaque élément. Là non plus ce n’est pas évident pour un fichier CSV parsé.

Le code source du Spliterator pour OpenCSV avec un exemple est disponible sur github.

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *