Publié par
Il y a 3 mois · 10 minutes · Data

Tester du code Spark – 2 – La pratique

Que de la théorie. Les tests du code Spark semblent abonnés à cette réalité : tout reste théorique. À croire qu’en réaliser est impossible. Il est grand temps de remédier à ce problème et démontrer que le sujet est abordable par tous.

Quitter la théorie, c’est débuter par les bonnes pratiques. Des réflexes sur la spécification des jeux de tests jusqu’aux extensions des DSL Scala et Python qui simplifient les assertions autour des DataFrames en passant par l’utilisation de Spark-Testing-Base, cet article couvre le minimum nécessaire à l’écriture des tests en Spark.

Améliorer la lisibilité du contexte d’exécution du test

Dans un cas idéal, un test contient trois sections : « Given », « When », et « Then ». Le « Given » défini clairement le contexte d’exécution du test qui le rend spécifique. C’est ce contexte, appliqué à notre méthode testée (le « When ») qui provoque la sortie attendue (le « Then »). Il est important de chercher à réduire la quantité d’informations présentes dans ce contexte d’exécution dans l’objectif d’améliorer sa lisibilité.

Premièrement, où doivent se situer les données de test ? Doivent-elles être externalisées dans un fichier dédié ? Ou bien doivent-elles être insérées directement dans le code du test ? Mon précédent article préconise de les intégrer dans le code du test, à l’exception des tests de validation d’un modèle de Machine Learning. À première vue, la solution du fichier permet de fournir un jeu de test plus conséquent. Cependant, augmenter la taille du jeu de test en entrée entraîne de la difficulté en termes de maintien de la cohésion entre le test et ses données.

Deuxièmement, comment initialiser un DataFrame contenant potentiellement beaucoup de colonnes ? La solution est en fait plutôt simple.

En Scala, la mécanique de « case class » associée à la méthode implicite d’une session Spark permet de conserver un maximum de simplicité comme le montre l’exemple suivant.

import org.apache.spark.sql.DataFrame
import org.scalatest.{FlatSpec, GivenWhenThen}

case class TestRow(column1: String = "", column2: String = "")

class SimpleSpec extends FlatSpec with GivenWhenThen with SparkSessionProvider {
  import sparkSession.implicit._
  
  "A Unit test" should "have a readable Given" in {
    Given("A specific value on column 1")
    val inputDataFrame: DataFrame = List(
      TestRow(column1 = "specific value")
    ).toDF()
    
    ???
  }
  
}

En Python, le « kwargs » fourni quelque chose de semblable à la version Scala. Bien que plus légère à première vue, cette version convient principalement à des cas simples de valeurs précises. Toute inclusion d’une valeur nulle (None) impose d’initialiser et de fournir un schéma SparkSql (une instance de StructType).

import unittest2

spark_context = SparkContext()
spark_session = SparkSession(spark_context).builder.master("local[*]").appName("Unit tests").getOrCreate()
 
class TestSimple(unittest2.TestCase):
 
  def test_shoud_have_readable_give(self):
    # Given
    input_data_frame = spark_session.createDataFrame([
      Row(column1 = "specific value")
    ])
    ....

La compréhension du test est simple, en Python comme en Scala : le DataFrame d’entrée contient une ligne dont la particularité porte sur la colonne « column1 ».

Spark-Testing-Base, de nombreuses fonctionnalités pour différents langages

Une des bibliothèques les plus connues autour du test de programmes Spark est sans conteste Spark-Testing-Base. Cette bibliothèque propose de nombreuses fonctionnalités pour différents langages (Scala, Java, Python) parmi lesquelles des classes d’aide à la rédaction d’assertions sur les RDD, DataFrames et DataSets et de tests basés sur l’utilisation d’un Mini-Cluster. Elle propose également du support autour de ScalaCheck.

Une configuration simple

En Scala comme en Python, la procédure de configuration est simple.

En Scala, elle est décrite directement dans le readme du dépôt Github du projet. De base, un ajout de dépendance Maven ou SBT et le tour est joué (il est toutefois recommandé de correctement dimensionner la mémoire car l’utilisation d’un contexte Spark local est assez gourmand). Attention à bien préciser la version voulue de Spark dans la dépendance.

val sparkVersion = "2.2.0"
val sparkTestingBaseVersion = "0.7.4"
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % s"${sparkVersion}_$sparkTestingBaseVersion" % "test"
    <dependency>
        <groupId>com.holdenkarau</groupId>
        <artifactId>spark-testing-base_${scala.version}</artifactId>
        <version>${spark.version}_${sparktestingbase.version}</version>
        <scope>test</scope>
    </dependency>

En Python, le package est disponible sur le dépôt PyPi comme tout autre package. Il est donc installable via PIP par la commande pip install spark-testing-base. Il suffit ensuite de placer la dépendance dans le fichier setup.py dans la zone tests_requires :

from setuptools import setup
 
setup(
  ...
  tests_requires=['spark-testing-base']
  ...
)

Cette bibliothèque propose trop de contenu pour qu’il soit totalement couvert dans cet article. Probablement sa fonctionnalité la plus utile, la classe DataFrameSuiteBase fourni le nécessaire à un test simple dans Spark, en particulier des fonctions utilitaires pour la comparaison de DataFrames. Voici comment les utiliser :

class SimpleSpec extends FlatSpec with DataFrameSuiteBase {
  "Simple Spec" should "show how to use DataFrameSuiteBase" in {
    // Given
    val df = List(
      Balance(income=3, outcome=2)
    ).toDF()
 
    // When
    val result = df.filter(positiveAmount(_))
 
    // Then
    assertDataFrameEquals(df, result)
  }
 
  it should "also show to to deal with errors" in {
    // Given
    val df = List(
      Balance(income=2, outcome=3)
    ).toDF()


    // When
    val result = df.filter(positiveAmount(_))


    // Then
    intercept[TestFailedException]
      assertDataFrameEquals(df, result)
  }
}

Une fonctionnalité semblable est disponible dans la version Python de la bibliothèque. Il faut utiliser la classe SQLTestCase. Voici comment faire :

from pyspark.sql import Row
from sparktestingbase.sqltestcase import SQLTestCase
import unittest2

class SimpleTest(SQLTestCase):
  def test_do_nothing(self):
    rdd = self.sc.parallelize([Row(
        column1="foo"
    )])
    df = rdd.toDF()
    
    self.assertDataFrameEqual(df, df)

À utiliser pour des besoins spécifiques

Il est indéniable qu’un framework tel que Spark-Testing-Base rend le code plus lisible. La quantité de méthode type « helpers » apporte une facilité d’écriture des tests. Sur un petit projet, la simplicité de mise en œuvre est flagrante.

Son utilisation apporte néanmoins quelques complexités un peu cachées au premier abord. L’utilisation de cette bibliothèque par Maven/SBT provoque souvent le téléchargement d’une quantité astronomique de dépendances afin d’avoir Spark-Testing-Base à disposition. Il arrive d’ailleurs parfois d’avoir des conflits de dépendances apportés par les dépendances transitives de Spark-Testing-Base, mais rien d’insoluble. Dans de nombreux cas, le besoin se situe au niveau d’une ou deux classes assez simples de cette bibliothèque. Beaucoup d’éléments seront à ramener pour finalement quelques lignes de code. Préconisation personnelles : réécrire le petit morceau de code équivalent de cette bibliothèque pour répondre à son besoin.

Enfin, un dernier point négatif de ce framework est qu’il est moins fourni en Python, de quoi satisfaire principalement les utilisateurs de Spark en Scala.

En définitive, ce framework est à utiliser pour des besoins spécifiques. Certaines classes utilitaires sont assez complexes (telles que celles relatives à ScalaCheck) et il ne serait pas judicieux de les réécrire.

Simplifier l’écriture des tests Spark en Scala

Lors de l’écriture des tests Spark en Scala, il est classique d’utiliser les matchers de ScalaTest pour valider les attendus. Ceci fonctionne très bien pour valider des éléments tels que le contenu des schémas (les champs sont une collection de StructTypes que l’on peut valider) ou le contenu des DataFrames. Il s’agit d’une collection de Row. Puisque dans les tests, la quantité de donnée est limitée, la méthode collect est utilisable. Il est alors possible de tout ramener sur le driver pour utiliser les collections de base de Scala, donc les matchers de base, sans exploser la consommation mémoire.

Voici un exemple type de test de schéma et de contenu de DataFrame :

class SimpleSparkAndScalaTest extends FlatSpec with Matchers with SparkSessionProvider {
  import sparkSession.implicit._

  def onlyPositiveBalance(inputDataFrame: DataFrame): DataFrame = {
    def positiveBalance(input: Row): Boolean = {
      input.getInt(input.fieldIndex("income")) - input.getInt(input.fieldIndex("outcome")) > 0
    }
    inputDataFrame.filter(positiveBalance(_))
  }
  
  case class Balance(id: Int, income: Int = 0, outcome: Int = 0)
  
  "Simple test" should "do something" in {
    // Given
    val inputDF = List(
      Balance(id = 1, income = 3, outcome = 2),
      Balance(id = 2, income = 3, outcome = 4)
    ).toDF()
    
    // When
    val result = onlyPositiveBalance(inputDF)
    
    // Then
    result.schema.map(_.name) should contain allOf ("income", "outcome")
    result.count() shouldBe 1
    result.map(row => row.getInt(row.fieldIndex("id"))).head shouldBe 1
  }

}

La couverture de test est à un bon niveau puisque tout le code peut ainsi être testé. Mais à la lecture, il y a quelque chose de très dérangeant. La dernière ligne est trop complexe et est donc difficile à lire. Il est grand temps de simplifier l’écriture des tests en Scala.

Pour répondre à ce besoin, quelques lignes de code suffisent à créer un peudo DSL qui simplifie la lecture des tests. Par exemple, pour que les deux lignes de code suivantes soient équivalentes :

result.map(row => row.getString(row.fieldIndex("column_to_test"))).head shouldBe "correct_value"
field("column_to_test") of result.head() shouldBe "correct_value"

Les quelques lignes suivantes permettent cette syntaxe :

def field(name: String) = FieldMatcherOfRow(name)

case class FieldMatcherOfRow(fieldName: String) {
  def of(row: Row) = row.get(row.fieldIndex(fieldName))
}

Ainsi, la lecture se fait de façon proche du langage naturel : « Le champ « column_to_test » de la première ligne de mon DataFrame doit être « correct_value » ». Il faut ensuite laisser libre court à son imagination pour créer d’autres contenus de ce type afin de rendre lisible tous les tests !

Simplifier l’écriture des tests Spark en Python

Afin de simplifier également l’écriture de ces mêmes tests Spark en Python, le manque de DSL est encore plus flagrant. En général le framework PyHamcrest propose, selon mes critères (très subjectifs donc), la syntaxe la plus lisible et compréhensible. À l’instar de la version Scala, il est possible de créer des DSL qui rendent nos tests lisibles en Python. L’un des éléments importants à tester sur un DataFrame est sa taille. La matcher PyHamcrest correspondant s’écrit de la manière suivante :

class DataFrameCountMatcher(BaseMatcher):

    def __init__(self, expected_count):
        assert isinstance(expected_count, int), 'Provided count is not an int but %s' % type(expected_count)
        self.expected_count = expected_count

    def _matches(self, item):
        if not isinstance(item, DataFrame):
            return False
        return item.count() == self.expected_count

    def describe_to(self, description):
        description.append_text('Given DataFrame has count %d' % self.expected_count)

    def describe_mismatch(self, item, mismatch_description):
        if isinstance(item, DataFrame):
            mismatch_description.append_text('has count %d' % item.count())
        else:
            mismatch_description.append_text('%s is not a DataFrame' % type(item))


def has_count(expected_count):
    return DataFrameCountMatcher(expected_count)

Ce matcher est ensuite très facilement utilisable et produit une syntaxe claire :

class TestDataFrameCountMatcher(unittest.TestCase):

    def setUp(self):
        self.df = sql_context.createDataFrame([
            Row(key='value'),
            Row(key='value2')
        ])

    def test_has_the_right_count(self):
        assert_that(self.df, has_count(2))

Les tests sont l’affaire de tous

La prochaine fois qu’un collègue vous donnera une fausse bonne excuse pour ne pas écrire ses tests en Spark, rappelez-lui que les tests sont l’affaire de tous et qu’il est possible d’écrire des tests simples, maintenables et indépendants en utilisant ce framework.

Maintenant, même nos amis Data Scientists pourront écrire leurs tests unitaires !

Le prochain et dernier article de cette série traitera des tests de Spark en utilisant le Property Based Testing.

PROCHAIN ARTICLE, bientôt

Tester du code Spark avec du Property Based Testing

Sylvain Lequeux

Sylvain a 6 ans d’expérience en développement d’applicatif backend, principalement Java.
Depuis 2 ans, il s’investit autour des technologies BigData et a rejoint Xebia à ce titre en 2015. Il dispense la formation certifiante Cloudera Administrateur et est certifié Cloudera Developper.
Sylvain est également passionné par le craftsmanship.

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *