Publié par
Il y a 6 années · 5 minutes · Data

Storm, Hadoop Map Reduce en temps réel

BackType est une startup spécialisée dans l’analyse des données sociales : Twitter, Facebook, blogs, etc. Utilisant notamment Clojure et Python, les trois ingénieur-fondateurs ont mis en place une infrastructure à deux vitesses. Des batchs Hadoop effectuent les traitements lourds tandis que des topologies Storm mettent à jour les résultats à l’aide des dernières informations.

Juillet dernier, Twitter les a racheté et pendant la même période, Storm, leur innovation interne, fut rendue publique en tant que projet Open Source. Storm peut se comparer à Hadoop Map Reduce mais sans les contraintes de batch ou encore à Yahoo! S4 mais avec une garantie de traitement des messages. C’est une solution potentiellement à même de remplacer tout système complexe de queues avec des workers responsables de traiter, filtrer, transformer les messages.

Incidemment, il est intéressant de noter que l’essentiel du système d’information de Backtype repose sur Amazon Web Service. Comme nous l’avons vu dans nos ateliers de découverte d’AWS, c’est une solution simple d’accès mais très puissante. BackType est un très bon exemple : aucun sysadmin ne fut engagé pour gérer leur SI.

Storm en un éclair

Un cluster Storm est contrôlé par un ‘Nimbus’ qui reçoit et distribue les topologies. Sur chaque machine, un ‘Supervisor’ dialogue avec le Nimbus et lance les traitements nécessaires en local. ZooKeeper est utilisé pour la coordination.

L’API reste très simple puisque seules les fonctions coeurs de vos traitements sont à implémenter. Des passerelles existent vers le langage de votre choix, par exemple : Java, Clojure, Python. Cette API s’articule autour de 5 concepts :

  1. tuple‘ : un message, une suite de valeurs nommées pouvant avoir chacune leur propre type.
  2. stream‘ : une suite non finie de tuple ayant le même schéma.
  3. spout‘ : une source de stream. C’est le point d’entrée des données de la topologie, leurs tuples sont transmis à des bolts.
  4. bolt‘ : un noeud de traitement des messages (filtre, agrégation, jointure, etc.). A la réception d’un tuple, ils retransmettent 0 à n tuples à d’autres bolts.
  5. topology‘ : le réseau de spouts et bolts qui sera lancé pour traiter les messages. Elle définit également la manière dont les messages sont échangés et la parallélisation souhaitée.

Un exemple : les 3 termes les plus populaires du moment

Un exemple concret d’utilisation, non trivial en haute volumétrie, est la récupération en ‘temps réel’ des n mots les plus utilisés sur le moment dans un flux continu de messages, comme des tweets.

La topologie est simple : un seul type de spout émet un stream qui sera traité par trois types de bolts successivement, comme le montre le schéma.

Un aspect important à retenir des 5 concepts de Storm est qu’un tuple est une suite de valeurs nommées. Ainsi tous les composants (spout et bolt) doivent définir leur sortie. Et comme nous le verrons, c’est le point clef du grouping, la stratégie utilisée pour savoir quel est le prochain noeud de traitement d’un tuple.

Cette topologie ne possède qu’un seul spout. Celui émet des tuples avec une seule valeur, nommée « word », issue aléatoirement de l’ensemble : « bertel », « golda », « jackson », « mike », « nathan ». Ce spout est un exemple de source de données et est exécuté par 5 threads.

Un premier bolt compte le nombre d’occurrence de chaque mot issu du spout sur une période glissante. Il émet les résultats sous forme d’un tuple avec deux valeurs « obj » et « count », par exemple ("bertel","3"). Ce bolt utilise un field grouping sur « word » de tel sorte que ce soit toujours la même instance du bolt qui récupère toutes les occurrences d’un mot donné (consistent hashing). Il est exécuté par 4 threads. Cela permet de distribuer la structure de donnée gardant en mémoire le nombre d’occurrences déjà vu pour chaque mot.

Un second bolt trie les mots associés à leur nombre d’occurences et émet les 3 termes les plus courants, par exemple ("['jackson',5],['bertel',3]"). Similairement au premier bolt, il utilise un field grouping sur « obj ». Ce bolt est exécuté par 4 threads et permet de distribuer le tri. Pour des raisons de performances, un résultat n’est émit que toutes les 2 secondes.

Un dernier bolt merge tous les résultats et émet les 3 termes les plus courants, par exemple ("['jackson',5],['nathan',4],['bertel',3]"). Il est exécuté par un seul thread et utilise un global grouping pour récupérer tous les résultats. De même, pour des raisons de performances, un résultat n’est émit que toutes les 2 secondes.

Pour appliquer cette solution à un cas réel, il faudra juste changer les entrées et les sorties. C’est à dire changer le spout de tel sorte qu’il lise une source de données externe, ajouter ensuite un bolt pour découper les messages, puis finalement ajouter un dernier bolt pour persister les résultats dans le stockage de votre choix.

Si vous possédez maven et un jdk 1.6 ou 1.7, vous pouvez tester tout de suite cet exemple, et les autres, en clonant le repository sur github. Les topologies sont lancées en local et ne nécessitent pas d’autre installation.

Garantie de traitement des messages

Un message doit être traité en moins de 30 seconds par défaut. Si ce n’est pas le cas, il sera rejoué. Cette garantie se passe au niveau du tuple. Chaque bolt doit explicitement reconnaitre chaque tuple reçu (acknowledge) ou le faire échouer (fail). Mais cette garantie s’exprime également au niveau de l’arbre de tuples. Lors de l’émission d’un tuple par un bolt, il est possible de préciser le(s) tuple(s) d’origine (anchor). Dans ce cas là, un tuple sera considéré comme traité seulement lorsque tous ses descendants auront été traités. Bien entendu, il faut que toutes les données transientes ne soient pas critiques. En cas de perte d’un spout, par exemple, la source de données doit être capable de connaitre les messages en cours de traitement qui doivent désormais être rejoués. Mais cette problématique n’est pas inhérente à Storm.

Plus d’information ?

Pour plus d’information sur Storm, consultez le wiki ou encore la présentation sur InfoQ.

Bertrand Dechoux
Consultant et Formateur Hadoop @BertrandDechoux

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *