Publié par
Il y a 4 semaines · 21 minutes · Data

Spark : comprendre et corriger l’exception Task not serializable

Dans tous les langages, le debugging peut parfois s’avérer une tâche fastidieuse. C’est d’autant plus le cas lorsque l’on utilise un framework distribué avec beaucoup de concepts complexes sous-jacents, comme Spark. Cet article propose de revenir sur l’une des erreurs les plus souvent rencontrées lors du développement d’applications avec Spark, et qui n’est pas toujours comprise : la redoutable – et surtout redoutée – Task not serializable.

Quand se produit cette erreur ?

Cette erreur apparaît le plus souvent lors du découpage du code en plusieurs classes « métier » pour respecter les recommandations du software craftsmanship. Malheureusement, cette exception, si elle est mal comprise, peut vite devenir un frein au refactoring et au respect des bonnes pratiques de développement. Pire, elle peut se solder par rassembler tout le code dans une même classe, voir une même fonction, ou d’abuser de @transient, sans vraiment analyser le fond du problème.

Pour aider les développeurs à mieux comprendre cette exception, depuis la version 1.3.0 de Spark, la serialization stack a été améliorée, pour trouver plus facilement la cause du problème. Lorsqu’une exception NotSerializableException est rencontrée, le debugger va désormais retrouver le chemin complet vers l’objet ne pouvant pas être sérialisé.

Cet article détaille le mécanisme interne de sérialisation de Spark, ainsi que les différentes solutions à cette exception, à travers le développement d’une application simple. Cela permettra aussi d’aborder de manière plus générale les bonnes pratiques à appliquer lors du développement avec Spark.

À la recherche de la Task not serializable

Le fil rouge de cet article est donc le développement d’une application développée en Scala 2.10 avec Spark 1.6.0. Cette application transforme le contenu d’un fichier et applique des transformations sur chacune des lignes.

Programme de base

Le programme de base est élémentaire. Il se contente de mettre chaque ligne d’un fichier en entrée en majuscules, et de sauvegarder le résultat sur disque.

object Main {
    def main(args: Array[String]) = {
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        sc.textFile(args.head).map(_.toUpperCase).saveAsTextFile(args(1))
        sc.stop()
    }
}

Cette application, une fois packagée, peut être lancée via une commande similaire à la suivante :

spark-submit --name myapp --master yarn --deploy-mode client --class Main my-application.jar input.txt output

Implémentation d’une nouvelle transformation

Une nouvelle transformation plus complexe va être ajoutée : en plus de mettre en majuscules chaque ligne, son contenu sera inversé et les voyelles remplacées par « ? ». L’idée ici est d’illustrer une transformation « non triviale ». L’implémentation d’une nouvelle transformation est aussi l’occasion de refactorer notre programme.

case class SparkProgram(inputPath: String, outputPath: String) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    def transformUpperCase() = sc.textFile(inputPath).map(_.toUpperCase).saveAsTextFile(outputPath)
    
    // our new transformation
    def transformUpperCaseReverseAndHideVowels() = sc.textFile(inputPath).map(line => {
        line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
    }).saveAsTextFile(outputPath)
}

object Main {
    def main(args: Array[String]) = {
        val program = SparkProgram(args.head, args(1))
        program.transformUpperCaseReverseAndHideVowels()
    }
}

En plus de la création de la nouvelle transformation, le code relatif à Spark a été séparé du Main. Il s’agit d’une bonne pratique à adopter pour la clarté du code. Le fait de séparer la partie qui utilise Spark est aussi recommandé pour séparer les responsabilités.

Jusqu’ici, le programme ci-dessus fonctionne. Mais la nouvelle transformation étant « plus complexe », notre âme de craftsman va nous pousser à extraire le cœur de la méthode de transformUpperCaseReverseAndHideVowels dans une fonction à part, pour pouvoir par exemple faire des tests unitaires sur celle-ci, sans forcément avoir besoin de lire un fichier en entrée (respect encore ici du principe de séparation des responsabilités). Une modification de la classe SparkProgram est donc nécessaire pour isoler la transformation « complexe » :

case class SparkProgram(inputPath: String, outputPath: String) { 
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    def transformUpperCase() = sc.textFile(inputPath).map(_.toUpperCase).saveAsTextFile(outputPath)
    def transformUpperCaseReverseAndHideVowels() = sc.textFile(inputPath).map(upperCaseReverseAndHideVowels).saveAsTextFile(outputPath)

    // our complex method extracted
    def upperCaseReverseAndHideVowels(line: String) = line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
}

Mais patatras ! Le lancement de notre programme produit alors l’erreur tant redoutée :

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
 at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.map(RDD.scala:323)
    at fr.xebia.spark.serialization.SparkProgram.transformUpperCaseReverseAndHideVowels(Main.scala:17)
    at fr.xebia.spark.serialization.Main$.main(Main.scala:25)
    at fr.xebia.spark.serialization.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@1c9e07c6)
    - field (class: fr.xebia.spark.serialization.SparkProgram, name: conf, type: class org.apache.spark.SparkConf)
    - object (class fr.xebia.spark.serialization.SparkProgram, SparkProgram(Arguments(input.txt,output)))
    - field (class: fr.xebia.spark.serialization.SparkProgram$$anonfun$transformation2$1, name: $outer, type: class fr.xebia.spark.serialization.SparkProgram)
    - object (class fr.xebia.spark.serialization.SparkProgram$$anonfun$transformation2$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 21 more

Comment le simple fait d’isoler le contenu d’une méthode a-t-il pu faire échouer notre job ? C’est ce que nous allons essayer de comprendre.

La stack trace est claire, c’est le SparkConf qui est coupable ici :

Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@1c9e07c6)
    - field (class: fr.xebia.spark.serialization.SparkProgram, name: conf, type: class org.apache.spark.SparkConf)

L’erreur indique que l’instance de type SparkConf ne peut pas être sérialisée.

Thinking Face on Google Android 7.1

Pourquoi Spark essaye-t-il de sérialiser l’instance de SparkConf ?

 

upperCaseReverseAndHideVowels. Pour que cet appel soit possible, Spark va devoir sérialiser cette méthode. En effet, le fonctionnement distribué de Spark nécessite de distribuer l’exécution du code sur différents executors. Petit rappel sur le mécanisme interne de Spark :

Lorsque l’on appelle la méthode transformUpperCaseReverseAndHideVowels, celle-ci va faire appel à

 

  • Spark va tout d’abord décomposer l’ensemble des opérations faites sur les RDD en tâches
  • avant d’exécuter ces tâches, Spark va calculer leur closure, c’est-à-dire l’ensemble des variables et des méthodes devant être visibles depuis l’executor pour pouvoir appliquer les transformations sur les RDD
  • la closure va ensuite être sérialisée, puis envoyée vers chacun des executors

Dans notre cas, la méthode upperCaseReverseAndHideVowels appartient à la closure et va donc devoir être sérialisée pour pouvoir être appelée sur chaque executor. Or, il est impossible de sérialiser une méthode seule : toute la classe SparkProgram va devoir être sérialisée. Ses attributs conf et sc seront par conséquent aussi sérialisés, d’où l’erreur.

Thinking Face on Google Android 7.1

Mais pourquoi notre code fonctionnait avant ?

Avant la dernière modification, la transformation était faite dans une fonction anonyme{ line => line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?") }. Dans ce cas, la classe SparkProgram, et par extension ses attributs, n’avaient donc pas besoin d’être sérialisés, puisque l’ensemble du code de cette fonction anonyme était executée sur chaque executor, sans dépendance avec l’extérieur de cette fonction.

Face With Open Mouth

Je sais, il suffit de déclarer SparkProgram sérialisable en ajoutant extends Serializable !

C’est commun de penser à cette solution, mais c’est une idée fausse. De toute façon ici, SparkProgram est une case class, qui par défaut est sérialisable. Le problème vient du fait que l’un des attributs de SparkProgramSparkConf en l’occurence, n’est pas sérialisable.

Les solutions

Pour corriger cette erreur, plusieurs solutions s’offrent à nous. Il existe deux catégories de solutions :

  • les rustines
  • les solutions craft

Les rustines

Ce ne sont pas de bonnes solutions d’un point de vue craft, mais elles seront tout de même présentées ici afin de détailler pourquoi leur utilisation peut être évitée.

CTRL+Z CTRL+Z CTRL+Z

Ce n’est pas une option ici :)

Séparer le contenu des méthodes est en effet une bonne solution de manière générale. Notre solution qui fonctionnait précédemment n’est pas acceptable d’un point de vue craft, surtout si l’on imagine avoir une multitude de transformations à implémenter.

Function literal

Pour éviter que SparkProgram et ses attributs ne soient sérialisés, un des moyens est que l’appel de la méthode de la transformation ne nécessite pas la sérialisation complète de la classe. Pour cela, il est possible de déclarer en tant que val la fonction upperCaseReverseAndHideVowels : c’est ce qu’on appelle une function literal :

def transformUpperCaseReverseAndHideVowels() = {
    sc.textFile(inputPath).map(upperCaseReverseAndHideVowels).saveAsTextFile(outputPath)
}

val upperCaseReverseAndHideVowels = (line: String) => line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")

Thinking Face on Google Android 7.1

Pourquoi une fonction déclarée en tant que val au lieu de def fonctionne ?

upperCaseReverseAndHideVowels n’est plus une fonction à proprement parler, mais plutôt une variable de type Function1. Si on regarde le contenu de la case class à l’aide de javap (le décompileur de classe Java) on obtient la confirmation :

public scala.Function1<java.lang.String, java.lang.String> upperReverseAndReplaceVowels()

Or Function1 est bien sérialisable, comme le montre le test suivant facilement reproductible en console :

scala> val myFunction = (i:Int) => i
myFunction: Int => Int = <function1>

scala> myFunction.isInstanceOf[Serializable]
res0: Boolean = true

Thinking Face on Google Android 7.1

Pourquoi ne pas déclarer la function literal à l’intérieur de transformUpperCaseReverseAndHideVowels ?

C’est tout à fait possible : de cette manière, l’appel de la function literal ne nécessitera pas la sérialisation. Cependant, cela n’a pas d’intérêt car on ne pourra pas tester unitairement la méthode upperCaseReverseAndHideVowels.

Thinking Face on Google Android 7.1

Si la déclaration de la fonction upperCaseReverseAndHideVowels à l’intérieur de transformUpperCaseReverseAndHideVowels ne provoque pas l’erreur car ne nécessite pas la sérialisation de la classe, alors pourquoi ne pas simplement déclarer upperCaseReverseAndHideVowels comme une vraie fonction (avec def) ?

La déclaration d’une fonction avec def, que ce soit à l’intérieur ou à l’extérieur d’une autre fonction, déclare toujours une « vraie » fonction. En la déclarant à l’intérieur, on observe à l’aide de javap que la fonction public java.lang.String upperReverseAndHideVowels(java.lang.String); est toujours présente. L’appel de transformUpperCaseReverseAndHideVowels nécessitera donc toujours la sérialisation de la classe complète, bien que la fonction soit définie à l’intérieur d’elle.

Dans tous les cas, la déclaration d’une function literal ressemble plus à un contournement du problème. Il vaut donc mieux l’éviter.

Utilisation de @transient

Pour que conf et sc ne soient pas sérialisés, il est possible d’utiliser le mot-clé @transient. Toutes les variables déclarées de cette manière ne seront donc pas sérialisées.

Cette solution pourrait convenir; cependant, il est préférable d’éviter l’utilisation de @transient. Son utilisation n’est en soi pas mauvaise, mais résoudre toutes les exceptions Task Not Serializable par @transient peut révéler dans certains cas un code smell, comme par exemple une mauvaise organisation du code. C’est pour cette raison qu’il faut veiller à ne pas abuser de cette annotation, celle-ci pouvant de plus provoquer des erreurs difficiles à débugger en mode distribué (il ne faut pas oublier qu’une variable @transient prend une valeur par défaut lorsqu’elle est initialisée sur un executor). Un exemple de mauvaise utilisation de @transient sera présenté plus tard dans l’article.

Utiliser un singleton pour le SparkContext

Il est possible de séparer l’instanciation du SparkContext de SparkProgram sans pour autant passer le SparkContext de classe en classe. Pour cela, un singleton chargé d’instancier le SparkContext peut être créé. Le SparkContext instancié sera alors utilisé par toutes les méthodes ayant besoin d’accéder à un SparkContext.

// singleton for SparkContext
object Spark {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
}

case class SparkProgram(inputPath: String, outputPath: String) {
    def transformUpperCase() = Spark.sc.textFile(inputPath).map(_.toUpperCase).saveAsTextFile(outputPath)
    def transformUpperCaseReverseAndHideVowels() = Spark.sc.textFile(inputPath).map(upperCaseReverseAndHideVowels).saveAsTextFile(outputPath)
 
    def upperCaseReverseAndHideVowels(line: String): String = {
        line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
    }
}

object Main {
    def main(args: Array[String]) = {
        val program = SparkProgram(args.head, args(1))
        program.transformUpperCaseReverseAndHideVowels()
    }
}


L’avantage ici est qu’une seule instance du SparkContext sera utilisée par l’ensemble du programme. Le SparkContext sera instancié la première fois qu’une méthode appellera Spark.sc.

Cependant, l’inconvénient majeur, si ce n’est rédhibitoire, est que le code utilisant ce SparkContext ne pourra pas être testé facilement. En effet, comment remplacer le Spark.sc par un SparkContext de notre choix dans les tests ?

Les solutions craft

Elles sont opposées aux solutions dites « rustines » : ce sont en effet les solutions à recommander en priorité pour garder un code de qualité.

Séparer la méthode dans un object

Pour éviter la sérialisation du SparkContext, la méthode upperCaseReverseAndHideVowels peut être extraite dans un object à part (LineProcessor), qui se charge simplement d’opérer des transformations sur une ligne. Lorsque cette fonction sera appelée, c’est l’object LineProcessor qui sera sérialisé, et non le SparkProgram (dans lequel on trouve conf et sc).

object LineProcessor {
    def upperCaseReverseAndHideVowels(line: String): String =
        line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
}

case class SparkProgram(inputPath: String, outputPath: String) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    def transformUpperCase() = sc.textFile(inputPath).map(_.toUpperCase).saveAsTextFile(outputPath)
    def transformUpperCaseReverseAndHideVowels() = sc.textFile(inputPath).map(LineProcessor.upperCaseReverseAndHideVowels).saveAsTextFile(outputPath)
}

Ici, il n’y a aucun souci pour sérialiser LineProcessor puisqu’il s’agit d’un object : par défaut, tous les attributs d’un object sont static, edonc implicitement transient. Cependant, notons que dans ce cas, il aurait aussi été tout à fait possible de définir LineProcessor comme une class (ou case class), puisque LineProcessor n’a aucun attribut (donc aucun attribut qui n’est pas sérialisable !). Un exemple avec un attribut non sérialisable est présenté plus tard dans l’article.

Cette solution fonctionne mais n’est pas encore idéale, car nous avons ici une classe SparkProgram qui définit nos transformations et qui est en charge d’instancier le SparkContext. L’objectif est de séparer ces deux rôles, afin de respecter le principe de séparation des responsabilités (encore lui !).

Déléguer l’instanciation du SparkContext

Si l’instanciation des attributs qui posent problème (SparkConf et SparkContext) se fait à l’extérieur de SparkProgram, ceux-ci ne seront donc plus sérialisés. Cependant, notre SparkProgram aura toujours besoin d’un SparkContext ! Pour l’utiliser de manière simple et transparente, on peut alors utiliser les paramètres implicites :

case class SparkProgram(inputPath: String, outputPath: String) {
    def transformUpperCase()(implicit sc: SparkContext) = sc.textFile(inputPath).map(_.toUpperCase).saveAsTextFile(outputPath)
    def transformUpperCaseReverseAndHideVowels()(implicit sc: SparkContext) = sc.textFile(inputPath).map(upperCaseReverseAndHideVowels).saveAsTextFile(outputPath)

    def upperCaseReverseAndHideVowels(line: String): String = {
        line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
    }
}

object Main {
    def main(args: Array[String]) = {
        val conf = new SparkConf()
        implicit val sc = new SparkContext(conf)
        val program = SparkProgram(args.head, args(1))
        program.transformUpperCaseReverseAndHideVowels()
        sc.stop()
    }
}

L’instanciation du SparkContext va être ici déléguée au Main, et son utilisation va se faire de manière implicite dans SparkProgram. Déclarer le SparkContext dans le Main n’est pas choquant ici, puisque tout notre programme se base sur Spark.

Thinking Face on Google Android 7.1

Pourquoi ne pas déclarer le paramètre implicit au niveau de la classe SparkProgram (au même titre que inputPath et outputPath) ?

C’est possible, mais cela implique qu’un SparkContext implicite doive exister dans le scope à l’instanciation d’un SparkProgram, et ce même pour utiliser des fonctions n’utilisant pas le SparkContext (upperCaseReverseAndHideVowels par exemple). Pour tester unitairement cette fonction, il faudra donc obligatoirement déclarer un SparkContext, qui ne sera finalement pas utilisé.

Thinking Face on Google Android 7.1

Y a-t-il une raison d’utiliser des implicits plutôt qu’un paramètre en dur spécifié à l’appel de la fonction ?

Non, ici les implicits servent juste à simplifier la lecture du code. C’est une pratique courante lorsqu’on utilise Spark de déclarer le SparkContext en implicit pour éviter de le passer dans toutes les fonctions. Attention cependant, dans le cas général, il faut utiliser les implicits avec parcimonie pour éviter de rendre difficile la compréhension du code.

Une solution possible

D’une manière générale, il vaut mieux minimiser l’adhérence avec Spark afin de pouvoir tester unitairement les fonctions ne nécessitant pas de SparkContext. De plus, il convient d’être cohérent dans tout le code pour garder un code lisible (si on choisit d’utiliser @transient – ce qui n’est pas recommandé cependant – alors il vaut mieux l’utiliser partout).

Une solution convenable pour notre problème, qui est un mix des deux solutions craft, consiste alors à :

  • déléguer la création du SparkContext
  • séparer les méthodes de transformation dans un objet à part
object LineProcessor {
    def upperCaseReverseAndHideVowels(line: String): String = {
        line.toUpperCase.reverse.replaceAll("[AEIOUY]", "?")
    }
}

case class SparkProgram(inputPath: String, outputPath: String)
                       (implicit sc: SparkContext) {
    def withInputAndOutput(f: String => String) = {
        sc.textFile(inputPath).map(f).saveAsTextFile(outputPath)
    }
    def transformUpperCase() = withInputAndOutput(_.toUpperCase)
    def transformUpperCaseReverseAndHideVowels() = withInputAndOutput(LineProcessor.upperCaseReverseAndHideVowels)
}
object Main {
    def main(args: Array[String]) = {
        val conf = new SparkConf()
        implicit val sc = new SparkContext(conf)
        val program = SparkProgram(args.head, args(1))
        program.transformUpperCaseReverseAndHideVowels()
        sc.stop()
   }
}

Pour simplifier encore plus le corps des méthodes de transformation, on utilise ici le loaner pattern (design pattern bien connu des utilisateurs de Scala), puisque la lecture/écriture sera utilisée par toutes les transformations. De plus, puisque toutes les méthodes de SparkProgram utilisent ici un SparkContext, le paramètre implicite a pu être déclaré directement au niveau de la case class.

Un autre cas de Task not serializable n’ayant pas pour origine le SparkContext

Nous allons maintenant voir une autre occurrence de Task Not Serializable, mais cette fois-ci n’ayant pas pour cause un composant de Spark.
Pour illustrer, une nouvelle transformation est implémentée, qui consiste à saler chaque ligne avec un préfixe aléatoire. Le but ici est d’introduire une classe avec un attribut d’un type non sérialisable. Ici, c’est Random qui va être utilisé (jusqu’en Scala 2.10, la classe Random n’était pas sérialisable).

Cet exemple avec Random est seulement pris pour illustrer notre propos; une autre classe non sérialisable aurait pu être choisie, par exemple une classe que nous aurions nous-même définie.

case class LineSalter(seed: Int) {
    val random = new scala.util.Random(seed)
    def salt(line: String) = random.nextString(10) + " " + line
}

case class SparkProgram(inputPath: String, outputPath: String)
                       (implicit sc: SparkContext) {
    def withInputAndOutput(f: String => String) = {
        sc.textFile(inputPath).map(f).saveAsTextFile(outputPath)
    }
    def transformUpperCase() = withInputAndOutput(_.toUpperCase)
    def transformUpperCaseReverseAndHideVowels() = withInputAndOutput(LineProcessor.upperCaseReverseAndHideVowels)
    def transformSaltLines() = {
        val salter = LineSalter(1234)
        withInputAndOutput(salter.salt)
    }
}

object Main {
    def main(args: Array[String]) = {
        val conf = new SparkConf()
        implicit val sc = new SparkContext(conf)
        val program = SparkProgram(args.head, args(1))
        program.transformSaltLines()
        sc.stop()
    }
}

Au lancement du programme, on obtient :

Serialization stack:
- object not serializable (class: scala.util.Random, value: scala.util.Random@468dda3e)
- field (class: fr.xebia.spark.serialization.LineSalter, name: random, type: class scala.util.Random)
- object (class fr.xebia.spark.serialization.LineSalter, LineSalter(1234))
- field (class: fr.xebia.spark.serialization.SparkProgram$$anonfun$transformation3$1, name: salter$1, type: class fr.xebia.spark.serialization.LineSalter)
- object (class fr.xebia.spark.serialization.SparkProgram$$anonfun$transformation3$1, <function1>)

Comme prévu, nous avons une erreur Task not serializable, causée par la sérialisation de l’instance de type scala.util.Random.

Face With Open Mouth

C’est le moment d’utiliser @transient pour la variable random !

Déclarer random @transient corrige bien l’exception Task Not Serializable. En effet, grâce à @transientrandom n’est pas sérialisé. Mais en contrepartie, lors de son instanciation sur un des executors, random va prendre sa valeur par défaut, qui est null. Donc notre exception Task Not Serializable va se transformer en NullPointerException lorsqu’une méthode va être appelée (nextString ici) !

Les solutions

Pour faire fonctionner notre code, plusieurs solutions s’offrent à nous.

Utiliser une fonction anonyme

Il est possible de déclarer l’instanciation du LineSalter dans une fonction anonyme.

def transformSaltLines() = withInputAndOutput { line => 
    LineSalter(1234).salt(line) 
}

Mais le sel généré sera toujours le même (même seed utilisé). De plus, dans le cas général, l’inconvénient de cette solution est qu’une instance de LineSalter sera créée pour chaque ligne, ce qui n’est pas performant (on peut imaginer un cas plus complexe où l’instanciation d’une classe serait beaucoup plus lourde).

Privilégier les object

Dans ce cas précis (avec Random), il serait possible d’appeler Random.nextString()qui est une méthode du companion object Random. Plus aucun problème de sérialisation n’apparaîtrait (grâce au fait que Random soit déclaré comme un object), mais il sera en contrepartie impossible de modifier la seed pour la génération de nos sels. De plus, cette solution fonctionne dans ce cas précis, mais n’est pas applicable dans le cas où le problème vient d’une classe différente de Random.

Dans le cas général, l’utilisation d’un object peut être préférée. Il est possible de définir un wrapper object autour d’une classe posant problème :

class MyClassNotSerializable(...) { 
    val notSerializableAttribute = ...
    def f() = ...
}

object WrapperMyClassNotSerializable {
    val notSerializableInstance = new MyClassNotSerializable(...)
    def f() = notSerializableInstance.f()
}

Cette solution n’est cependant pas très flexible si on souhaite disposer de n instances de MyClassNotSerializable, avec des paramètres différents par exemple. L’utilisation de n wrappers autour de chaque instance n’est pas optimal pour la compréhension du code.

Utiliser une lazy val

Déclarer random en lazy val permet de sérialiser la classe LineSalter sans erreur :

case class LineSalter(seed: Int) { 
    lazy val random = new scala.util.Random(seed)
    def salt(line: String) = random.nextString(10) + " " + line
}

La variable random ne sera alors instanciée qu’une fois, au premier appel de la variable, donc initialisée sur chaque executor. L’instance du LineSalter est initialisée sur le driver, mais pas la valeur de random.

Dans ce cas particulier, cette solution paraît la plus adaptée car seule elle permet d’instancier random avec une seed définie. Attention cependant à ne pas abuser de cette solution (si il est possible de faire autrement), pour ne pas réduire la lisibilité du code.

Conclusion

L’exception Task Not Serializable est par nature liée au fonctionnement de Spark et de son caractère distribué. La correction d’un bug provoqué par cette exception n’est pas toujours aisé lorsqu’il apparaît, mais cette exception peut être évitée au maximum tout au long des étapes de développement en isolant le plus possible les portions de code nécessitant Spark. En diminuant l’adhérence à Spark, on diminue le risque de rencontrer cette erreur, en plus de rendre le code plus lisible et de faciliter les tests unitaires.

Cependant, même en respectant ces recommandations, cette exception peut apparaître, à cause de classes externes à Spark et non sérialisables. Dans le cas des classes définies par l’utilisateur, il est assez simple de les rendre sérialisables; dans le cas de classes appartenant à des librairies non modifiables (comme scala.util.Random), l’utilisation des wrappers object est conseillée, lorsque cela est possible.

Une solution qui fonctionne dans tous les cas n’existe pas, mais une refonte du code peut parfois s’avérer nécessaire pour contourner ce problème : par exemple, transformer des variables de classes non sérialisables en paramètres de fonction (lorsque les variables ne sont pas utilisées par toutes les fonctions), ou bien créer de petits object, au lieu de class avec des attributs non sérialisables.

Autres références sur le sujet

Using non serializable objects in Apache Spark

Demystifying Spark serialization errors

Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *