Il y a 5 années · 7 minutes · Data

Tester vos jobs MapReduce avec MRUnit

Les tests unitaires appartiennent aux bonnes pratiques du génie logiciel car ils permettent de détecter un certain nombre de regressions. Tester unitairement des jobs Hadoop MapReduce est bien sur possible mais, à cause de l’API, cela reste un exercice très verbeux et demandant un temps non négligeable pour obtenir des tests compréhensibles. MRUnit a été créé pour vous simplifier la vie. Actuellement en version 0.9, il est récemment devenu un top level project d’Apache et a pour avantage d’ être compatible avec les versions 0.20 , 0.23.x , 1.0.x d’Hadoop. Dans cet article, nous allons vous montrer son utilisation dans le cadre du HelloWord d’Hadoop : le WordCount.

La logique métier d’un job se situe principalement dans le mapper et le reducer. Dans un premier temps, nous allons les tester un par un, en isolation. Puis nous testerons le job dans son intégralité : mapper, combiner et reducer.

Tester le mapper

Le mapper est responsable, pour ce job, de detecter dans une ligne de texte les occurrences des mots et d’émettre, pour ceux jugés interessants, leur découverte.

WordCountMapper.java

public final class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private List<String> ignoredWords = newArraylist();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        String ignoredWordsFromConf = context.getConfiguration().get(KEY_IGNORED_WORDS);
        if (!isNullOrEmpty(ignoredWordsFromConf)) {
            ignoredWords = newArrayList(ignoredWordsFromConf.split(";"));
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            String token = tokenizer.nextToken();
            if (!ignoredWords.contains(token)) {
                context.write(new Text(token), ONE);
            }
        }
    }
}

La classe centrale pour tester un mapper avec MRUnit est MapDriver que l’on initialise de la manière suivante :

private final MapDriver<LongWritable, Text, Text, IntWritable> wcMapDriver = MapDriver.newMapDriver(new WordCountMapper());

L’approche la plus succincte se fait par le biais de l’API ‘fluent’. Par un enchaînement de méthodes, on déclare les entrées puis les sorties attendues et l’appel de runTest() vérifie nos attentes. Ainsi, si on fournit au mapper le texte « word1 word2 », on s’attend à recevoir deux paires de clé/valeur (« word1 »,1) et (« word2 »,1).

Exemple runTest()

private static final IntWritable EXPECTED_COUNT = new IntWritable(1);

@Test
public void givenTextInput_shouldOutputEachWordAsKeyAndOneAsValue_assertionWithMrunitForOutput() throws Exception {
    wcMapDriver//
            .withInput(new LongWritable(1), new Text("word1 word2 "))//
            .withOutput(buildExpectedOutput(new Text("word1"), EXPECTED_COUNT))//
            .withOutput(buildExpectedOutput(new Text("word2"), EXPECTED_COUNT))//
            .runTest();
}

Par défaut, l’ordre des sorties est vérifié. Ainsi si les sorties sont inversées, le test échouera. Il est possible de changer ce comportement en utilisant runTest(false).

S’il est nécessaire de faire des vérifications plus approfondies des sorties, il est possible de récupérer les résultats par la méthode run() et la vérification de ces derniers sera alors laissée à la charge du développeur.

Exemple run()

@Test
public void givenTextInput_shouldOutputEachWordAsKeyAndOneAsValue_noAssertionWithMrunitForOutput() throws Exception {
    // When
    List<Pair<Text, IntWritable>> outputs = wcMapDriver.withInput(PAIR_INPUT).run();

    // Then
    assertThat(outputs).hasSize(2);

    Pair<Text, IntWritable> firstOutput = outputs.get(0);
    assertThat(firstOutput).isEqualTo(buildExpectedOutput(new Text("word1"), EXPECTED_COUNT));

    Pair<Text, IntWritable> secondOutput = outputs.get(1);
    assertThat(secondOutput).isEqualTo(buildExpectedOutput(new Text("word2"), EXPECTED_COUNT));
}

Tester le reducer

L’objectif de ce reducer est de compter pour chaque mot son nombre d’occurrences.

WordCountReducer.java

public final class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    static enum WordsLength {
        STARTS_WITH_DIGIT, STARTS_WITH_LETTER, ALL;
    };

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
            InterruptedException {

        if (Character.isDigit(key.toString().charAt(0))) {
            context.getCounter(WordsLength.STARTS_WITH_DIGIT).increment(1);
        } else {
            context.getCounter(WordsLength.STARTS_WITH_LETTER).increment(1);
        }
        context.getCounter(WordsLength.ALL).increment(1);

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Lorsque le reducer reçoit un mot associé à n occurrences distinctes, il émet pour cette clef le nombre total d’occurrences.

Exemple tests WordCountReducer

private final ReduceDriver<Text, IntWritable, Text, IntWritable> wcReduceDriver = ReduceDriver.newReduceDriver(new WordCountReducer());

@Test
public void givenKeyWithIntWritableList_shouldDoTheSum_assertionWithMrunitForOutput() throws Exception {
    Text firstKey = new Text("key1");
    wcReduceDriver//
            .withInput(firstKey, buildValues(5))//
            .withOutput(firstKey, new IntWritable(5))//
            .runTest();
}

@Test
public void givenKeyWithIntWritableList_shouldDoTheSum_noAssertionWithMrunitForOutput() throws Exception {
    // Given
    Text firstKey = new Text("key1");
    wcReduceDriver.withInput(firstKey, buildValues(5));

    // When
    List<Pair<Text, IntWritable>> outputs = wcReduceDriver.run();

    // Then
    Assertions.assertThat(outputs.get(0)).isEqualTo(new Pair<Text, IntWritable>(firstKey, new IntWritable(5)));
}

private List<IntWritable> buildValues(int nbValues) {
    List<IntWritable> values = newArrayList();
    while (nbValues-- != 0) {
        values.add(new IntWritable(1));
    }
    return values;
}

Vérification de counter

Hadoop permet d’utiliser des compteurs afin d’obtenir des métriques sur des opérations MapReduce. Pour notre exemple, nous créons 3 counter, un (STARTS_WITH_LETTER) pour comptabiliser les mots commençant par une lettre, un (STARTS_WITH_DIGIT) pour ceux commençant par un chiffre et le dernier (ALL) qui comptabilise tous les mots. Pour vérifier leurs valeurs, il suffit de faire appel à withCounter(Enum counter, long expectedValue). Cette méthode permet d’éviter de faire appel au contexte pour récupérer le counter et en vérifier sa valeur. Elle simplifie l’écriture du test et facilite sa lecture.

Exemple tests WordCountReducer

@Test
public void givenKey_shouldIncrementStartsWithLetterAndAllCountersByOne() throws Exception {
    Text firstKey = new Text("key1");
    wcReduceDriver//
            .withInput(firstKey, buildValues(5))//
            .withOutput(firstKey, new IntWritable(5))//
            .withCounter(WordsLength.STARTS_WITH_LETTER, 1)//
            .withCounter(WordsLength.STARTS_WITH_DIGIT, 0)//
            .withCounter(WordsLength.ALL, 1)//
            .runTest();
}

Utilisation du contexte

Lors de l’implémentation d’un job,  le contexte peut prendre des paramètres permettant de modifier le comportement nominal (dans notre cas il s’agit du mapper). Pour illustrer ce cas, une liste de mots à ignorer (séparés par des ; ) peut être passé en paramètre du job via le contexte. Ces mots ne seront alors pas envoyés au reducer. 

Exemple test avec utilisation du contexte

@Test
public void givenTextInputWithIgnoredWordsInConf_shouldOutputEachWordIfNotIgnoredAsKeyAndOneAsValue()
            throws Exception {
    Configuration configuration = new Configuration();
    configuration.set(KEY_IGNORED_WORDS, ignoredWords);

    wcMapDriver.withConfiguration(configuration).withInput(new LongWritable(1), new Text("word1 word2 ")).runTest();
}

Cela nous permet alors de tester que le setup complet du mapper s’effectue correctement et que le comportement final est bien celui attendu.

Test du job MapReduce

Maintenant que nous venons de tester unitairement notre mapper ainsi que notre reducer, il peut être intéressant de tester l’enchaînement des deux phases.

Initialisation du MapReduceDriver

private final MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> wordCountMapReduce = MapReduceDriver.newMapReduceDriver();

Exemple test MapReduce

@Test
public void testMapReduce() {
    wordCountMapReduce//
            .withInput(new LongWritable(1), new Text("word1 word2 word2 word1"))//
            .withMapper(new WordCountMapper())//
            .withReducer(new WordCountReducer())//
            .withOutput(buildExpectedOutput(new Text("word1"), new IntWritable(2)))//
            .withOutput(buildExpectedOutput(new Text("word2"), new IntWritable(2)))//
            .runTest();
}

On précise le mapper et le reducer que l’on souhaite utiliser. Pour des raisons de performances, il peut être pertinent d’ajouter un combiner. On peut également vérifier que son utilisation ne modifie pas le résultat.

Exemple test MapReduce avec combiner

@Test
public void testMapReduceWithCombiner() {
    wordCountMapReduce//
            .withInput(new LongWritable(1), new Text("word1 word2 word2 word1"))//
            .withMapper(new WordCountMapper())//
            .withReducer(new WordCountReducer())//
            .withCombiner(new WordCountReducer())//
            .withOutput(buildExpectedOutput(new Text("word1"), new IntWritable(2)))//
            .withOutput(buildExpectedOutput(new Text("word2"), new IntWritable(2)))//
            .runTest();
}

En attendant la 1.0

MRUnit se révèle très utile à l’heure de tester unitairement des mappers et des reducers. Cependant il présente quelques lacunes comme la possibilité de tester des cas plus complexes, par exemple l’enchaînement de mappers (sauf pour l’ancienne API d’Hadoop avec le PipelineMapReduceDriver) ou l’utilisation de MultipleInputs/MultipleOutputs.

Le projet est actif et, du fait de son intérêt grandissant, de nombreuses demandes d’améliorations sont en cours dont notamment :

Pour finir, un travail de fond est en cours suite à cette jira, afin de pouvoir tester des cas plus complexes d’utilisation. En effet, le fonctionnement actuel de MRUnit est d’imiter le comportement d’Hadoop via des mocks au lieu de se reposer sur une implémentation concrète. La migration du framework permettra donc de ne plus avoir à implémenter des comportements fictifs et offrira des perspectives intéressantes. Affaire à suivre…

Lien utiles

Thomas Guerin
Consultant Java
Bertrand Dechoux
Consultant et Formateur Hadoop @BertrandDechoux

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *