Publié par

Il y a 4 semaines -

Temps de lecture 13 minutes

Spark Structured Streaming avec Kafka Schema Registry

Introduction :

L’idée de cet article est de brancher Spark Structured Streaming à Kafka pour consommer des messages en Avro dont le schéma est géré par le Schema Registry. L’objectif est de se dissocier de la déclaration manuelle du schéma de données côté consommateur. Ceci pourrait être utile pour découpler la production de la consommation comme proposé par Confluent ou simplement pour développer des outils génériques de traitement de données (historisation, ingestion…), là ou parfois, la logique de notre application est agnostique par rapport au schéma de données.

Seules quelques notions de base de Spark et de Kafka sont nécessaires pour suivre nos exemples, nous prendrons le temps de détailler les notions avancées. Nous allons présenter brièvement Spark Structured Streaming, Avro et le Schema Registry, nous passerons ensuite à un exemple de code simple de consommation de données Kafka avec un schéma statique, pour enfin finir avec un exemple de code plus complexe où nous nous appuierons sur le Schema Registry pour la définition du schéma.

L’ensemble fonctionnel du code est dans le repo github de Xebia.

Corps du billet :

Spark Structured Streaming

Spark Structured Streaming est le plus récent des moteurs distribués de traitement de streams sous Spark. Il se base sur Spark SQL et est destiné à remplacer Spark Streaming. Il permet d’exprimer des traitements sur des données en stream de la même manière que pour des données statiques. Il est hautement optimisé, assurant un traitement en exactly-once et un mode de traitement à très faible latence depuis la version 2.3 (pouvant atteindre 1 milliseconde).

Apache Avro

Les schémas gérés dans le Schema Registry sont au format Avro. Maintenu par la fondation Apache, ce format de sérialisation s’est imposé comme étant l’un des principaux formats binaires, surtout dans le monde de la Data.

Il est non seulement riche, rapide et compact, mais permet aussi de gérer l’évolution du schéma des données.

Distribution Confluent de Kafka

Confluent a été fondée par des développeurs de Kafka qui ont travaillé ensemble chez Linkedin. Leur idée est de continuer à contribuer dans le développement de Kafka, mais aussi fournir un écosystème autour, une distribution entreprise, du conseil, du support et des formations.

Parmi les outils fournis par Confluent :

  • Control Center : une application web permettant l’administration du cluster Kafka
  • Rest Proxy : API REST permettant d’utiliser et de gérer le cluster Kafka
  • Schema Registry : API de sauvegarde, versionning et de gestion des schémas de données Avro
  • KSQL : un moteur de requêtage SQL-like de Kafka
  • Kafka Connect : un ensemble de connecteurs Kafka depuis et vers les plateformes les plus communes comme HDFS, Amazon S3, JDBC…

Dans cet article, nous nous intéressons particulièrement au Schema Registry.

Dressons la table

Plateforme Confluent

Assez de théorie, commençons par mettre en place l’environnement de développement.

Au début, installons la plateforme Confluent Open Source 5. Deux méthodes sont possibles :

  • Le mode Docker : cette solution est idéale si vous êtes sous Linux, cependant elle a encore certains problèmes sous Mac et Windows et elle est à éviter. Les étapes sont décrites sur le site de confluent.
  • Le mode Flat : télécharger une simple archive et la décompresser.

La suite de l’article se base sur cette méthode. Pour le lancement, il faut exécuter :

confluent-5.0.0/bin/confluent start

La réponse devra être de la forme :

A ce stade, les outils sont déployés en local, Kafka sur le port 9092 et le Schema Registry sur le port 8081.

Générer les données de test avec KSQL-Datagen

Essayons de produire des données en utilisant le générateur de données de tests inclus dans la plateforme :

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic users

Ces commandes permettront de générer des données à partir de modèles prédéfinis. Cependant, pour notre exemple, nous aimerions définir le modèle de données que nous souhaitons générer, chose possible avec KSQL-Datagen. Pour cela, il faudra :

  1. Définir le modèle de données en rédigeant le schéma de données. Pour notre exemple nous utiliserons ceci (dans le dossier /schemas) :

    transactions.avro
    {
            "namespace": "streams",
            "name": "transactions",
            "type": "record",
            "fields": [
                    {"name": "transactiontime", "type": {
                        "type": "long",
                        "format_as_time" : "unix_long",
                        "arg.properties": {
                           "iteration": { "start": 1, "step": 10}
                        }
                    }},
                    {"name": "transationid", "type": {
                        "type": "string",
                        "arg.properties": {
                            "regex": "transaction_[1-9][0-9][0-9]"
                        }
                    }},
                    {"name": "clientid", "type": {
                        "type": "string",
                        "arg.properties": {
                            "regex": "client_[1-9][0-9]?"
                        }
                    }},
                    {"name": "transactionamount", "type": {
                        "type": "string",
                        "arg.properties": {
                            "regex": "ad_[1-9][0-9]?"
                        }
                    }}
            ]
    }
  2. Lancer la génération de données en passant en paramètre le schéma

    ksql-datagen schema=/schemas/transactions.avro format=json topic=transactions_json key=transationid maxInterval=1000

Il est possible de générer des données de plusieurs types : CSV, JSON, Avro et Delimited avec KSQL-Datagen.

Création du projet

Il faut créer un projet Maven sur IntelliJ.

Nous allons opter pour Scala comme langage de développement, il faudra donc intégrer le plugin Scala de Maven.

Les dépendances suivantes sont nécessaires :

  • spark-sql : Spark SQL, c’est la base
  • spark-sql-kafka : nécessaire pour l’intégration avec Kafka
  • spark-avro de Databricks : sera utilisée dans la deuxième partie pour utiliser le schéma provenant du Schema Registry
  • kafka-schema-registry de Confluent : sera utilisée dans la deuxième partie pour communiquer avec le Schema Registry
  • kafka-avro-serializer de Confluent : sera utilisée dans la deuxième partie pour déserializer les messages Avro en utilisant le schéma enregistré dans le Schema Registry

A noter que les deux dernières dépendances nécessitent le rajout du repository Maven de Confluent en ajoutant dans le POM principal :

<project...>
...
    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
...
</project>

 

Mon premier Consumer

Vous attendez sûrement ce moment, faisons un peu de Spark Structured Streaming. D’abord, l’ouverture :

Consumer.scala
import org.apache.spark.sql.SparkSession

object SimpleConsumer {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("SimpleConsumer")
      .master("local[*]")
      .getOrCreate()
  }
}

Ensuite, nous créons une source streaming Kafka (readStream).

Consumer.scala
...
object SimpleConsumer {
  private val topic = "transactions_json"
  private val kafkaUrl = "http://localhost:9092"

  def main(args: Array[String]): Unit = {
...
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaUrl)
      .option("subscribe", topic)
      .load()
  }
}

Finalement, nous allons afficher les résultats sur la console.

À noter que Spark Structured Streaming dispose d’un ensemble de Sources et de Sinks fournis out-of-the-box en exactly-one.

Consumer.scala
...
object SimpleConsumer {
  private val topic = "users"
  private val kafkaUrl = "http://localhost:9092"

  def main(args: Array[String]): Unit = {
...

    df
      .writeStream
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}

L’option truncate est mise à false afin de pouvoir visualiser l’ensemble des messages reçus.

En lançant le Job Spark Structured Streaming et par la suite la génération des messages « transactions » en JSON avec KSQL-Datagen, nous obtenons un résultat illisible de la forme :

Batch: 6
-------------------------------------------
+--------------------+--------------------+------------+---------+------+--------------------+-------------+
|                 key|               value|       topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+------------+---------+------+--------------------+-------------+
|[74 72 61 6E 73 6...|[7B 22 74 72 61 6...|transactions|        0| 35184|2018-09-03 19:14:...|            0|
+--------------------+--------------------+------------+---------+------+--------------------+-------------+

Les différents champs sont les champs du message Kafka, remarquons que les champs key et value sont considérés comme binaires.

Pour notre exemple, nous nous intéresserons seulement au champ value qui contient le corps du message (la « transaction »). Nous allons essayer de l’extraire et le formater en table.

Au début convertissons-le en String :

Consumer.scala
...
val df = spark
.readStream...
.load()
.selectExpr("CAST(value AS STRING)")

En relançant, les messages sont maintenant lisibles, cependant c’est une table qui ne contient qu’une seule colonne: « value ». Nous souhaitons avoir une colonne par champ de « transactions » afin de pouvoir effectuer nos traitements de données.

Batch: 5
-------------------------------------------
+--------------------------------------------------------------------------------------------------------------------+
|value                                                                                                               |
+--------------------------------------------------------------------------------------------------------------------+
|{"transactiontime":1535995272507,"transationid":"transaction_574","clientid":"client_6","transactionamount":"ad_86"}|
|{"transactiontime":1535995272579,"transationid":"transaction_374","clientid":"client_9","transactionamount":"ad_29"}|
|{"transactiontime":1535995272580,"transationid":"transaction_892","clientid":"client_1","transactionamount":"ad_97"}|
+--------------------------------------------------------------------------------------------------------------------+

La solution est la suivante :

Consumer.scala
...
  .selectExpr("CAST(value AS STRING) as message")

val schema = StructType(
  Seq(
    StructField("transactiontime", LongType, true),
    StructField("transationid", StringType, true),
    StructField("clientid", StringType, true),
    StructField("transactionamount", StringType, true)
  )
)

import org.apache.spark.sql.functions._

val formatted = df.select(
  from_json(col("message"), schema).alias("parsed_value"))
  .select("parsed_value.*")
...

Ici, nous avons défini le schéma de données que nous allons recevoir en tant que schéma Spark.

Ensuite, nous avons parsé les données du JSON entrant en colonnes en utilisant la méthode from_json(). Nous sommes finalement passés au niveau N-1 pour avoir les colonnes sur les messages à plat.

Ceci permet d’avoir les données formatées et bien organisées comme suit :

Batch: 9
-------------------------------------------
+---------------+---------------+---------+-----------------+
|transactiontime|transationid   |clientid |transactionamount|
+---------------+---------------+---------+-----------------+
|1535996750327  |transaction_546|client_44|ad_65            |
|1535996750334  |transaction_178|client_24|ad_41            |
|1535996750403  |transaction_264|client_8 |ad_77            |
|1535996750453  |transaction_297|client_2 |ad_37            |
+---------------+---------------+---------+-----------------+

Chérie, ramène le Schema Registry

L’exemple précédent permet de consommer des messages au format JSON d’un cluster Kafka.

Ceci n’est plus valable dans le cas de messages en binaire. Il faudra dans ce cas un désérialiseur de données. Ceci est la cas avec la platforme Confluent quand on utilise le Schema Registry. Les messages sont encodés en Avro et il n’existe pas de moyen direct au moment de la rédaction de cet article pour faire la désérialisation automatique.

 

Note : avec l’ancienne version de Spark Streaming, cette tâche aurait été simple, il suffisait de définir un désérialiseur Kafka Confluent lors de la création de la source Kafka avec :

val kafkaProps = new Properties()
...
kafkaProps.put(ProducerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getCanonicalName)
kafkaProps.put("schema.registry.url", "http://localhost:8081")
...

Le problème est qu’une telle solution n’existe pas en Structured Streaming.

 

Pour générer les données en Avro il faut :

ksql-datagen schema=/schemas/transactions.avro format=avro topic=transactions_avro key=transationid maxInterval=1000

Notre Job Spark de départ sera le suivant :

object AvroConsumer {
  private val topic = "transactions"
  private val kafkaUrl = "http://localhost:9092"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[*]")
      .getOrCreate()

    val kafkaDataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaUrl)
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(value AS STRING) as message")

    val schema = StructType(
      Seq(
        StructField("transactiontime", LongType, true),
        StructField("transationid", StringType, true),
        StructField("clientid", StringType, true),
        StructField("transactionamount", StringType, true)
      )
    )

    import org.apache.spark.sql.functions._

    val formattedDataFrame = valueDataFrame.select(
      from_json(col("message"), sparkSchema.).alias("parsed_value"))
      .select("parsed_value.*")

    formattedDataFrame
      .writeStream
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}

En lançant le programme, nous obtenons le résultat suivant :

Batch: 14
-------------------------------------------
+---------------+------------+--------+-----------------+
|transactiontime|transationid|clientid|transactionamount|
+---------------+------------+--------+-----------------+
|null           |null        |null    |null             |
|null           |null        |null    |null             |
|null           |null        |null    |null             |
|null           |null        |null    |null             |
|null           |null        |null    |null             |
+---------------+------------+--------+-----------------+

Ceci est dû au fait que les messages n’ont pas pu être désérialisés.

Pour y arriver, il faut mettre en place au début le déserialiseur :

...
  private val schemaRegistryUrl = "http://localhost:8081"

  private val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val genericRecord = this.deserialize(bytes).asInstanceOf[GenericRecord]
      genericRecord.toString
    }
  }

  object DeserializerWrapper {
    val deserializer = kafkaAvroDeserializer
  }
...

Ici, nous avons :

  • défini l’URL du Schema Registry ;
  • défini le client du Schema Regisrty, il reçoit en paramètre l’URL et la capacité du cache (issu de Confluent) ;
  • défini la classe AvroDeserializer, elle hérite de AbstractKafkaAvroDeserializer (issu de Confluent) pour transformer le binaire reçu (Array[Byte]) en GenericRecord de Avro ;
  • le GenericRecord est ensuite transformé en un JSON String et retourné ;
  • nous avons défini le kafkaAvroDeserializer qui est de type AvroDeserializer et qui reçoit en paramètre le client cache Schema Registry ;
  • finalement, nous avons crée un Wrapper du désérialiseur.

Il nous reste maintenant à utiliser tout ceci :

...
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      DeserializerWrapper.deserializer.deserialize(bytes)
    )

    val kafkaDataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaUrl)
      .option("subscribe", topic)
      .load()

    val valueDataFrame = kafkaDataFrame.selectExpr("""deserialize(value) AS message""")
...

Ici, nous définissons une User Defined Function (qui appelle le déserialiseur défini au-dessus) permettant la désérialisation, et nous l’enregistrons dans le contexte Spark.

Nous avons supprimé la ligne de conversion en String juste après le load().

Nous l’appelons à la dernière ligne pour déserialiser les messages et nous renvoyer les données en tant que String (comme défini dans la méthode deserialize de notre classe AvroDeserializer).

Enfin, pour rendre notre code générique et fonctionnel avec n’importe quel schéma de données, il faut enlever le schéma Spark définit en dur, et bénéficier des fonctionnalités de Schema Registry pour récupérer le schéma Avro au lancement et le transformer en schéma Spark :

...
  private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
  private var sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
...
  def main(args: Array[String]): Unit = {
...
    val formattedDataFrame = kafkaDataFrame.select(
      from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
      .select("parsed_value.*")
...

En lançant le programme, nous obtenons le résultat attendu :

Batch: 6
-------------------------------------------
+---------------+---------------+---------+-----------------+
|transactiontime|transationid   |clientid |transactionamount|
+---------------+---------------+---------+-----------------+
|1536000463509  |transaction_971|client_91|ad_87            |
|1536000463556  |transaction_309|client_22|ad_44            |
|1536000463626  |transaction_325|client_33|ad_40            |
+---------------+---------------+---------+-----------------+

 

Conclusion

Spark Structured Streaming est un outil puissant de calcul distribué qui permet de gérer les streams de données comme données statiques grâce aux API Dataset et Dataframe.

Nous avons réussi à l’intégrer à Kafka pour consommer des données en JSON, mais aussi à consommer des données Avro en utilisant le Schema Registry de Confluent comme repository de schémas, pour avoir finalement des données structurées en colonnes Spark faciles à manipuler.

L’exemple est resté simpliste mais on peut aller plus loin avec l’évolution du schéma Avro, l’architecture du code et les tests.

 

Publié par

Commentaire

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Nous recrutons

Être un Xebian, c'est faire partie d'un groupe de passionnés ; C'est l'opportunité de travailler et de partager avec des pairs parmi les plus talentueux.