Publié par

Il y a 2 semaines -

Temps de lecture 14 minutes

Kafka-Streams : une voie vers l’autoscaling avec Kubernetes

Kafka-Streams est la bibliothèque de stream processing associée à la streaming platform Apache Kafka. Kafka vient donc en plusieurs parties avec notamment : une partie persistance (core) qui donne la faculté de publier et consommer des messages, puis une partie traitement (streams) qui rend possible le traitement à la volée de ces messages. Kafka-Streams a pour point fort d’être une simple bibliothèque, une dépendance Java / Scala qui peut être incluse dans des projets sans perturber les processus d’ingénierie logicielle en place. Il est donc possible de livrer ces projets sans changer nos habitudes et profiter du pouvoir du traitement temps réel.

Cet article, lui, prend un autre point de vue et montre comment quelques pratiques simples (conteneurisation, orchestration) nous permettent d’exploiter un peu plus les capacités de nos applications Kafka-Streams. Parmi les promesses du framework, il y a celle d’être hautement scalable. Avec les outils qui se dégagent de la tendance cloud native nous allons même essayer d’aller plus loin, d’être auto-scalables. À la fin de cet article vous aurez une idée de comment assurer le passage à l’échelle de vos applications Kafka-Streams uniquement à partir d’outils existants.

tl;dr

Cet article a été écrit à la suite de la première représentation du talk « Scale in / Scale out with Kafka-Streams and Kubernetes«  donné en novembre 2018 à la XebiCon. Il traite d’une expérience dont le code se trouve dans le dépôt GitHub xke-kingof-scaling. Ce repo contient les manifestes YAML utilisés en exemple dans cet article.

Record-lag, le coeur du problème

Pourquoi, et dans quels cas, voudrions-nous avoir un comportement élastique des applications Kafka-Streams en particulier ? Dans cette première partie, on s’intéresse à ces applications de stream processing et à leur lien avec Apache Kafka pour répondre à cette question.

Consommation des messages

Les clients Kafka, producteurs et consommateurs, sont définis au sein même du projet Apache Kafka et établissent certaines règles utilisées pour traiter les évènements du bus de messages. Parmi ces idées il y a le polling des consommateurs. Une fois qu’un consommateur a souscrit à un topic, il peut alors tirer un certain nombre de messages (limité par les configurations fetch.max.bytes et max.poll.records), les traiter et committer leurs offsets pour les indiquer comme traités. Voilà pour les clients bas niveau. Pour Kafka-Streams, les débits de Kafka sont si importants et les polls sont réalisés à des intervalles si rapprochés que l’on atteint cette notion de traitement en temps réel.

Alors quel va être le problème ici ?

Pour plusieurs raisons, une application peut prendre du retard. L’écart entre l’offset du message en cours de traitement et le dernier posé dans la file s’agrandit. On dit alors que le groupe de consommation présente un lag important. Cette notion est capitale pour une application temps réel et a des conséquences importantes :

  • Elle ne valide plus le besoin, puisqu’elle réagit trop tard aux événements ;
  • Sous cette pression des messages non traités, cette application peut être sujette à des crashes

Quelles peuvent être les raisons de ces retards ?

Ce problème a quelques causes bien connues:

  • Une reprise sur erreur. Les différents producteurs de données ont alors le temps de construire ce lag.
  • Une transformation qui implique la construction d’un état interne toujours plus grand (corrélation ou enrichissement sur de longues périodes).
  • Où tout simplement à cause un pic de fréquentation d’un service.
  • etc …

La maxime « diviser pour mieux régner » est largement utilisée en Big/Fast Data et, là encore, c’est en jouant sur le parallélisme que Kafka-Streams va nous permettre de résoudre ce problème.

Le consumer protocol

Lorsqu’une seule instance de notre application de streaming tourne, elle se voit attribuer l’ensemble des partitions qui composent les input topics.

Rappel : les messages sont attribués aux partitions en fonction de leur clé. Un hash est déterminé en fonction de la clé et du nombre de partitions. Les messages de même hash sont alors attribués à la même partition.

Si une deuxième instance est lancée avec le même identifiant d’application, elle déclenche alors un partition rebalance qui redistribue les partitions aux deux instances. Cela allège déjà notre première instance et accélère le rattrapage de son retard. Enfin, le parallélisme est limité par le nombre de partitions. Soit N le nombre de partitions d’un topic d’entrée. Lorsque N instances de notre application sont lancées, elles se chargent chacune d’une partition (dans le cas idéal). Une application supplémentaire (n° N+1) serait donc au repos. L’ensemble de nos instances forment un groupe de consommation et le retard de ce groupe sur un topic est la somme du retard de chaque instance.

Pour réaliser une expérience capable de mettre en avant l’accumulation du retard sur nos applications, on proposera un cas d’usage fictif.

Figure 1 : groupe de consommation Kafka-Streams

Kubernetes et le support des custom metrics

Kubernetes est un orchestrateur de conteneurs. Le sujet est vaste, mais dans le cadre de notre problématique, nous pouvons nous servir de la description suivante : il s’agit d’un cluster manager composé de nodes et d’un master. Le master a la connaissance des ressources sur le cluster, tels que les CPU, la mémoire, les disques des machines et le réseau qui les relie. Son seul objectif est de maintenir le nombre de déploiements demandés pour vos applications conteneurisées.

Avec cette description, Kubernetes semble tout indiqué pour résoudre notre problème. L’idée d’auto-scaling y est présente depuis longtemps et se base sur l’usage CPU ou la consommation mémoire. Mais nous avons affaire à une application spéciale, puisqu’il s’agit d’une application de streaming. C’est donc sur le retard décrit dans le chapitre précédent que nous aimerions baser le scaling de notre application.

Et justement, Kubernetes a peut-être quelque chose sous le coude pour nous…

Depuis sa version 1.6, Kubernetes permet de récolter et d’utiliser des custom metrics. Des métriques personnalisées, potentiellement porteuses d’informations métier, que vous voudriez exposer vous-même dans un objectif de monitoring. Mais plutôt que d’inventer des métriques, nous allons ici faire le lien avec les informations exposées par les clients Kafka via JMX.

Une proposition d’implémentation peut être la suivante :

Figure 2 : Repport des metrics JMX à Kubernetes via Stackdriver

Les métriques de notre application sont exposées au format Prometheus. Ces informations sont alors récupérées par le scraper prometheus-to-sd qui va émettre les informations techniques de lag dans Stackdriver. Le retard est alors disponible pour son affichage, le monitoring / l’alerting et, surtout, il est sauvegardé pour alimenter le Metric Server custom-metrics-stackdriver-adapter qui mettra à disposition ces métriques au master de notre cluster.

Exposition des métriques JMX au format Prometheus

Prometheus est un logiciel de monitoring / alerting open source et un des premiers projets membres de la CNCF. Bien connu dans le monde DevOps, il définit un format d’exposition de métriques qui est de plus en plus utilisé. Ce format sera incontournable par la suite et nous utiliserons donc le projet jmx-exporter pour afficher les Mbeans de cette manière.

Les paramètres suivants sont ajoutés à la JVM de notre application de streaming :

-Djava.rmi.server.hostname=127.0.0.1 
-Djava.rmi.server.port=7071
-javaagent:/<>/jmx_prometheus_<version>.jar=9001:/<>/config.yaml

Cela nous permet d’exposer les métriques JMX sur le port 7071 et leur conversion au format Prometheus sur le port 9001. Le fichier config.yaml décrit les métriques à exporter.

config.yaml
global:
rules:
pattern:"kafka.consumer<type=consumer-fetch-manager-metrics,client-id=(.*),topic=GAME-FRAME-RS, partition=(.*)><>records-lag:(.*)"
   labels: { client: $1, partition: $2, topic: GAME-FRAME-RS, metric: records-lag }
   name: "consumer_lag_game_frame_rs"
   type: GAUGE

En d’autres termes, nous exportons ici parmi les métriques kafka.consumer de types consumer-fetch-manager-metrics l’information records-lag qui concerne le topic GAME-FRAME-RS, pour toutes les partitions et pour tous les client-id. On donne un nom à cette métrique et indique qu’il s’agit d’une GAUGE (obligatoire pour stackdriver). Nous répéterons l’opération pour tous les input topics. (Le fichier complet)

Construction d’une image Docker de notre streaming app

En développement, les métriques peuvent maintenant être exposées en HTTP.

Figure 3 : présentation des métriques JMX au format Prometheus

Mais le tout devra être packagé pour être également applicable dans une image docker, prête à être orchestrée par Kubernetes. L’outil de build choisi ici est Gradle, auquel on ajoute un plugin docker :

build.gradle
plugins { id 'com.palantir.docker' version '0.20.1' }
apply plugin: 'com.palantir.docker'
mainClassName = 'fr.xebia.ldi.stream.Main'
docker {
   tags version
   dockerfile file('docker/Dockerfile')
   name 'gcr.io/cloud-fighter-101/kos-streaming-app'
}

En très peu de configuration, nous sommes capables de déclarer les éléments suivants :

  • Le DockerFile à builder ;
  • Le point d’entrée de notre streaming app (class Main) ;
  • Le nom, la version et le repository sur laquelle uploader l’image.

Au sein du conteneur, nous ajouterons les fichiers suivants :

$ tree -l 3
#.
#└── opt
#    └── kos-stream
#        ├── config.yaml
#        └── jmx_prometheus_javaagent-0.3.1.jar

 

Remontée du lag des consommateurs à Stackdriver

C’est maintenant notre conteneur qui expose les métriques sur un port HTTP (accessible dans un même namespace Kubernetes). On cherche maintenant un outil de monitoring pour les persister et les visualiser. Notre choix va se porter sur Stackdriver. Ce choix est favorisé par les outils développés dans le projet : k8s-stackdriver. En effet on va y trouver des applications toutes faites pour faire le pont entre notre streaming-app et Stackdriver. Dans un même déploiement, notre conteneur sera accompagné d’une instance de l’application prometheus-to-sd dont le but est d’accéder régulièrement au port qui expose les métriques JMX pour les pousser à Stackdriver. Ce scraper est positionné à côté de notre application comme un side-car.

deployment.yaml
- name: prometheus-to-sd
 image: gcr.io/google-containers/prometheus-to-sd:v0.2.6
 command:
 - /monitor
 - --source=:http://localhost:9001
 - --stackdriver-prefix=custom.googleapis.com
 - --pod-id=$(POD_ID)
 - --namespace-id=$(POD_NAMESPACE)

On retrouve le port 9001 dans la source (désigné dans les paramètres JVM). Le simple ajout de ce side-car va nous permettre de créer les dashboards suivants.

Figure 4 : Affichage des métrics de lag, cpu et mémoire de l’application Kafka-Streams

Mise en place du MetricServer

Ces métriques sont persistées et alimentent de jolis dashboards. Mais le master Kubernetes doit maintenant en être informé. En effet lorsque l’on sonde les l’API on obtient :

$ kubectl get --raw "/apis/custom.metrics.k8s.io/v1beta1" | jq
# Error from server (NotFound): the server could not find the requested resource

Le résultat ne contient pas encore les custom metrics qui nous intéressent.

L’application custom-metrics-stackdriver-adapter est précisément là pour enrichir l’API Kubernetes avec des informations personnalisées. Cette application implémente les specs de la custom metrics API et l’external metric API et se sert de Stackdriver comme backend. Elle est pensée pour l’élasticité des applications monitorées par Stackdriver. Nous aurions pu choisir une autre implémentation, voire la refaire soi-même si Stackdriver n’était pas dans la boucle.

Pour la déployer, le simple usage de la commande suivante est nécessaire :

kubectl create -f \
https://raw.githubusercontent.com/GoogleCloudPlatform/k8s-stackdriver/master/custom-metrics-stackdriver-adapter/deploy/production/adapter.yaml

Figure 5 : Affichage du pod contenant le metric server

$ kubectl get --raw "/apis/custom.metrics.k8s.io/v1beta1" | jq
#{
#  "name": "*/custom.googleapis.com|consumer_lag_game_frame_rs",
#  ...
#  "name": "*/custom.googleapis.com|consumer_lead_game_frame_rq",
#  ...
#}

Configuration de Horizontal Pod Autoscaler

Pour reprendre la cinématique complète au sein de notre cluster Kubernetes, nous avons le schéma suivant :

Figure 6 : Repport des metrics JMX à Kubernetes via Stackdriver

Notre application de stream processing expose sur un endpoint accessible depuis son pod depuis ses métriques JMX au format prometheus. Elle est accompagnée d’un side-car, qui scrape et exporte régulièrement ces informations à Stackdriver. Ce dernier sert de backend à notre metric server. Le Master est donc en possession de toutes les informations nécessaires pour décider d’ajouter (ou pas) un pod à notre déploiement. C’est cette idée de prise de décision qui va nous intéresser pour finir. Comme pour un autoscaling classique, c’est un HPA (Horizontal Pod Autoscaler) qui portera cette responsabilité.

Sa configuration est la suivante :

hpa.yaml
apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
spec:
 scaleTargetRef:
   apiVersion: apps/v1
   kind: Deployment
   name: kstreams
 minReplicas: 1
 maxReplicas: 4
 metrics:
  - type: Pods
   pods:
     metricName: custom.googleapis.com|consumer_lag_game_frame_rq
     targetAverageValue: 100000
  // - other metrics

On notera l’usage de la version v2beta1 de l’api autoscaling. Le nombre maximal de replicas est à 4 (le nombre de partitions des inputs topics de l’expérience). Le nom de la métrique ciblée est custom.googleapis.com suivi du nom attribué dans le fichier de configuration de jmx-exporter.

Conclusion

Après la génération d’une quantité importante de données, le nombre de messages en retard finit par dépasser les seuils spécifiés dans le HPA. Des pods supplémentaires sont donc ajoutés. Ils contiennent chacun une instance de notre application de streaming et un side-car prometheus-to-sd. On peut constater la chute du lag liée aux input topics puisqu’il est réparti sur différentes instances. Ces instances, puisqu’elles supportent une charge réduite, sont plus susceptibles de rattraper le retard accumulé. Les courbes qui apparaissent en milieu de graphe correspondent au métriques émises par les nouvelles applications.

Figure 7 : Repport des metrics JMX à Kubernetes via Stackdriver

Nous avons vu comment, via des outils existants comme Stackdriver et Kubernetes, il est simple d’augmenter le pouvoir de nos applications kafka-streams. Nous avons donc abordé la question « could we? ». La partie qui n’a pas été traitée ici c’est « should we? ». Le scaling d’application a des avantages évidents qui ont été listés en début d’article. Mais il n’est pas aussi efficace pour toutes les applications de streaming. Le passage à l’échelle a un coût, et pour certains cas d’usage ou traitements, ce coût peut être non négligeable. La migration des états internes est une des premières problématiques. Il faut donc s’intéresser aux StateFullSets, une alternative aux déploiements Kubernetes qui permettent un stockage persistant, et quantifier ce coût pour un cas d’usage précis.

Where to GO Next

Pour terminer, voici une liste des liens qui m’ont aidé à réaliser cette expérience :

 

 

Publié par

Publié par Loic Divad

Loïc est Data Engineer chez Xebia. Il intervient sur des problématiques liées au Big Data comme l’acquisition, le traitement et le stockage des données. Il travaille avec des outils comme Scala, Spark et Kafka.

Commentaire

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Nous recrutons

Être un Xebian, c'est faire partie d'un groupe de passionnés ; C'est l'opportunité de travailler et de partager avec des pairs parmi les plus talentueux.