Publié par
Il y a 11 mois · 16 minutes · Craft, Data

Tester du code Spark : 1- La théorie

spark.png

Spark est plus puissant et plus simple à utiliser que MapReduce, mais comment tester du code Spark ? Voici notre vision !

Spark est un framework de calcul distribué créé à Berkeley en 2010. Il connait une adoption impressionnante. Plusieurs raisons à cela : des performances de très loin supérieures à MapReduce et un framework à la fois beaucoup plus simple d’utilisation et multi-langage (Scala, Python, Java ou R).

Chez Xebia, nous aimons le code bien fait. En tant que craftsman de la Data, nous avons donc cherché la façon qui nous paraissait la meilleure pour tester notre code Spark.

Nous avons donc décidé d’écrire une série d’articles centrée sur les tests des programmes Spark. Plusieurs objectifs : présenter les problèmes liés à l’écriture de tests pour Spark, les outils et frameworks, et enfin, les manques de ces outils.
Dans ce premier article, nous allons nous concentrer sur la théorie : ce qu’il faut tester, comment le tester et les spécificités apportées à nos tests sur Spark.

Dans cette série d’articles, nous ne présenterons pas d’exemples en Java ni en R. Nous nous concentrerons sur les API Scala et Python qui sont à notre avis à favoriser, mais aussi les plus utilisées par la communauté. Les notions globales introduites restent cependant les mêmes quel que soit le langage.

Mais que doit-on tester dans son code Spark ?

Un code plus utilisable, lisible et plus maintenable

« Tester, ça prend trop de temps », « Tester, ce n’est pas simple, je ne sais pas ce que je vais écrire dans mon code, donc pas facile à tester », voire même « tester ? ». Autant de réactions que l’on obtient que trop fréquemment lorsque l’on évoque le sujet, quel que soit le domaine. Alors répondons tout de suite à ces arguments :

  • « Tester, ça prend trop de temps » : il a été déjà démontré à maintes reprises le gain de temps à tester son code. Tester, c’est identifier les erreurs au plus tôt. Et plus une erreur est corrigée tôt, moins il faudra de temps pour la corriger. On récupère donc aisément l’augmentation de temps passer à tester dans la réduction de temps passé à la correction de bugs.
  • « Tester, ce n’est pas simple, je ne sais pas comment je vais écrire mon code à l’avance » : Tester, c’est aussi se forcer à réfléchir à une manière élégante d’écrire son code. Puisque dans un test, on se place en tant qu’appelant des différentes méthodes, on est en mesure de mieux mesurer ce qu’elle doit ou ne doit pas faire. C’est en effet plus compliqué, mais on obtient facilement quelque chose de plus utilisable.
  • « Tester ? » : … nous ne nous abaisserons pas à répondre à cette remarque pourtant assez fréquente qui relève ici plus de la provocation.

A cela, nous voyons de nombreux avantages à nous concentrer sur une bonne écriture des tests. Cela permet entre autres de mieux comprendre et structurer son code, de le rendre plus lisible et donc mieux maintenable. Il permet de se focaliser sur le besoin métier. Enfin, si avec tout ceci, vous ne convainquez pas les sponsors de votre projet, parlez réduction du stock d’anomalie et de gain de satisfaction du client.

La programmation fonctionnelle nous vient en aide

Depuis quelques années maintenant, nous avons vu l’essor des langages de programmation fonctionnelle. Java 8 intégrant les lambda et les streams et Scala en sont des exemples. Mais on peut aussi bien faire du fonctionnel dans de nombreux langages.

Dans l’univers où nous travaillons, la donnée est au centre de toutes les attentions. La programmation fonctionnelle va grandement nous venir en aide ici puisque nous pourrons exprimer une grande partie de nos traitements à travers des transformations de données : map, flatmap, reduce, etc. Pour ceux qui auraient raté les bases, un petit rappel :

Maintenant que vous vous rappelez ces méthodes, nous allons explorer des exemples types d’application de ces concepts dans Spark 2.0. Nous allons ici séparer les descriptions pour Scala et Python.

En Scala, nous allons naturellement appliquer ce concept :

val result = dataFrame.filter(positiveBalance(_))
 
def positiveBalance(input: Row): Boolean = {
  input.getInt(input.fieldIndex("income")) - input.getInt(input.fieldIndex("outcome")) > 0
}

Pour réaliser cela en Python, nous devrons en général passer par le concept d’UDF. Le problème avec une UDF Python est que le framework va constamment effectuer des tâches de sérialisation/désérialisation entre Python et la JVM, dégradant ainsi les performances globales du programme. Attention donc à limiter l’utilisation d’UDF.

