Publié par et
Il y a 2 années · 16 minutes · Data

Les outils de la Data Science : Spark MLlib, mise en pratique (2/2)

 

Dans le précédent article, nous vous présentions les différents concepts liés à MLlib : les types de RDD à fournir, la structure des algorithmes et la démarche à adopter pour construire une chaîne de traitement de Machine Learning. Nous allons maintenant mettre en pratique ces concepts à travers plusieurs exemples basés sur les différents algorithmes qui composent MLlib.

MLlib est divisé en plusieurs packages, dont la liste exhaustive se trouve ici. Un package représente soit une catégorie d’algorithmes de Machine Learning (classification, régression, etc.), soit des outils nécéssaires à leurs usages (feature engineering, statistiques, etc.). Nous allons passer en revue dans cet article quelques-uns des principaux packages disponibles et donner des exemples d’implémentation.

La création et l’analyse exploratoire des données avec les packages random et stat

Le package random

MLlib permet la génération aléatoire de RDDs via son package random. C’est un package très utile pour faire des tests, nous permettant de créer des données selon plusieurs types de distributions: Normale, Poisson, Exponentielle, Log-Normale, Gamma et Uniforme.

Utilisons ce package pour créer un RDD[Vector], contenant un million de lignes et 4 colonnes, avec des données tirées selon une loi normale centrée réduite. On utilisera ce RDD par la suite pour présenter les différents algorithmes de Machine Learning (les résultats des algorithmes ne seront pas très bons car les données n’ont aucune structure, l’idée étant simplement de présenter comment construire une chaine de traitement en Machine Learning).

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.random.RandomRDDs
import org.apache.spark.rdd.RDD
 
// Create a RDD[Vector] with data drawn from normal distribution
val data: RDD[Vector] = RandomRDDs.normalVectorRDD(sc, numRows = 1000000L, numCols = 4)

Le package stat

Le package stat regroupe dans son objet Statistics des fonctions statistiques très utilisées notamment dans la partie exploratoire des données. On peut citer par exemple la fonction colStats() qui retourne une instance de MultiVariateStatisticalSummary regroupant entre autres les max, min, mean et variance pour chaque colonne d’un RDD[Vector].

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

// Compute column summary statistics.
val summary: MultivariateStatisticalSummary = Statistics.colStats(data)
println(summary.mean) // a dense vector containing the mean value for each column
println(summary.variance) // column-wise variance
println(summary.max) // highest number for each column

Appliquées au RDD généré dans l’étape précédente, on devrait retrouver des valeurs proches de 0 pour la moyenne et de 1 pour la variance.

On trouve de plus une fonction de calcul de la corrélation entre deux RDD. Les corrélations supportées sont celles de Pearson et de Spearman. Si on ne spécifie qu’un seul RDD, la fonction calcule l’autocorrélation.

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics

// Calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
// If a method is not specified, Pearson's method will be used by default. 
val correlMatrix: Matrix = Statistics.corr(data, "pearson")

Pour notre RDD, la matrice retournée doit donner des 1 sur la diagonale, et des valeurs proches de 0 sur le reste (pas de corrélation entre les différentes colonnes).

Pour compléter ce panel, on trouve aussi des fonctions relatives aux tests d’hypothèses, comme le test du chi-squared (chiSqTest).

Les algorithmes d’apprentissage supervisé avec les packages classification et regression

Comme leurs noms l’indiquent, les packages classification et regression regroupent tous les principaux algorithmes liés à l’apprentissage supervisé (voir le précédent article sur MLlib pour un rappel sur le Machine Learning) et vont donc demander l’utilisation de RDD[LabeledPoint]. Les deux types ont pour objectif de prédire une variable grâce à des features en utilisant un training set contenant les vrais labels. La différence entre les deux est qu’en classification, on cherche à prédire une variable discrète (ex: spam / non spam), alors qu’elle est continue en régression. Voyons maintenant comment sont utilisés quelques algorithmes classiques de Machine Learning présents dans ces packages.

Un exemple en régression: LinearRegressionWithSGD

La régression linéaire est l’algorithme le plus classique pour des problématiques de régression. En MLlib, il est implémenté en utilisant de la Descente de Gradient Stochastique, d’où son nom. Son objectif est de minimiser la fonction de coût suivante:

Avec:

  • w le vecteur des paramètres appris par l’algorithme (correspondant globalement au poids de chaque caractéristique)
  • x un vecteur de caractéristiques (un point dans l’espace)
  • y la valeur réelle observée

Il est possible d’y ajouter des termes de régularisation, donnant lieu aux algorithmes dits de Ridge Regression ou de Lasso.

Voici un exemple d’implémentation, utilisant comme données d’entrée le RDD créé précédemment. Un autre RDD nommé label est aussi créé, il va représenter notre variable cible. Il est créé selon une loi uniforme (double entre 0 et 1).

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.random.RandomRDDs
import org.apache.spark.mllib.regression.{LinearRegressionWithSGD, LinearRegressionModel, LabeledPoint}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.rdd.RDD

// Create a RDD[Vector] with data drawn from normal distribution
val data: RDD[Vector] = RandomRDDs.normalVectorRDD(sc, numRows = 1000000L, numCols = 4)

// Create a RDD of labels for regression
val label: RDD[Double] = RandomRDDs.uniformRDD(sc, 1000000L)

// Create a RDD[LabeledPoint]
val dataWithLabel: RDD[LabeledPoint] = data.zip(label).map(point => LabeledPoint(point._2, point._1))
dataWithLabel.cache()

// Building the model
val numIterations = 100
val model: LinearRegressionModel = LinearRegressionWithSGD.train(dataWithLabel, numIterations)

// Evaluate model on training examples
val predictionAndLabels: RDD[(Double, Double)] = dataWithLabel.map { case LabeledPoint(label, features) =>
  val prediction: Double = model.predict(features)
  (prediction, label)
}

// Compute Mean Square Error
val metrics: RegressionMetrics = new RegressionMetrics(predictionAndLabels)
val MSE = metrics.meanSquaredError
println("training Mean Squared Error = " + MSE)

La démarche est classique, et on la retrouvera quasiment systématiquement:

  • Création du RDD sous le bon format
  • Construction et entraînement du modèle
  • Prédiction et évaluation des prédictions faites

On remarquera que l’on a préféré la méthode train() de LinearRegressionWithSGD en mettant le nombre d’itérations en paramètre de la méthode, plutôt que d’utiliser la méthode run() avec son setter associé. De plus, il est fortement recommandé d’utiliser la méthode cache() pour monter notre RDD en mémoire. De nombreux algorithmes de Machine Learning nécessitent plusieurs itérations sur les données, leur montée en mémoire va donc réduire fortement le temps de calcul.

Pour les algorithmes de régression, il est possible d’évaluer les modèles à l’aide de la classe RegressionMetrics, qui permet d’obtenir notamment la mean squared error.

Un exemple en classification: LogisticRegressionWithLBFGS

Contrairement à ce que son nom peut sembler l’indiquer, la régression logistique est un algorithme de classification utilisé fréquemment pour prédire une réponse (binaire ou multi-classes). La fonction de coût qu’il veut minimiser est la suivante :

Son implémentation en MLlib se fait soit par Descente de Gradient Stochastique (LogisticRegressionWithSGD) ou par l’algorithme Limited-Memory Broyden-Fletcher-Goldfarb-Shanno (LogisticRegressionWithLBFGS), qui est généralement préféré. Cette dernière implémentation ne possède pas de méthode train(), nous aurons donc recours à la méthode run() et à l’appel des setters associés à la classe.

import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.random.RandomRDDs
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.rdd.RDD

// Create a RDD[Vector] with data drawn from normal distribution
val data: RDD[Vector] = RandomRDDs.normalVectorRDD(sc, numRows = 1000000L, numCols = 4)

// Create a RDD of labels for classification (5 classes)
val label: RDD[Double] = RandomRDDs.uniformRDD(sc, 1000000L).map(x => (x*5).floor)

// Create a RDD[LabeledPoint]
val dataWithLabel: RDD[LabeledPoint] = data.zip(label).map(point => LabeledPoint(point._2, point._1))

// Split data into training (75%) and test (25%).
val splits: Array[RDD[LabeledPoint]] = dataWithLabel.randomSplit(Array(0.75, 0.25), seed = 11L)
val training: RDD[LabeledPoint] = splits(0).cache()
val test: RDD[LabeledPoint] = splits(1)

// Run training algorithm to build the model
val model: LogisticRegressionModel = new LogisticRegressionWithLBFGS().setNumClasses(10).run(training)

// Compute raw scores on the test set
val predictionAndLabels: RDD[(Double, Double)] = test.map { case LabeledPoint(label, features) =>
  val prediction: Double = model.predict(features)
  (prediction, label)
}

// Get evaluation metrics
val metrics: MulticlassMetrics = new MulticlassMetrics(predictionAndLabels)
val precision: Double = metrics.precision
println("Precision = " + precision)

La démarche appliquée est la même que dans l’exemple précédent, avec en plus une étape de splitting du RDD, ce qui est recommandé en apprentissage supervisé. L’apprentissage se fait alors sur le training set, et les prédictions et évaluations se font sur le test set. Cela permet d’avoir une meilleure idée de la capacité de généralisation de notre algorithme à de nouvelles données sur lesquelles il ne s’est pas entraîné.

Pour les algorithmes de classification, il est possible d’évaluer les modèles à l’aide de la classe MulticlassMetrics (dans le cas où il y a plus de deux classes), qui permet d’obtenir notamment la précision ou la matrice de confusion. Pour une classification binaire, il faut plutôt utiliser BinaryClassificationMetrics, qui a un fonctionnement similaire mais possède des métriques supplémentaires spécifiques à ce type de classification.

Autres algorithmes

Voici quelques autres algorithmes présents dans les packages classification et regression:

  •  Classification
    • NaiveBayes: Algorithme qui suppose l’indépendance de chaque caractéristique du jeu de données pour faciliter le calcul de probabilité d’appartenance à une classe. Il est par exemple utilisé dans la classification de spams.
    • SVMWithSGD: C’est une généralisation des classifieurs linéaires plus classiques. Il donne souvent de bons résultats en pratique et est particulièrement utilisé lorsqu’il faut travailler avec des données à grandes dimensions. Il a de plus l’avantage de ne nécessiter que d’un faible nombre d’hyper-paramètres (les paramètres qu’il faut tuner pour améliorer les résultats de l’algorithme).
  • Régression
    • LassoWithSGD: Algorithme de régression linéaire pour lequel on ajoute une pénalisation du premier ordre (on ajoute la somme des valeurs absolues des paramètres à la fonction de coût) pour faire de la régularisation et rendre l’algorithme plus robuste et moins enclin à de l’overfitting (apprentissage par coeur du training set)
    • RidgeRegressionWithSGD: Identique au précédent, mais la pénalisation est cette fois du second ordre (on ajoute la somme du carré des paramètres à la fonction de coût).

Leur utilisation se fait exactement de la même manière que pour les deux algorithmes précédemment présentés. Il suffit de regarder dans la documentation les différents paramètres qui leurs sont propres.

Le package tree pour les algorithmes fonctionnant à la fois en régression et en classification

Les algorithmes basés sur des arbres de décision (DecisionTree, GradientBoostedTree, RandomForest) pouvant être utilisés à la fois en classification et en régression, ils bénéficient d’un package tree qui leur est propre.  Ils permettent de plus d’utiliser des variables catégorielles (il faut alors spécifier à l’algorithme quelles sont les variables catégorielles en question).

DecisionTree est un algorithme très commun en Machine Learning car il est très interprétable et n’oblige pas à faire un scaling préalable des données. Concrètement, cet algorithme fonctionne en pratiquant de manière récursive des partitions binaires dans l’espace des caractéristiques. A chaque étape de splitting, l’algorithme va choisir la caractéristique qui va maximiser le gain en information après le split, c’est-à-dire minimiser notre incertitude sur la variable à prédire dans les subsets créés.

RandomForest et GradientBoostedTree sont des méthodes dites d’ensemble. Le principe n’est plus de construire un unique arbre de décision, mais un ensemble, soit en parallèle (RandomForest), soit les uns après les autres sur les résidus des précédents (GradientBoostedTree). Ce sont des algorithmes très puissants, donnant souvent de très bons résultats.

En plus de la méthode classique run(), il est possible pour les algorithmes DecisionTree et RandomForest d’utiliser les méthodes trainClassifier() ou trainRegressor(), qui ont des paramètres différents. Voici un exemple de classification avec un DecisionTree.

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.random.RandomRDDs
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.rdd.RDD

// Create a RDD[Vector] with data drawn from normal distribution
val data: RDD[Vector] = RandomRDDs.normalVectorRDD(sc, numRows = 1000000L, numCols = 4)

// Create a RDD of labels for classification
val label: RDD[Double] = RandomRDDs.uniformRDD(sc, 1000000L).map(x => (x*5).floor)

// Create a RDD[LabeledPoint]
val dataWithLabel: RDD[LabeledPoint] = data.zip(label).map(point => LabeledPoint(point._2, point._1))

// Split data into training (75%) and test (25%).
val splits: Array[RDD[LabeledPoint]] = dataWithLabel.randomSplit(Array(0.75, 0.25), seed = 11L)
val training: RDD[LabeledPoint] = splits(0).cache()
val test: RDD[LabeledPoint] = splits(1)

// Train a DecisionTree model
val model: DecisionTreeModel = DecisionTree.trainClassifier(training, numClasses = 5,
  categoricalFeaturesInfo = Map[Int, Int](), impurity = "gini", maxDepth = 5, maxBins = 32)

// Evaluate model on test instances
val predictionAndLabels: RDD[(Double, Double)] = test.map { point =>
  val prediction = model.predict(point.features)
  (prediction, point.label)
}

// Get evaluation metrics
val metrics: MulticlassMetrics = new MulticlassMetrics(predictionAndLabels)
val precision: Double = metrics.precision
println("Precision = " + precision)
println("Learned classification tree model:\n" + model.toDebugString)

Les autres algorithmes fonctionnent globalement de la même manière, à la différence prêt que les paramètres à tuner sont plus nombreux. Pour le GradientBoostedTree, le choix entre régression et classification ne se fait pas avec trainRegressor ou trainClassifier, mais en utilisant la classe BoostingStrategy et en spécifiant defaultParams(« Classification ») ou defaultParams(« Regression »). Les autres paramètres sont aussi à spécifier de cette manière.

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
 
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 3 // Note: Use more iterations in practice.
boostingStrategy.treeStrategy.numClassesForClassification = 2
boostingStrategy.treeStrategy.maxDepth = 5
//  Empty categoricalFeaturesInfo indicates all features are continuous.
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
 
val model = GradientBoostedTrees.train(training, boostingStrategy)

L’apprentissage non supervisé avec le package clustering

Le package clustering contient une liste d’algorithmes de type non-supervisé, pour lesquels l’objectif n’est plus de prédire une variable mais de regrouper des points dans les clusters à forte similarité. On utilise alors des RDD[Vector] puisqu’il n’y a aucun label à disposition. Les algorithmes de clustering sont couramment utilisés dans de l’analyse exploratoire de données ou bien comme composante d’une pipeline pour l’utilisation d’un algorithme supervisé.

K-Means est l’algorithme de clustering le plus courramment utilisé. Il est nécessaire de lui spécifier en paramètre le nombre de clusters que l’on souhaite trouver, ce qui en fait parfois sa faiblesse. Son utilisation, comme celle de tous les autres algorithmes de Machine Learning avec MLlib, est très simple.

import org.apache.spark.mllib.clustering.{KMeansModel, KMeans}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.random.RandomRDDs
import org.apache.spark.rdd.RDD

// Create a RDD[Vector] with data drawn from normal distribution
val data: RDD[Vector] = RandomRDDs.normalVectorRDD(sc, numRows = 1000000L, numCols = 4)

// Cluster the data into two classes using KMeans
val clusters: KMeansModel = KMeans.train(data, k = 2, maxIterations = 20)

// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE: Double = clusters.computeCost(data)
println("Within Set Sum of Squared Errors = " + WSSSE)
Comme on peut le voir dans le code ci-dessus, le KMeansModel possède une méthode computeCost qui calcule la somme du carré des distances entre chaque point et son centroïde le plus proche. C’est une mesure de la similarité des points au sein de chaque cluster.

Les autres packages

Voici quelques autres packages présents dans la verion 1.3 de Spark MLlib, que nous n’allons pas développer ici, mais qui sont facilement utilisables et compréhensibles via leurs documentations associées.

  • feature: Ce package contient différentes classes relatives au Feature Engineering. On y trouve des algorithmes tels que TF-IDF qui permettent de construire des features à partir de textes, ou bien StandardScaler qui permet de mettre les features à la même échelle (souvent requis par les algorithmes de Machine Learning)
  • linalg: Ce package est utilisé pour représenter des matrices, ainsi que pour faire de la Réduction de Dimension.
  • optimization: Ce package permet d’avoir accès aux algorithmes tels que la Descente de Gradient Stochastique pour des développements plus poussés. C’est sur ce package que se basent beaucoup d’algorithmes de Machine Learning  présentés précédemment.
  • recommendation: C’est dans ce package que se trouvent les algorithmes relatifs au Collaborative Filtering (pour les systèmes de recommandation).

MLlib: Quel avenir?

On l’a vu, MLlib permet de déployer des chaînes de traitement de données de manière relativement simple sur Spark. La librairie répond de manière forte aux réclamations faites contre Mahout pour les problématiques de Machine Learning sur des Big Datas.

La librairie est en constante expansion, avec l’ajout fréquent de nouveaux algorithmes et de nouvelles méthodes de traitement des features.

Cependant, MLlib souffre de quelques critiques, que leurs concepteurs sont en train d’effacer brillamment. On l’a vu, beaucoup d’algorithmes peuvent à la fois être entraînés grâce à la méthode run() et à la méthode train(), ce qui porte parfois à confusion. De plus, on est obligés de passer systématiquement par une étape de mapping des données pour les mettre au format souhaité (RDD[Vector] ou RDD[LabeledPoint]), alors qu’une librairie telle que scikit-learn en Python est beaucoup plus flexibles en acceptant notamment des Arrays Numpy ou bien des DataFrames Pandas.

Pour contrer cela, les contributeurs du projet développent en parallèle à MLlib une nouvelle API, spark.ml, qui propose une structure plus logique pour la construction des algorithmes de Machine Learning. Elle incorpore de plus des notions essentielles en Data Science que sont les Pipelines et la Cross Validation. Enfin, les algorithmes prendront en entrée des DataFrames, notion introduite dans la version 1.3 de Spark SQL. Une DataFrame est une collection de données organisée en colonnes, ce qui en fait un objet très proche de ce que l’on connait sur R ou Python.

Ainsi, la nouvelle API spark.ml, qui est encore en phase de développement, permettra de construire des pipelines complètes de Machine Learning en liant plusieurs algorithmes les uns aux autres, et pouvant prendre en entrée une structure de données beaucoup plus intuitive pour des Data Scientists, les DataFrames.

Pour aller plus loin

Si vous souhaitez travailler plus en profondeur sur les notions présentées sur MLlib, nous vous conseillons de jeter un oeil aux ateliers Hands’ On que nous avons donné cette année à Devoxx France et à Mix-IT sur le même sujet. Les répertoires associés se trouvent aux adresses suivantes:

Alban Phelip
Alban est data engineer chez Xebia. Issu d'une formation en statistiques il s'est spécialisé dans l'ingéniérie des données. Ses outils préférés : Spark et R. Speaker et bloggeur il se passionne par tout ce qui touche de près ou de loin au Big Data et à la Data science.
Yoann Benoit
Yoann est Data Scientist chez Xebia depuis près de deux ans. Il intervient sur de nombreux sujets autour de la Data Science et du Big Data, allant de la collecte, du traitement et de l'analyse des données jusqu'à la mise en production de pipelines complets de Machine Learning. Speaker et rédacteur à la fois sur les concepts et les technologies liées à la Data Science, il travaille principalement avec Python, Scala et Spark. Il intervient de plus en tant que formateur sur l'Analyse de Données et le Machine Learning sur Spark.

Une réflexion au sujet de « Les outils de la Data Science : Spark MLlib, mise en pratique (2/2) »

  1. Publié par Elias, Il y a 1 année

    Cette article est d’excellente qualité! Depuis le temps que je travaille avec Spark, j’ai rarement croisé un article de cette qualité. Bravo!

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *