Publié par

Il y a 6 ans -

Temps de lecture 10 minutes

DevoxxFR – Lambda Architecture – Choose your tools for Real-Time Big Data

Cette session présentée par Florian Douetteau (CEO de Dataiku) présentait le principe des Lambda Architectures, leurs cas d’utilisation, ainsi que les différents outils utilisables pour mettre en place une telle architecture.

Pitch

« Lambda Architecture » est un terme générique pour décrire les architectures Big Data qui stockent et traitent en temps réel de la donnée, et savent la gérer à la fois comme un stock et comme un flux. De nombreuses architectures et outils ont été développés dans les mois précédents pour répondre à cette problématique et aller au-delà d’Hadoop1: Storm+Hadoop, Spark Streaming, SummingBird, Hydra. Les stores NoSQL (CouchBase…MongoDb..) peuvent aussi fournir une solution à ces problèmes jusqu’à un certain point. Dans cette présentation nous présenterons et comparerons ces outils..

Cas d’usage et principe

Voici des exemples de cas d’usage pouvants bénéficier de l’utilisation d’une Lambda Architecture :

  1. Publicité en ligne : besoin de compter le nombre de clics/affichages/transformations par campagne de publicités
  2. Système de recommandation : besoin de compter le nombre de vues/clics/achats des produits d’un catalogue
  3. Statistiques en temps réel : e.g. compter le nombre de tweets par hashtag par heure

Ces cas d’usage peuvent être généralisés en un besoin commun : celui de capturer des événements, de les traiter/transformer, des les stocker dans un état utile et enfin de servir ces vues fonctionnelles (en bref : faire de l’informatique …)

A première vue, ce besoin peut facilement être traité par une solution traditionnelle à l’aide d’une base de données relationnelle. A grande échelle, une telle approche ne tiendra pas la charge.

Nathan Marz a inventé le terme Lambda Architecture pour une architecture générique de traitement de données, sur la base de son expérience de travail chez Backtype et Twitter.

Une Lambda Architecture a pour objectif de satisfaire les besoins d’un système robuste, tolérant aux pannes (matérielles et erreurs humaines), être en mesure de servir un large éventail de charges de travail et de cas d’utilisation, et dans lequel une faible latence des lectures et des écritures est requise. Le système qui en résulte doit être linéairement évolutif – « scale out » plutôt que « scale up ».

Voici le schéma d’une Lambda Architecture proposé par Florian :

Une Lambda Architecture résout le problème du calcul de fonctions (arbitraires) sur des données (arbitraires) en temps réel en décomposant le problème en trois couches: :

  • La couche batch (Batch Layer) :Elle est responsable de deux choses. La première consiste à stocker l’ensemble de données et la deuxième est de calculer des vues logiques sur cet ensemble de données. Les vues, devant être calculées à partir de l’ensemble ou d’une partie importante des données, sont mises à jours relativement peu souvent (chaque itération peut éventuellement prendre plusieurs heures).
  • La couche temps réel (Speed Layer) : Cette couche ne traite que les données récentes et compense la latence élevée des mises à jour des vues de la couche batch en calculant des vues « temps réel ». Ces vues « temps réel » sont calculées de manière incrémentale en s’appuyant sur des systèmes de traitement de flux et des bases de données en lectures/écritures aléatoires. Dès que les données récentes sont prises en compte dans la couche batch, les résultats correspondants dans les vues « temps réel » peuvent être jetés.
  • La couche de service (Serving Layer) :Le travail de la couche de service est de charger et d’exposer les vues des couches batch et temps réel.

Une telle architecture devra traiter les problèmes suivants :

  • Assurer l’unicité du traitement d’un événement
  • Permettre la montée en charge sans remettre en cause l’architecture ou les outils utilisés
  • Permettre les évolutions du modèle de données

Tools and Framework

Florian propose une vue par module technique d’une Lambda Architecture :

Pouvant par exemple être réalisée avec les outils suivants :

Ces derniers mois de nombreux outils, pouvant répondre à tout ou partie des contraintes techniques d’une Architecture Lambda, ont été développés.

En voici quelques uns passés en revue par Florian :

Message Queues

Un buffer d’événements, sous la forme d’un bus de messagerie, peut être utilisé comme point d’entrée de votre architecture. Celui-ci, tolérant à la panne, peut garantir qu’aucun événement ne sera perdu.

Si des outils classiques comme ActiveMQ, HornetMQ ou RabbitMQ sont envisageables, des outils plus recents comme Kestrel ou Kafka peuvent potentiellement être mieux adaptés à une telle architecture.

Storm

Storm est un système de calcul distribué en temps réel qui sera typiquement utilisé comme composant real-time processing d’une Architecture Lambda ou comme canal unique de vos événements vers la couche batch et la couche temps réel de votre architecture.

L’abstraction de base dans Storm est un stream (flux) de tuples (une liste de valeurs nommées pouvant avoir chacune leur propre type).

Storm fournit les primitives pour transformer un flux dans un nouveau flux de manière distribuée et fiable : les spouts et les bolts.

Un spout est une source de stream. C’est le point d’entrée des données qui sont transmis à des bolts. Les tuples peuvent être émis à partir d’une queue d’un broker de messages ou en se connectant à une api de streaming par exemple.

Un bolt est un noeud de traitement des messages (filtre, agrégation, jointure, interaction avec une base de donnée, etc.). Il retransmet éventuellement les tuples transformés à d’autres bolts.

Une topology décrit le réseau de spouts et bolts qui sera lancé pour traiter les messages.Quand un spout ou un bolt émet un tuple dans un flux, il envoie le tuple à chaque bolt qui souscrit à ce flux. Chaque nœud dans une topology Storm est exécuté en parallèle. Dans votre topologie, vous pouvez spécifier le parallélisme que vous voulez pour chaque nœud.

Fiabilité optionnelle : il est possible, grâce à Storm de s’assurer q’un tuple sera traité au moins une fois par une topologie, à l’aide d’un principe d’acquittement (Guaranteeing-message-processing).

Trident

Trident est un abstraction de haut niveau au dessus de Storm. Il permet d’écrire vos topologies Storm à l’aide d’une API en Java, dont les concepts sont semblable à Pig ou Cascading (jointures, aggregations, regroupements, fonctions, filtres…), plutôt que de manipuler des spouts et des bolts.

Un exemple de code avec Trident :

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

En utilisant Trident, vous pouvez également vous assurer qu’un événement sera traité par votre couche « temps réel » exactement une fois.

En effet, avec Trident :

  • Les tuples sont traités par lots.
  • Chaque lot de tuples reçoit un identifiant unique : le numéro de transaction (txid).
  • Si un lot est rejoué, il recevra le même numéro de transaction.
  • Les lots sont traités « dans l’ordre ». Les tuples du lot 3 ne seront traités que lorsque les traitements sur les tuples du lot 2 auront réussi.

En persistant ce numéro de transaction dans les vues « temps réel » et en ne modifiant ces vues que pour les lots dont le numéro de transaction est différent, on assure une unicité de traitement. (cf. Trident state)

SummingBird

SummingBird est une librairie qui vous permet d’écrire vos algorithmes de traitements distribués en Scala, et de les exécuter en mode batch (grace à Scalding, une librairie en Scala au dessus de Cascading) ou en mode temps réel (en utilisant Storm). Il propose donc une abstraction commune au dessus de Hadoop et de Storm et vous permet de créer des systèmes hybrides à partir d’une seule base de code, lorsque les traitements de la couche batch et de la couche temps réel sont identiques.

Un exemple de code avec SummingBird :

object TweetHashTagCount {
    implicit val timeOf: TimeExtractor[Status] = TimeExtractor(_.getCreatedAt.getTime)
    implicit val batcher = Batcher.ofHours(1)
…

    def hashTagCount[P <: Platform[P]](
        source: Producer[P, Status],
        store: P#Store[String, Long]) =
        source
          .filter(_.getText != null)
          .flatMap { tweet: Status => tweet.getHashTags.map(_ -> 1L) }
          .sumByKey(store)
}

Spark

Spark, récemment promu projet Top-Level par la fondation Apache, est un moteur d’exécution de traitements distribués sur Hadoop alternatif à MapReduce. Il est capable d’exécuter des traitements batch 10 à 100 fois plus rapidement qu’avec MapReduce, principalement en réduisant le nombre d’écritures et de lectures sur disque. Pour cela il utilise le concept de Resilient Distributed Dataset qui permet, de manière transparente, de monter en mémoire des données distribuées sur HDFS et de les persister sur disque si besoin.

Spark peut également manipuler vos données en temps réel avec Spark Streaming une extension de Spark pour le stream processing.

Un exemple de traitement batch avec Spark en Java :

JavaRDD<String> file = spark.textFile("hdfs://...");
JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
  });
JavaPairRDD<String, Integer> pairs = words.map(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
  });
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer>() {
    public Integer call(Integer a, Integer b) { return a + b; }
  });

Un exemple de traitement temps réel avec Spark Streaming en Java :

JavaStreamingContext jssc = StreamingContext(sparkConf, new Duration(1000))
JavaDStream<String> lines = jssc.socketTextStream(serverIP, serverPort);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
  });
JavaPairDStream<String, Integer> pairs = words.map(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); }
  });
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer a, Integer b) { return a + b; }
  });

Spark est une alternative sérieuse pour implémenter vos traitements batch et temps réel en manipulant des concepts et une API identique.

NoSQL (Redis/MongoDB/HBase…)

Pas besoin de manipuler des frameworks (trop?) récents pour obtenir les avantages d’une Lambda Architecture. Des base de données NoSQL (MongoDB, Redis, HBase…) peuvent être utilisées (jusqu’à un certain point) comme outil d’implémentation des briques techniques de celle-ci.

Un exemple avec Mongo :

SploutSQL

Florian nous propose de découvrir un dernier outil SploutSQL, qui permet d’indexer et de partitionner les données d’un cluster Hadoop et de les exposer en SQL.

Conclusion

Le concept de Lambda Architecture, popularisé par Nathan Marz, ne cesse d’inspirer la communauté. Pour aller plus loin, vous trouverez des ressources compilées sur les Lambda Architecture sur le site http://lambda-architecture.net/ ou encore dans le livre de Nathan Marz Big Data (toujours en Early Access mais presque terminé).

Merci à Florian pour cette présentation très intéressante.

Ressources

Pour ceux que ça intéressent, Dataiku est un éditeur de logiciel français qui publie le Data Science Studio un outil pour faciliter les tâches des Data Scientists.

Publié par

Publié par Matthieu Blanc

Matthieu est un expert Java et consultant senior dans le domaine du Big Data. Il est également formateur au sein de Xebia Training .

Commentaire

5 réponses pour " DevoxxFR – Lambda Architecture – Choose your tools for Real-Time Big Data "

  1. Publié par , Il y a 6 ans

    Merci beaucoup pour l’article intéressant mais j’aurai aimé comprendre pourquoi faut-il autant de couches logicielles pour avoir une solution stream et batch ? En prenant par exemple la section real time, on y trouve du MQ -> Storm -> Memcache -> Mongo to enfin la visu web. Pourquoi pas, par exemple un simple MQ -> NodeJS -> MongoDB ? Je ne dis pas que les choix sont mauvais, juste comprendre :) Merci

  2. Publié par , Il y a 6 ans

    Hello,

    Ce type d’architecture fait quand même beaucoup penser à ce qu’on trouve autour des notions de Event Sourcing / CQRS vous ne trouvez pas? On a l’impression que le rôle de la couche batch est de stocker des events et de recalculer régulièrement un snapshot qui servira de base lors de l’application d’evenements temps réels.

    Par contre je suis pas sur de comprendre comment se positionne SploutSQL ni en quoi il a un rapport avec les architectures lambda?

  3. Publié par , Il y a 6 ans

    @Larry Tu as raison, pas la peine de sortir l’artillerie lourde si un « simple » MQ -> NodeJS -> MongoDB suffit. Florian avait également présenté des exemples de Lambda Architecture à base d’outil plus « traditionnels » (chapitre NoSQL (Redis/MongoDB/HBase…)). Cependant un outil comme Storm possède des caractéristiques intéressantes pour mettre en place ce type d’architecture, à savoir : la tolérance à la panne, la garantie de traitement des événements (exactement une fois avec Trident), la tenue à des charges extremes (e.g. : Twitter…)

    @Sebastien L’idée est effectivement la même que l’Event Sourcing, je laisse Nathan Marz te répondre plus précisément : http://www.youtube.com/watch?v=ucHjyb6jv08#t=2652
    Je ne connais pas bien SploutSQL. Dans le cadre d’un Lambda Architecture il se placera dans la case Batch View Services du schema de Florian et permettra d’interroger ces vues en Full SQL (contrairement à un ElephantDB qui propose une API clé/valeur ou HBase et son API spécifique)

  4. Publié par , Il y a 6 ans

    @Matthieu, merci beaucoup pour la réponse et en parlant de Storm je me suis souvent demandé jusqu’à quel point ce type de logiciel (inc. le message queuing) pouvait supporter de connections multiple et comment dimensionner sont architecture en fonction de l’arrivée des datas. Je trouve qu’il y a assez peu d’informations sur cet aspect. Dans le cas d’un ZeroMQ/ActiveMQ/etc. combien de temps un seul serveur peut bufferiser les données. Est-ce par exemple la mémoire du serveur qui limite? les I/O disques? autre? et comment monitorer tout ça.

  5. Publié par , Il y a 6 ans

    @Sebastien Matthieu l’idée derrière SploutSQL est surtout d’avoir une base sql accessible uniquement en lecture. On calcul en batch les vues les plus intéressantes (il y a une sortie pig) et ensuite on publie dans sploutsql ces nouvelles vues. Les anciennes vues seront disponible jusqu’au moment ou splout aura recu l’ensemble des nouvelles vues. Le mechanisme permet d’être sur d’avoir l’ensemble des données jusqu’à la durée de ton batch le reste est géré en speed layer.

    @Larry Ceci va dépendre de tes entrées nombre de messages par secondes tailles des messages tailles de tes machines physiques liens réseaux, serialisation, compression, traitements, and co. Une chose est sur que ce soit Kafka, Zookeeper ou Storm ta RAM va être fortement sollicité.

    De manière plus général on utilise ces technos pour les aspects de distributed/scalable.

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Nous recrutons

Être un Xebian, c'est faire partie d'un groupe de passionnés ; C'est l'opportunité de travailler et de partager avec des pairs parmi les plus talentueux.