Publié par
Il y a 7 mois · 17 minutes · Data

Apache Kudu : la nouvelle antilope des architectures Big Data

Apache Kudu est un système de stockage distribué qui vient s’ajouter aux outils de la suite Hadoop. Le projet a été initié par les équipes de Cloudera et est spécialement conçu pour combler l’écart de performance entre HDFS et les bases de données colonnes. Cet outil est relativement jeune et vient se confronter à un écosystème ayant déjà fait ses preuves. Pourtant son succès a fait de lui un Top-Level Project Apache et lui a permis de rejoindre la suite CDH. La promesse faite par Kudu : faciliter la création d’applications en temps réel. Cet article est une introduction à Kudu, au cours duquel nous évoquerons ses spécificités et présenterons quelques moyens d’interagir avec ce nouveau produit.

Une grande partie de la proposition de valeur de Kudu s’appuie sur le constat suivant : les outils Big Data proposés aujourd’hui couvrent des fonctionnalités différentes et sont combinés pour obtenir des systèmes hybrides capables de fournir des performances d’analyse sur de larges datasets et de disposer d’un mode d’insertion ligne à ligne. C’est en expliquant cet écart de fonctionnalités que l’on peut comprendre le rôle de Kudu.

Les objectifs du projet Kudu

HDFS et le stockage brut

HDFS, Hadoop Distributed File System, est souvent considéré comme la brique indispensable d’un datalake et joue le rôle de la couche de persistance la plus basse. Les données y sont stockées sous forme de fichiers bruts. C’est la partie immuable de notre dataset. On dit que la donnée y est rangée en « Append Only ». HDFS fournit alors la réplication, la distribution, la tolérance aux pannes. Il a également l’avantage d’insérer et de permettre la remontée de grands blocs de données. Cela favorise la recherche d’information sur des données stockées au format colonne. C’est en partie grâce à ce dernier point que nous pourrons analyser notre dataset sur des volumétries très importantes (avec des moteurs de calcul comme Spark) ou procéder à des batchs de chargement de données conséquents.

En revanche, pour mettre à jour une unique ligne d’enregistrement on se tournera de préférence vers des bases de données relationnelles ou NoSQL.

HBase et le stockage colonne

Qu’il s’agisse de HBase ou de Cassandra, notre base de données orientée colonnes va permettre la mise en place de vues accessibles sous de faibles temps de latence. La sélection d’une simple ligne d’information est alors beaucoup plus simple que sur la donnée brute. Ces bases de données permettent également la mise à jour d’éléments, c’est la partie mutable de notre dataset. Les vues créées au dessus de la donnée brute pourront être régulièrement mises à jour. Des offres de services peuvent alors être mises en place sur cette couche du datalake, comme des applications de reporting ou des systèmes de recommandation.

Pourtant, dans le cadre d’une analyse impliquant de larges tâches de scan il sera préférable de passer par HDFS.

kudu-hbase-hdfs1

Ces deux modes d’accès à la données avaient jusqu’à maintenant imposé la création d’architectures complexes avec ces deux types d’outils. Grâce à un positionnement à mi-chemin entre ces deux logiciels, Kudu propose de simplifier les architectures en offrant une solution à mi-chemin entre ces deux approches.

Kudu, deux en un!

Dans cette partie nous aborderons les choix de conception liés au stockage de Kudu. Il s’agira donc d’une synthèse dont les détails peuvent être retrouvés dans la publication The Kudu Paper.

Pour répondre au challenge fixé de mettre à disposition à la fois des scans de colonnes efficaces et des accès ligne à ligne sous de faibles temps de latence, l’équipe de développement a choisi de concevoir une solution complète. Cette solution ne prend donc pas appui sur le stockage de HDFS mais propose à la place son propre système de stockage développé en C++. Le principal moyen d’accès à la donnée est Impala. Il existe aussi des API C++, Python et Java pour vous permettre de développer des applications autour de Kudu.

La réplication

L’architecture s’organise autour de deux types de services : le Master-Server et les Tablet-Servers. La donnée est stockée au format colonne et restituée sous forme de tables. Les tablet-servers stockent ces tables découpées en partitions appelées tablets. Les tablets sont répliquées sur les différents tablets-server. Le master-server conserve les méta données. Il porte des informations sur l’ensemble du cluster et sait notamment quels sont les tablet-servers responsables (ou leader) pour chaque partition. Le master coordonne la réplication des partitions sur les différents nœuds en suivant l’algorithme du Raft Consensus. Le Raft Consensus (qui à lui seul mériterait un article) est fortement inspiré du Paxos, un autre algorithme qui intervient dans le cycle de vie d’un cluster CassandraDB et qui a déjà été abordé dans un article précédent. Cette organisation implique que le master dissocie des serveurs leaders et followers pour chaque partition et leur délègue des tâches différentes:

  • Le nœud leader d’une partition aura la tâche de détecter la disparition de nœuds follower et de déclencher une réplication sur un autre nœud le cas échéant
  • Les nœuds follower en cas de disparition de leur leader procéderont à l’élection d’un nouveau leader

kudu-architecture-2

Ces deux opérations reposent sur un système de heartbeats permettant au cluster de se réorganiser. Il y a peu de temps encore le master-server était le point faible de Kudu (son single point of failure). Depuis la version 1.0 sortie en Septembre 2016, les méta données et les services fournis par le master-server sont redondés via le même système de leader / follower. Kudu peut donc gérer la perte d’un nœud maître. La table contenant les méta données est appelée catalog-table. C’est l’unique table hébergée par le master. En dehors de ce point le catalogue possède les même propriétés que les autres tables. On retrouve au sein du catalogue les informations suivantes pour chaque table : la version courante de son schéma, son état (creating, running, deleting), et les différents leaders associés.

Le partitionnement

Chaque table possède une clef primaire unique composée d’une ou plusieurs colonnes. Un sous-ensemble de cette clef peut être utilisé comme colonne de partition. Kudu offre deux modes de partitionnement.

  • Un partitionnement par Hash. Avec le premier mode de partitionnement, les lignes insérées seront réparties grâce au Hash de la colonne de partition choisie. Contrairement au mode de partitionnement que l’on connaît sous parquet on a affaire ici à des partitions statiques. Il faut donc bien s’assurer de choisir une colonne de partition qui séparera la table en des blocs équitables pour éviter un déséquilibre des partitions, aussi appelé hotspots.
  • Un partitionnement par Range. Celui-ci est basé sur des intervalles. On pourra donc par exemple créer une partition par an, sans connaître exactement la distribution des dates pour celle-ci. Ces deux modes de partitions peuvent être combinés comme le montre l’illustration suivante. C’est le partitionnement flexible :

hash-range-partitioning-example.png

Parmi les récentes améliorations de Kudu on note la possibilité de disposer d’une dernière Range partition qui ne présente pas de borne supérieure. Cela facilite la gestion de séries temporelles. Les enregistrements futurs ne seront plus rejetés à cause de l’absence d’une partition correspondante. Les informations pourront continuer à s’accumuler dans la dernière partition.

range-partitioning-on-time.png

Les types et la compression

Toutes les colonnes sont fortement typées. Cela permet une gestion de l’encodage et de la sérialisation par colonne. Pour l’instant les types disponibles sont les suivants : boolean, timestamp, floatstring (UTF-8), integers et bytes. À la création de la table il est possible d’associer un encodage spécifique pour une colonne. Par défaut les colonnes sont stockées sans compression, mais il est possible de spécifier un format de compression parmi LZ4, Snappy ou Zlib. La compression impacte les performances en lecture, il y a donc un compromis à faire. Face aux autres outils de l’écosystème Hadoop, Kudu s’en sort plutôt bien en termes d’espace utilisé, comme le montre cette figure réalisée par le CERN dans le cadre d’un benchmark sur les formats de stockage.

chep2016_space.jpeg

Random Access Queries

Pour s’approcher des performances de HDFS, Kudu s’est fortement inspiré du format parquet. Reste maintenant à atteindre les performances des bases de données orientées colonnes lors de l’insertion ou de la remontée de lignes individuelles. Les bases de données comme Cassandra, ou HBase font intervenir une implémentation du LSM (Log-structured merge), qui consiste à reporter les modifications dans un store en mémoire puis de réaliser un flush sur le disques. En reprenant une partie de ces notions Kudu met à disposition des écritures légèrement plus lentes au bénéfice de la lecture. Autour de ce sujet, un des benchmarks bien connu est celui fait à l’aide de YCSB (Yahoo! Cloud Serving Benchmark). Il s’agit de faire varier la proportion de lecture et d’insertion dans une série de requêtes.

YCSB-kudu.png

Le résultat montre que HBase est presque deux fois plus rapide sur les chargements majoritairement composés d’insertions (load, B, C). Néanmoins, lorsque l’on mélange les insertions avec des requêtes d’analyse, Kudu parvient à égaler le nombre d’opérations par seconde réalisées sur Hbase.

Enfin, face au théorème CAP Apache Kudu se positionne sur l’axe CP. Autrement dit, Kudu privilégie la cohérence et la tolérance au partitionnement par rapport à la disponibilité. Cela se traduit par le respect des rôles de nœuds leader et follower décrits précédemment. Les nœuds followers n’acceptant pas directement les écritures, des insertions peuvent être retardées en cas d’indisponibilité du leader pour une partition donnée. Il faudra attendre l’élection d’un nouveau leader pour prendre connaissance des écritures et en faire profiter le reste du système. En cas d’incident, les informations les plus récentes peuvent être inaccessibles mais la cohérences des différents répliquas sera respectée.

Kudu dans la pratique

Il existe plusieurs moyens pour découvrir Kudu. Le plus connu étant la machine virtuelle Kudu Quickstart VM. Mais on peut également trouver des images Docker plus légères. Pour des démarches plus ambitieuses, il est possible d’obtenir Kudu à l’aide d’un parcel Cloudera dès la version CDH5.4 de Cloudera. Le 1er février 2017, Cloudera a officialisé l’arrivée de Kudu dans la stack CDH5.10. Vous n’avez donc plus d’excuses, Il est temps de mettre la main à la pâte ! Dans cette partie, nous allons mettre en avant quelques manières d’accéder à Kudu.

Les données d’exemple

Pour que nos exemples s’appuient sur un cas simple je vous propose le sujet suivant : imaginons que nous disposons d’un petit service web collectant des tags accédés par les visiteurs d’un blog technologique (comme celui de Xebia par exemple). Nous voulons donc stocker les informations concernant l’adresse de l’hôte émetteur, la date de l’événement, le tag (#data, #back, #craft etc…) et le navigateur du visiteur.

Une première ingestion

La première opération que nous allons effectuer sur Kudu est la création d’une table vide via Impala. Cela peut être réalisé de la manière suivante :

CREATE DATABASE blog;

CREATE TABLE blog.web_tags (
 host STRING,
 ts STRING,
 tag STRING,
 browser STRING,
 is_mobile BOOLEAN,
    PRIMARY KEY(host, ts)
)
PARTITION BY HASH(ts) PARTITIONS 10, RANGE(ts) (
 PARTITION VALUES < "2016",
 PARTITION "2017" <= VALUES < "2018",
 PARTITION "2018" <= VALUES
)
STORED AS KUDU

L’exemple précédent propose de créer une table interne. Contrairement à une table externe, elle sera gérée par Impala. Cela signifie que la clause DROP TABLE supprimera la donnée source stockée dans Kudu. Sur une table externe, cela aurait eu pour unique effet de détruire le mapping entre la table Kudu et Impala. Notez que cette écriture a été récemment simplifiée avec Impala 2.8 (cf [IMPALA-2848]). Une autre écriture plus complexe impliquant la clause TBLPROPERTIES peut être utilisée, mais aujourd’hui les informations relatives au cluster Kudu peuvent être indiquées au niveau du démon Impala. C’est un des exemples de la bonne intégration Kudu avec le reste de l’écosystème Big Data.

Une seconde approche consisterait à créer une table Kudu à partir d’une table existante dans Impala. La syntaxe AS SELECT permet de réaliser la requête suivante :

CREATE EXTERNAL TABLE blog.web_tags PRIMARY KEY (host, ts) STORED AS KUDU AS SELECT host, ts, tag, browser, is_mobile FROM impala.web_tags;

Cette méthode peut être intéressante si on dispose déjà de données brutes (au format csv, parquet, etc …) dans un stockage distribué comme HDFS. A la fin de l’exécution de la requête les premières tablettes seront déjà peuplées par les données sélectionnées. Cette requête est donc relativement longue en fonction des données à disposition. Pour la suite, les requêtes SELECT, INSERT, UPDATE ou DELETE se rapprocheront des requêtes habituelles sous Impala.

La requête suivante réalise un UPDATE sur quelques lignes erronées. C’est typiquement quelque chose qui nous sera impossible avec notre traditionnel couple HDFS / parquet.

UPDATE blog.web_tags SET tag = "Cloud" WHERE tag = "-1" AND ts >;= '2017-02-01';

La table est maintenant prête et on peut réaliser quelque requêtes. Par contre il y a peu de chance que nous soyons assez patient pour réaliser une cinquantaine de requêtes d’insertion pour poursuivre notre exemple. Au lieu de cela, il y a un connecteur Kafka Connect qui mérite d’être abordé ici. Apache Kafka est un bus de messages distribué et résilient largement adopté dans les architectures Big Data. Le débit important offert par Kudu et sa capacité à réaliser des insertions ligne à ligne encouragent la cohabitation avec Kafka. L’utilisation du framework Kafka Connect permet de concevoir des applications faisant le lien entre Kafka et d’autres systèmes. Ici, l’application qui nous intéresse a été développée par datamountaineer. Les quelques lignes de configuration suivantes suffisent à relier une version 3.1.2 de Confluent à notre table Kudu.

tasks.max=4
topics=web-tags
connect.kudu.master=<host>;:<port>;
connect.kudu.sink.kcql=INSERT INTO web_tags SELECT * FROM web_tags
connector.class=com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector

Une première analyse

Maintenant que nous disposons de données dans notre table, nous pouvons explorer d’autres modes d’accès. Pour rappel, un des points forts de Kudu est de supporter la réalisation d’analyses sur de larges datasets. Voyons donc ensemble de quoi nous disposons au niveau analytics sur Kudu. Pour ce faire nous allons utiliser Apache Spark, un moteur de calcul distribué pour accéder aux informations stockées dans Kudu. Il est possible d’ajouter la dépendance kudu-spark dans le fichier build.sbt pour un projet scala :

libraryDependencies += "org.apache.kudu" % "kudu-spark2_2.11" % "1.2.0"

ou lors de l’utilisation du spark-shell avec la commande :

spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.2.0

Nous sommes maintenant en mesure d’instancier un KuduContext. C’est un objet qui relie notre application Spark aux masters Kudu et qui permet d’accéder aux tables. Il possède les méthodes de création, modification ou suppression des tables. Le KuduContext fait aussi le lien entre les types des schémas Kudu et ceux des DataFrame Spark. La lecture d’une table nécessitera l’import des fonctions implicites liées au package Kudu.

import org.apache.kudu.spark.kudu._
import sparkSession.sqlContext.implicits._

val sparkSession = SparkSession.builder.getOrCreate()
val masters = "<host>;:<port>;,<host2>;:<port2>;"
val tableName = "web_tags"

val kuduContext = new KuduContext(masters)
assert(kuduContext.tableExists(tableName))

val kuduOpts: Map[String, String] = Map(
  "kudu.table" ->; tableName,
  "kudu.master" ->; masters
)

val df: DataFrame = sparkSession.sqlContext.read.options(kuduOpts).kudu

df.where($"ts" >;= "2017-03-01").groupBy($"browser", $"is_mobile").count()

Ici, on repasse les informations concernant la table à un DataFrameReader. Cela permet à Spark de comprendre que la donnée est au format Kudu. On obtient donc un DataFrame issue des tablettes. On arrive alors assez rapidement sur des manipulations classiques en Spark. Lors de l’écriture deux choix d’insertion sont proposés : INSERT et INSERT-IGNORE. Il est conseillé d’utiliser le second mode accessible via la méthode .insertIgnoreRows(). Cela permet d’éviter d’écrire plusieurs fois les même lignes lors de la reprise des tâches Spark en erreur. Autrement, il y aurait des exceptions liées à l’unicité des clefs primaires. Il est important de noter que pour la version 1.2.0 seul le mode d’écriture Append est implémenté.

kudu-blogpost-barchart.png

En appliquant la méthode .explain() on peut voir les prédicats descendus vers Kudu. Kudu-spark délègue les tâches de scan et filtrage pour lesquelles Kudu est plus efficace, puis récupère le résultat. Ce travail ne sera donc plus à faire au niveau de nos exécuteurs Spark. Malheureusement, cela ne s’applique qu’aux prédicats principaux : >;=<=!= ou ==.  Certains prédicats comme BETWEENIN ou LIKE ne sont pas gérés par Kudu mais directement évalués par Spark.

dagtest2

Une première application

Enfin, en plus de pouvoir être accessible à des profils analyste ou Data Scientist pour différentes explorations, les faibles temps de latence de Kudu permettent la construction de vraies applications. Ces applications peuvent avoir des fonctions de reporting ou encore de mise à disposition de référentiels. L’idée étant de se rapprocher de ce qu’on décrit souvent comme une « web-like performance« . Les différentes API permettent de réaliser des requêtes DDL et CRUD de la même manière que nous l’avons vu sous Impala ou Spark. Le code qui suit est un bref exemple de l’utilisation du client java.

KuduClient client = new KuduClient.KuduClientBuilder(
 KUDU_MASTER_HOST +":"+ KUDU_MASTER_PORT
).build();

try {
    KuduTable kuduTable = client.openTable(table);

 System.out.println("5 first lines of the table : blog.web_tags");
    printHeader(kuduTable.getSchema().getColumns());
 // affiche une entête partant d'une List de org.apache.kudu.ColumnSchema

    KuduScanner scan = client.newScannerBuilder(kuduTable).limit(5).build();

    while (scan.hasMoreRows()){
        for(RowResult row : scan.nextRows()) {
   printRow(row);
   // affiche une ligne partant 
   // d'un org.apache.kudu.client.RowResult
&nbsp;  }
    }
    scan.close();

} catch (KuduException e) {
    e.printStackTrace();
} finally {

    try {
        client.shutdown();
    } catch (KuduException e) {
        e.printStackTrace();
    }
}

temps-latence-kudu

Conclusion

Kudu profite de l’expérience acquise par les produits de l’écosystème pour atteindre des objectifs qui jusqu’à maintenant étaient traités séparément :

  • Proposer des performances d’analyse et de full scan semblable à HDFS
  • Proposer des update in-place avec des performances similaires à celles de HBase

De gros efforts sont réalisés pour la bonne intégration de Kudu avec le reste des outils que l’on connaît en Big Data. Avec comme exemple notamment l’arrivée l’authentification via Kerberos apportée par la version 1.3.0. Reste à savoir désormais si l’adoption de ce système de stockage sera à la hauteur. Parmi les freins éventuels on peut noter son manque de maturité (ses types, modes d’écriture et prédicats manquant) et la contrainte de devoir gérer un autre système de stockage que HDFS. Néanmoins, l’ambition du projet n’est pas de remplacer HDFS ou HBase, mais d’être un complément sur des cas d’usage impliquant des analyses en temps réel. En jouant cette carte il y a fort à parier que Kudu sera de plus en plus inclus dans les architectures dîtes Fast Data.

Loic Divad
Loïc est Data Engineer chez Xebia. Il intervient sur des problématiques liées au Big Data comme l’acquisition, le traitement et le stockage des données. Il travaille avec des outils comme Scala, Spark et Kafka.

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *