Cascalog ou comment écrire ses MapReduces de façon concise

Hadoop est devenu une référence dans l’univers du BigData, et MapReduce, un nouveau paradigme pour exploiter les données. Implémenter directement les traitements de données avec MapReduce donne certainement le plus de flexibilité, mais cela revient à utiliser de l’assembleur. Le manque d’abstraction et la verbosité induite nuisent à la productivité.

Il existe des alternatives, plus haut niveau. Pig et Hive en sont les plus connues. Déterminer la plus pertinente est un exercice difficile car la réponse dépend grandement de votre contexte. Le but de cet article est de vous présenter Cascalog, sans doute l’alternative la plus concise. Basée sur Clojure, cette solution vous laisse dans un environnement familier : la JVM.

Les concepts, pour tout le monde

Cascalog est un langage de requêtage pour Hadoop inspiré par Datalog. Il s’agit d’un DSL Clojure et l’ensemble des librairies pour l’utiliser sont gratuites et open source. Cascalog est un autre outil de Backtype, les créateurs de Storm.

Compter le nombre d’occurrences de chaque mot est le tutorial de base d’Hadoop. C’est un exemple conceptuellement simple qui peut s’effectuer en une seule passe de MapReduce. Un ensemble de documents existe sur le système de fichiers. La fonction map émet séparément chaque mot de chaque document. Ensuite, la fonction reduce regroupe toutes les valeurs émises en fonction du mot. Le nombre d’occurrences s’exprime ainsi par la taille de chaque groupe.

Réalisé avec l’API java, cet exemple prend tout de même 59 lignes. La version en Cascalog est plus concise et s’exprime par la ligne suivante :

	
(?<- (stdout) [?word ?count] (docs _ _ ?text) (split ?text :> ?word) (c/count ?count))

?<- est l’opérateur de définition et d’exécution de la requête. Il est suffisant pour un cas simple mais nous verrons dans la seconde partie qu’il est possible de déclarer plusieurs requêtes séparément, avec l’opérateur <- , puis de les exécuter avec l’opérateur ?-.

(stdout) définit la sortie ou sink, c’est à dire le format et la destination des résultats. Pour un exemple ou du prototypage, la sortie standard est suffisante. Cette configuration se base cependant sur un Tap Cascading et vous offre un large choix de possibilités.

[?word ?count] définit les variables, identifiables par leur préfixe ?, qui devront faire partie des résultats. Cette partie est très similaire au début d’un select en SQL.

(docs _ _ ?text) (split ?text :> ?word) (c/count ?count) sont des prédicats. Ce sont eux qui définissent le traitement associé à la requête. Leur ordre n’a pas d’importance. Ils se regroupent en trois types : générateurs, opérations et agrégateurs.

1) Les générateurs

Les générateurs définissent les sources de données. Ce sont soit des Taps Cascading ou d’autres requêtes. A cet endroit, il sera nécessaire d’associer les valeurs lues à des variables. Lorsque plusieurs générateurs sont présents dans une même requête une jointure est effectuée, conditionnée par les variables qu’ils peuvent avoir en commun.

(docs _ _ ?text) associe la variable ?text au contenu de chaque document présent dans la source docs. Le _ signale une valeur que l’on ne souhaite pas utiliser.

(docs _ ?author ?text)(persons ?author ?birthday ?location) vous permettrait de rattacher les informations personnelles des utilisateurs à leur documents.

2) Les opérations

Les opérations manipulent les variables afin de filtrer les données non désirables ou d’en extraire une information complémentaire sous la forme d’une autre variable assignée grâce à l’opérateur :>.

(split ?text :> ?word) découpe le texte lu ?text en une série de mots qui seront associés à la variable ?word.

Split n’est pas une opération standard, nous verrons dans la second partie comment la définir.

3) Les agrégateurs

Les agrégateurs partitionnent les données et calculent une information supplémentaire par segment. La définition du regroupement est faite implicitement par les variables définies en sortie et il n’y a donc pas besoin d’expliciter encore cette information.

(c/count ?count) compte la taille d’un segment. Au vu de la définition de la sortie, il s’agit bien de ce que l’on cherchait : le nombre d’occurrence de chaque mots.

A travers cet aperçu vous avez pu aborder la concision de Cascalog. C’est un langage déclaratif, et tout comme le SQL, il demande seulement une description haut niveau de votre requête. C’est ensuite la responsabilité de l’outil d’optimiser le plan d’exécution. Certes, il se peut que vous puissiez améliorer les performances avec votre propre implémentation, mais il ne faut pas confondre productivité et performance. Il est important de valider le besoin et une solution possible avant de chercher la vitesse. Le but de la prochaine partie est de vous montrer un exemple plus complet afin de vous offrir un cas plus concret pour évaluer les avantages de Cascalog.

Le pas à pas, même pour les non clojuriens

Installation

En assumant que vous avez un jdk récent installé (1.6), il ne vous reste plus qu’à installer Cascalog en décompressant l’archive puis à télécharger le script leiningen dans le même dossier. Leiningen est un peu le maven des projets Clojure et va vous permettre d’automatiser certaines actions, comme lancer la console interactive ( REPL : Read Eval Print Loop). Lors du premier lancement, les dépendances devront être téléchargées. Votre console ne sera sans doute pas aussi claire mais vous devriez finalement accéder au prompt.

./lein repl
REPL started; server listening on localhost port 23624
user=>

Clojure étant un lisp, le code est constitué de listes (entre parenthèses) et utilise une notation préfixée. L’appel d’une fonction est représentée par une liste dont le premier élément est la fonction. Ainsi une somme s’écrit (+ 1 2). Toute expression retourne une valeur implicitement. Si la valeur retournée est inutile, elle sera nil.

La seconde tache consiste à mettre en place l’environnement.

	
(use 'cascalog.playground) (bootstrap)

La fonction use vous permet d’appeler directement les fonctions dans cascalog.playground telle que bootstrap qui mettra en place les imports nécessaires. Le fichier associé au namespace est disponible dans /src/clj/cascalog/playground.clj mais pourquoi voudriez vous quitter le REPL? Les fonctions source et doc vous seront utiles pour approfondir vos connaissances. Essayez (doc ?<-) ou encore (source bootstrap). Pour information, voici l’ensemble des opérateurs spécifiques à Cascalog évoqués dans cet article : stdout, ?<-, ?-, <-, count, defmapcatop et cross-join.

La troisième et dernière tache consiste à charger les données que nous allons utiliser : des extraits de Lessons from the identity trial, un livre disponible sous licence creative common. Pour cela placez le fichier docs.clj dans votre répertoire cascalog et exécutez:

(load-file "docs.clj")

Prise en main des données

Le symbole docs fait référence à une liste de documents possédant un identifiant, un auteur et un extrait. Le générateur docs vu précédemment lit ces même données par l’intermédiaire d’un MemorySourceTap. Voici comment vous les listeriez, sans l’extrait, avec Cascalog.

(?<- (stdout)  [?author ?id] (docs ?id ?author _))

En l’absence d’agrégateurs, les résultats sont dédoublonnés par un reduce implicite. Ce comportement est modifiable par l’ajout d’un prédicat.

(?<- (stdout)  [?author] (docs _ ?author _))
(?<- (stdout)  [?author] (docs _ ?author _) (:distinct false))

Cascalog étant inspiré de Datalog, les valeurs peuvent être associées à des constantes à la place de variables. Dans ce cas là, la requête ne traitera que les informations pour lesquelles la valeur est effectivement égale à la constante. La requête suivante permet de n’afficher que les informations du document possédant pour identifiant chap-01-1.

(?<- (stdout)  [?author ?text] (docs "chap-01-1" ?author ?text))

Lors de l’analyse de texte, il est important de vérifier l’encodage, la langue, l’orthographe et la casse. Ce sont des sujets qui peuvent devenir complexes lors de l’utilisation de source sans standard vérifié tel que le web. A titre d’exemple, nous allons nous intéresser à la casse comme seule problématique de normalisation. Toute fonction retournant un booléen peut être utilisée comme filtre.

(defn unnormalized [text] (not= text (.toLowerCase text)))

Il s’agit d’une définition classique de fonction en Clojure avec la macro defn suivie du nom de la fonction, des paramètres entre crochets puis du corps de la fonction. Un document n’est pas uniquement en minuscule s’il n’est pas égal à sa transformation en minuscule. La syntaxe .toLowerCase est à remarquer. On appelle avec un prefix . la méthode ‘java’ de l’instance. Clojure étant sur la JVM, il est tout à fait possible de réutiliser votre code java. Cela vous fournit un chemin de migration de java à Clojure ou vis versa. L’utilisation de Cascalog n’impose donc pas le fait de devoir utiliser Clojure exclusivement. Avec cette fonction, nous pouvons désormais lister les documents qui ne sont pas dans la norme.

(?<- (stdout) [?id] (docs ?id _ ?text) (unnormalized ?text) )

Bien sur pour un véritable corpus, il n’est pas réaliste de lister tous les documents à modifier. Il est juste nécessaire de connaitre leur nombre. Pour cela, nous utilisons le fait qu’une opération retournant un booléen peut être utilisée comme filtre mais peut également être utilisée pour ajouter une variable dans la requête. Il est ainsi possible de compter les documents en fonction de leur statut.

(?<- (stdout) [?deviant ?count] (docs _ _ ?text)
              (unnormalized ?text :> ?deviant) (c/count ?count))

Cette requête nous montre que tous les documents sont à normaliser. Il suffit juste d’une opération pour cela, le résultat pouvant être vérifié pour un seul document.

(defn normalize [s] (.toLowerCase s))

(?<- (stdout) [?clean-text ?text] (docs "chap-01-1" _ ?text) (normalize ?text :> ?clean-text))

Calculer la pertinence des mots avec TF-IDF

Nous sommes désormais à même d’aborder un sujet un peu plus complexe : calculer pour chaque mot de chaque document son TD-IDF. Il s’agit d’un score permettant d’indiquer la pertinence d’un mot pour identifier son document. C’est un principe de base pour des outils d’indexation et de recherche comme Lucene.

td-idf(word,doc) = number_of(word)in(doc) * log( number_of(doc) / number_of(doc)having(word) )

Ce pseudo-code se traduit presque littéralement en Clojure en décalant les opérateurs.

(defn td-idf [n-docs n-word n-doc-word] (* n-word (Math/log (/ n-docs n-doc-word))))

Encore une fois, il faut remarquer une syntaxe spéciale : Math/log est l’appel d’une méthode ‘java’ static.

Nous avons besoin de trois données. Obtenir le nombre de documents est trivial.

(?<- (stdout) [?count] (docs ?id _ _) (c/count ?count))

Pour le nombre d’occurrence d’un mot par document, nous savons que cela peut s’exprimer ainsi.

(?<- (stdout) [?word ?count ?id] (docs ?id _ ?text)
              (normalize ?text :> ?clean-text) (split ?clean-text :> ?word)
              (c/count ?count))

Le prédicat split est une simple opération comme vu précédemment si ce n’est qu’il faut utiliser une macro spécifique defmapcatop afin de signaler que chaque valeur d’entrée sera associées à une collection de valeurs en sortie et non une seule.

(defmapcatop split [text] (seq (.split text "[^\\p{Alpha}]+")))

La même logique peut être utilisée pour obtenir le nombre de documents possédant un certain mot en définissant un split ne gardant que les valeurs distinctes.

(defmapcatop distinct-split [text] (set (.split text "[^\\p{Alpha}]+")))

(?<- (stdout) [?word ?count] (docs ?id _ ?text) (normalize ?text :> ?clean-text) (distinct-split ?clean-text :> ?word) (c/count ?count))

Il est maintenant temps de regrouper les requêtes. Pour cela, nous allons devoir séparer leur déclaration, avec l’opérateur <-, de leur exécution, avec l’opérateur ?-. Nous introduisons également une nouvelle fonction Clojure let permettant d’associer un symbole à une expression, ici une requête. L’exécution de la requête pour obtenir le nombre de documents peut se reformuler de la manière suivante.

(let
[
   num-docs
   (<- [?count] (docs ?id _ _) (c/count ?count))
]
   (?- (stdout) num-docs)
)

Et voici comment le td-idf peut être calculé pour chaque mot de chaque document.

(let
[
   num-docs
   (<- [?count] (docs ?id _ _) (c/count ?count)),

num-word-doc (<- [?word ?count ?id] (docs ?id _ ?text) (normalize ?text :> ?clean-text)(split ?clean-text :> ?word) (c/count ?count)),
num-doc-word (<- [?word ?count] (docs ?id _ ?text) (normalize ?text :> ?clean-text)(distinct-split ?clean-text :> ?word) (c/count ?count)),
full-td-idf (<- [?id ?word ?relevance] (num-docs ?n-docs)(cross-join) (num-word-doc ?word ?n-word ?id) (num-doc-word ?word ?n-doc-word) (td-idf ?n-docs ?n-word ?n-doc-word :> ?relevance)), ] (?- (stdout) full-td-idf) )

Cette requête nous permet d’aborder la problématique des jointures. Lorsque deux prédicats partagent la même variable alors une jointure est faite implicitement avec cette variable. C’est le cas du nombre d’occurrence d’un mot ?word (num-word-doc) avec le nombre de documents possédant ce même mot (num-doc-word). Il est également possible de faire des jointures cartésiennes mais dans ce cas là, il faut rajouter un prédicat pour expliciter qu’il ne s’agit pas d’une erreur. Cela nous permet de rattacher le nombre de documents (num-docs) aux autres informations.

En analysant rapidement les résultats, on peut remarquer que les mots communs tels que  ‘a’, ‘and’ et ‘by’ sont très peu significatifs avec une valeur de leur td-idf entre 0 et 1. Par contre, des termes comme ‘facebook’, ‘feature’ ou ‘shelter’ et ‘women’ sont très pertinents pour identifier certains documents avec une valeur de leur td-idf supérieure à 15. Les résultats seraient encore plus significatifs avec un corpus plus important tel que le corpus fourni par Common Crawl et l’utilisation de techniques plus avancées telles que la racinisation ou la lemmatisation. Mais cela dépasse le cadre de cet article.

De même, les performances peuvent être améliorées en effectuant le pré-traitement en amont et en réduisant le nombre de MapReduces par l’utilisation de types complexes. Mais cela serait aller contre l’objectif de cet article qui était de présenter Cascalog, même pour ceux ne connaissant pas Clojure.

Avec ce dernier exemple, vous avez pu constater avec quel facilité il était possible de déclarer vos traitements MapReduces et de les regrouper afin d’extraire des indicateurs complexes. Pour plus d’information, je vous conseille bien sur le wiki et le google groupe de Cascalog mais également les blogs de Nathan Marz et de Sam Ritchie, les auteurs de « Big Data, Principles and best practices of scalable realtime data systems ».

2 commentaires

  • Merci de nous avoir présenté cette alternative à l’écriture de MapReduce en Java.
    Pour rebondir sur le premier point, java n’est pas concis par nature, avec les défauts et les avantages que cela présente.
    Une façon d’y remédier est de s’appuyer sur les outils proposés par apache. En utilisant des frameworks d’import de données et un peu d’avro, on arrive à simplifier fortement le code et avoir une grande lisibilité.

    Le paradigme est pensé pour pouvoir réutiliser Mapper et Reducer indépendamment et le langage Java permet l’héritage. Une fois la boite à outil constituée, l’écriture de nouveaux MapReduce en Java devient dans la plupart des cas concise et facilement utilisable par de nombreux développeurs.

    De plus, il ne faut pas oublier de limiter au maximum les opérations de lecture, car cela représente un coût important en performance et donc en prix. Java permet d’avoir un maximum de contrôle sur ce point. Ce sujet est évoqué en fin de post et je crois qu’il mérite un billet à part entière.

  • Java permet d’avoir un maximum de contrôle sur ce point.

    Et l’assembleur permet de ne pas gaspiller des cycles d’exécution pour rien!

    Je force volontairement la comparaison. Une brique ne peut pas aller plus vite que la brique sur laquelle elle se repose. Et un benchmark permettrait effectivement d’estimer la perte de performance. Sur ce point, on est d’accord.

    Cependant, je ne pense pas que cet écart en performance soit le facteur décisif. La véritable question est, comme pour l’acquisition de tout outil : est ce que le cout en vaut la peine (ROI) et quelles sont les alternatives (risques/opportunités). Cascalog permet une expression plus concise pour autant le cout d’apprentissage existe et il faut également introduire Clojure dans votre système. Contrairement à la comparaison java-assembleur, il est bien nécessaire de comprendre le principe de MapReduce pour pouvoir optimiser les performances. Pour une entreprise démarrant progressivement sur Hadoop, ayant peu de compétences dans le domaine, il peut être tout a fait pertinent de commencer par l’API Java. Ajouter des outils supplémentaires sans comprendre leur intérêt nuirait à la productivité. Pour autant, dans la mesure où l’utilisation d’Hadoop ne se limite pas à quelques requêtes qui ne sont plus à faire évoluer, dans le plus long terme des alternatives comme Cascading, Pig, Hive ou encore Cascalog seraient à étudier. Parmi ces quatre solutions, qui ne représentent pas une liste exhaustive, il serait encore une fois nécessaire de les comparer en fonction de l’environnement d’utilisation : chacune possède ses forces et ses faiblesses.

Laisser un commentaire