De plus, le fait que le code Python n’est pas fortement typé statiquement simplifie souvent l’écriture mais signifie en contrepartie que certaines erreurs ne seront visibles que lors de l’exécution. Nous allons donc préférer extraire le code d’une façon légèrement différente :

result = only_positive_balance(input_data_frame)
 
def only_positive_balance(data_frame, income_col='income', outcome_col='outcome'):
  return data_frame.filter(data_frame[income_col] - data_frame[outcome_col] > 0)

Avec cette syntaxe, nous nous assurons (un peu plus) que l’objet passé dans le paramètre data_frame se rapproche de la structure d’une DataFrame Spark. Nous nous assurons également de ne pas appeler d’UDF et donc de conserver des performances correctes.

Dans les deux cas, l’important sera d’extraire une partie testable de notre code. Nous pourrons donc nous concentrer au mieux (de façon moins explicite pour Python malheureusement) sur le fait de tester des fonctions métier, et non pas des paradigmes internes de Spark. Ce que nous remarquerons ici, c’est le fait que les objets passés en entrées et sorties de ces fonctions sont fréquemment spécifiques à Spark. Les outils que nous utiliserons devront donc pouvoir gérer la manipulation d’objets de Spark, ce qui ne pose généralement pas de problème.

Des chaînes de transformations

La définition même du test unitaire est de tester un petit morceau de code pour s’assurer qu’il remplisse bien son rôle. Pour le traitement de nos données, nous allons chaîner des transformations et des fonctions simples à travers nos concepts fonctionnels. Comme vu dans le paragraphe précédent, tous ces éléments sont facilement testables. Mais qu’en est-il de l’appel à la chaîne de transformation ? Comment s’assurer que les appels successif aux bonnes méthodes sont réalisés et dans le bon ordre pour répondre au besoin métier ?

Nous préférons ici nous poser la question : est-ce qu’un test unitaire doit tester le pipeline de données ? De manière générale, nous allons inclure ces tests dans un test d’intégration. Le test d’intégration est plus global et est autorisé à prendre plus de temps en raison de ses liens avec des éléments externes. Plusieurs éléments peuvent être vérifiés ici :

  • Les transformations peuvent-elles être appelées sur le jeu de données ? En Spark, dans une chaîne de transformations, nombreuses sont les étapes qui vont changer le schéma du jeu de données. La validation de la présence des colonnes requises est une étape réalisée à l’exécution. Ce type de test peut donc convenir à vérifier la bonne configuration de la chaîne de transformation.
  • Le schéma de sortie de la transformation est-il celui attendu ?
  • Les colonnes ajoutées par le pipeline contiennent-elles les bonnes valeurs ?

Mais… et les modèles probabilistes ?

A ce stade, nous voyons arriver des Data Scientists avec un argument qui leur est propre. « Un test unitaire doit être déterministe. Ce que je réalise est un modèle probabiliste et donc par essence est non-déterministe ». Dès lors, il faut vraiment chercher à comprendre ce que l’on souhaite tester.

Nous n’allons pas chercher à tester le fonctionnement de l’algorithme utilisé (à moins que vous implémentiez votre propre algorithme). Cela est (normalement, et on l’espère) déjà réalisé par l’auteur de l’algorithme. Ce que nous pouvons tester, c’est la performance du modèle avec les paramètres que l’on fourni, le tout sur un jeu de données bien défini. On cherchera donc par exemple à fixer un seuil minimal par rapport à une métrique bien identifiée.

L’enjeu le plus important ici sera la définition du jeu de données en entrée. Plusieurs méthodes existent pour définir ce jeu d’entrée :

  • Effectuer un échantillonnage des données de productions. Cette méthode nécessite de fréquemment mettre à jour le jeu de test pour refléter tout changement de comportement des données. Elle peut également nécessiter une phase d’anonymisation.
  • Générer des données aléatoirement sur la base d’un schéma donné. Cette méthode est plus simple mais répondra moins fréquemment au besoin car il est difficile de refléter le contenu des vraies données.

Nos premiers points de blocages

Nous avons vu les bases de la théorie. Ceci était censé représenter un monde idéal où tout se passe bien et où le test est donc facile. Dans la réalité, nous nous sommes heurtés à différents problèmes auquel nous allons tenter de répondre, ou au moins, énoncer les pièges à éviter.

La complexité d’un système distribué

Dans le monde de la Data, nous avons fréquemment affaire à des systèmes distribués et complexes. Par exemple, les données d’entrée sont dans un fichier sur disque ou bien dans une base de données ou encore dans un cluster HDFS. Dans nos tests, nous allons essayer de nous abstraire au maximum de ces contraintes. Cela passe avant tout par un bon découpage de son code. Une fois cela fait, nous aurons deux solutions, en fonction du cas dans lequel on se trouve.

Le cas le plus simple est celui du test unitaire. Ici, on ne voudra pas accéder à un élément externe. Nous allons donc mocker tout ce qu’il est nécessaire de mocker, c’est-à-dire utiliser une fausse implémentation de certains composants. Assurez-vous de bien tester le comportement et non l’implémentation. Demandez-vous si votre test doit échouer selon telle ou telle modification de l’implémentation. Ici, nous pourrons utiliser des bibliothèques telles que ScalaMock ou Mockito en Scala, ou bien les mocks de unittest en Python.

En ce qui concerne le test d’intégration, on tentera de limiter les mocks. Si possible, on souhaitera accéder à un environnement d’intégration. Nous préférerons en général l’utilisation d’un pseudo environnement local. Le plus connu autour de l’écosystème Hadoop est Hadoop mini-cluster. Il existe également Hadoop-unit, un wrapper de mini-cluster, qui permet de simplifier l’utilisation d’Hadoop mini-cluster. Il est toutefois à noter qu’il s’agit de bibliothèques Java. Nous ne pourrons donc les utiliser nativement que pour du test d’intégration en Java ou Scala. Heureusement, hadoop-unit fourni également un outil en ligne de commande qui permet de démarrer/arrêter le mini-cluster en ligne de commande. Ceci inclus dans un build Jenkins permettra de l’utiliser dans un test d’intégration en Python.

Dans la suite de cet article, nous allons nous focaliser sur Spark spécifiquement. Nous ne tiendrons donc pas compte des éléments externes tels que Impala par exemple.

Définir le bon jeu de test

Pour définir un test, nous allons avoir besoin d’un jeu de données en entrée. Quelle granularité choisir ? Où mettre le jeu de données à disposition ? Un jeu de données trop gros rendrait la lecture des tests illisibles. Un jeu de donnée trop petit ne comprendrait pas les cas minimaux que l’on souhaite tester.

Ici, nous allons revenir aux bases. Nous avons précédemment vu que nous pourrions profiter du paradigme fonctionnel. Et bien c’est l’occasion ou jamais. En découpant correctement son code, on peut obtenir des transformations de données fonctionnelles. Nous allons donc nous concentrer à tester en priorité les fonctions que nous avons définies. Ces fonctions devraient normalement être relativement simples et représenter des besoins métiers particuliers. Si on reprend l’exemple de filtre que nous avons cité précédemment, nous filtrions simplement les lignes ayant un revenu positif. Il est alors simple de définir les données du jeu d’entrée. En effet, il se limite à quelques cas simples (plus de gains que de dépenses, moins de gains que de dépenses, autant de gains que de dépenses et les cas avec des valeurs manquantes).

Maintenant, un élément nous paraît important à préciser ici : CHAQUE cas simple identifié doit faire l’objet de son propre test. Nous ne devons en aucun cas tester plusieurs cas dans un unique test dans le but de simplifier l’analyse de problèmes. Maintenant que nous avons vu cela, la localisation des données devient triviale. Nous allons définir clairement dans le code de nos tests ces jeux de données.

Partant de ce concept, nous avons pu rédiger la majeure partie de nos tests. Seul le cas de la validation d’un modèle de Machine Learning est difficilement applicable. Dans ce cas précis, le jeu de données est de taille importante. Nous allons donc, dans ce cas unique, déplacer le jeu de données dans un fichier externe. Attention toutefois au format de ce fichier. Nous préférerons en général un format de fichier non binaire afin de simplifier la lecture des modifications dans l’outil de gestion de configuration.

Le temps d’exécution des tests

Xebia a défini il y a quelques années des bonnes pratiques de développement à travers les Xebia Essentials. L’une d’elles est « Tests should be fast, reliable and independant ». Après avoir implémenté nos premiers tests Spark, nous nous sommes heurtés à un problème majeur : cette bonne pratique n’est pas respectée du point de vue du temps d’exécution. Même en local, le démarrage du contexte d’exécution Spark prend quelques secondes, voire dizaines de secondes. Ce sur-coût, qui semble petit, est en fait très pénalisant dans une optique de développement « test first ». Heureusement, il n’intervient qu’une seule et unique fois au démarrage de la suite de tests.

A ce jour, nous n’avons pas identifié de solution et ce problème reste un problème majeur. 

Le démarrage du contexte Spark

Lors de nos premiers développements Spark, alors en version 1.X, nous nous sommes également heurtés à des problèmes liés au lancement du HiveContext. En effet, si celui-ci est démarré en local, il ne peut pas l’être une seconde fois. Il est important que ce contexte soit unique dans l’application. Nous vous conseillons donc de créer un singleton de ce contexte afin de s’assurer de toujours appeler le même contexte.

Depuis Spark 2, ce problème est résolu par l’arrivée de la SparkSession. Cet objet est construit à l’aide d’un builder par une méthode getOrCreate qui s’occupe de gérer une instance unique. Nous vous conseillons tout de même de sortir l’utilisation de ce builder dans une classe externe à vos tests afin de ne pas dupliquer ce code.

object SharedSparkSession {
  val sparkSession = SparkSession
    .builder()
    .appName("SparkSession for unit tests")
    .master("local[*]")
    .getOrCreate()
}


class SimpleSpec extends FlatSpec {
  "Some Test" should "tests something" in {
    // Given
    val rdd: RDD[SimpleCaseClass] = ???
    val dataFrame = SharedSparkSession.sparkSession.createDataFrame(rdd)
    ???
  }
}
spark_context = SparkContext()
spark_session = SparkSession(spark_context).builder.master("local[*]").appName("Unit tests").getOrCreate()
 
class TestSimpleOperation(unittest2.TestCase):

    def test_with_balance(self):
        # Given
        data_frame = spark_session.createDataFrame([
            Row(some_column=42)
        ])
 
        ...

Une solution plutôt élégante en Scala consiste à placer cet objet partagé dans un trait.

trait SparkSessionProvider {
  val sparkSession = SparkSession
    .builder()
    .appName("SparkSession for unit tests")
    .master("local[*]")
    .getOrCreate()
}


class SimpleSpec extends FlatSpec with SparkSessionProvider {
  "Some Test" should "tests something" in {
    // Given
    val rdd: RDD[SimpleCaseClass] = ???
    val dataFrame = sparkSession.createDataFrame(rdd)
    ???
  }
}

Le streaming et ses besoins spécifiques

Un dernier souci que nous avons rencontré concerne le streaming. Spark permet de réaliser des traitements à faible latence à travers Spark Streaming. Il s’agit en fait de micro batchs exécutés à intervalles réguliers. Le principe d’un tel job est qu’il n’est pas censé se terminer. Comment alors le tester ?

Lors de nos premiers tests, nous étions arrivés à la même conclusion que pour les chaînes de traitement. Nous ne testions pas que le streaming fonctionne (ceci est censé être testé par Spark), mais plutôt que les traitements effectués lors de chaque micro batchs sont corrects. Nous extrayons donc une fonction correspondant au traitement, que nous allons fournir à Spark Streaming et que l’on pourra tester comme une fonction classique.

Cependant, nous sommes arrivés à un moment où nous avons dû utiliser la fonction updateStateByKey. Cette fonction impose l’utilisation du streaming. pour effectuer des tests sur son retour, nous avons dû nous intéresser à d’autres solutions comme sscheck. Cette librairie se base sur le Property Based Testing afin de tester des jobs en streaming. Nous entrerons plus en détail sur des exemples d’implémentations dans un prochain article.

Conclusion

Dans cet article, nous avons exploré différents aspects du test dans Spark. Nous avons défini ce qu’il convenait de faire, les bonnes pratiques. Pour résumer :

  • Spark fait usage de la programmation fonctionnelle. Utilisez ceci à votre avantage pour tester correctement les fonctions que vous fournirez à Spark.
  • Réfléchissez à ce que vous voulez tester. Il ne s’agit pas de tester le framework mais bien vos comportements fonctionnels. Il est très simple de tomber dans ce piège, spécialement dans le cadre de Machine Learning et de streaming.
  • Nous n’avons actuellement pas identifié de solution au temps de démarrage du contexte Spark. Il vous sera cependant nécessaire de le démarrer une seule et unique fois pour l’intégralité de vos tests.

Plus question donc de refuser de tester son code.

L’idée principale à retirer de cet article est que la plupart des points décrits ici n’est pas spécifique à Spark. Vous souhaitez sûrement une preuve que tout ceci est bien applicable. Il s’agira de notre prochain article dans lequel nous traiterons des différents outils que nous avons à disposition et de leur utilisation. Nous aborderons aussi les manques que nous avons identifiés dans ces outils.

Sylvain Lequeux
Sylvain a 6 ans d'expérience en développement d'applicatif backend, principalement Java. Depuis 2 ans, il s'investit autour des technologies BigData et a rejoint Xebia à ce titre en 2015. Il dispense la formation certifiante Cloudera Administrateur et est certifié Cloudera Developper. Sylvain est également passionné par le craftsmanship.

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *