Publié par

Il y a 11 ans -

Temps de lecture 6 minutes

fr.xebia.concurrent.CyclicLatch

….ou comment effectuer un traitement régulièrement

Avec l’arrivée de l’api java.util.concurrent dans le JDK 5, la programmation concurrente est à la portée de tous. Auparavant, il fallait :

  • soit être un expert des APIs de bas niveau et être prêt à passer des nuits blanches à mettre au point le système,
  • soit se tourner vers les serveurs d’applications J2EE et leur implémentation JMS et EJB Message Driven Bean. Dans ce cas-là, la lourdeur de l’API JMS et les contraintes de persistance et de transaction (par défaut) des serveurs JMS viendront mettre à mal au final l’utilisation de traitements parallèles et concurrents.

Dans le cadre d’un de mes projets, j’ai eu besoin d’implémenter le comportement suivant :

  • les producteurs, N Threads effectuent une tâche qui entre autres collecte des données.
  • le consommateur, 1 Thread collecte ces données pour les agréger.

Dans un premier temps, je me tourne vers une java.util.concurrent.BlockingQueue partagée entre les producteurs et le consommateur. Cette conception fonctionne jusqu’au moment où je me suis aperçu que

  1. le traitement effectué par mon consommateur pouvait être long et coûteux en ressources : il faut donc éviter d’effectuer le traitement au fil de l’eau et attendre d’avoir un certain nombre de données dans la file.
  2. les producteurs, suivant la vie de l’application, pouvaient à un instant donné être très nombreux (beaucoup de données en peu de temps) ou au contraire peu nombreux. Dans ce dernier cas, le remplissage de ma file prendrait des heures et je ne pouvais attendre cette condition pour lancer mon traitement.

Cahier des charges

Je me suis donc fixé le cahier des charges suivant :
Le consommateur doit vider l’ensemble de la queue et effectuer le traitement si :

  • au moins N éléments ont été déposés (cf 1.)
  • S secondes se sont écoulées depuis la dernière collecte (cf 2.)

J’ai commencé, bien sûr, par examiner l’API java.util.concurrent. Deux éléments de synchronisation ont attiré mon attention sans réellement convenir à mon besoin.

J’ai décidé de mixer les deux pour écrire fr.xebia.concurrent.CyclicLatch

Utilisation

Une instance de la classe fr.xebia.concurrent.CyclicLatch est créée avec les paramètres suivants :

  • une valeur N qui indique le nombre de fois avant de que le Latch ne soit libéré
  • une valeur S qui indique le timeout
// création d'un CyclicLatch avec N=100 et S=10 secondes
latch = new CyclicLatch(100, 10, TimeUnit.SECONDS); 

Cette instance est partagée entre les producteurs et le consommateur.

  • A chaque traitement terminé, chaque producteur va décrémenter le Latch.
public class Producer {
	public void doit() {
		//Perform its own business...
		latch.countDown();
	}
}
  • Le consommateur va se mettre en attente du latch
public class Consummer {
	public void run() {
		while (true) {
			latch.await() // attente avec les valeurs par défauts
			//ou
			latch.await(30,TimeUnit.Secondes) // attente de 30 secondes
			// Perform its own business....
		}
	}
}

Implémentation

La classe CyclicLatch est implémentée autour

  • d’un attribut de type CountDownLatch
final public class CyclicLatch {
	private final int initialcount;
	private CountDownLatch latch;
	private final ReentrantLock lock = new ReentrantLock();

	public CyclicLatch(int initialcount) {
		this.latch = new CountDownLatch(initialcount);
	}
}
  • de 2 méthodes, countDown() et await().

La méthode countDown() verrouille l’accès au latch et délègue au CountDownLatch.

	public void countDown() {
		try {
			lock.lock();
			latch.countDown();
		} finally {
			lock.unlock();
		}
	}

La méthode await() délègue au CountDownLatch.await(long, TimeUnit)
Extrait de javadoc : Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted or the specified waiting time elapses.

Et ensuite effectue reset safe du CountDownLatch

	public boolean await(long timeout, TimeUnit unit)
			throws InterruptedException {

		log("Waiting for latch of "+this.initialcount+"  & " + timeout + " seconds timeout");
		boolean result = latch.await(timeout, unit);
		/* Reset Latch, on timeout & on overflow */
		try {
			lock.lock();
			log("Reset Latch...." + latch);
			latch = new CountDownLatch(initialcount);
		} finally {
			lock.unlock();
		}
		return result;

	}

L’implémentation complète du CyclicLatch permet de fixer les paramètres de taille et de timeout (méthode await()) ou à chaque attente du consommateur de positionner un nouveau timeout (méthode {{await(long, TimeUnit)}})

Tests

Dans le repository de Xebia-France, vous trouverez le code complet du CyclicLatch et un exemple de mise en œuvre avec la classe fr.xebia.xke.concurrency.NProducers1ConsumerTimedTest. Cette classe crée 4 producteurs qui remplissent une BlockinQueue. Le consommateur doit lire les messages en utilisant bien sûr un CyclicLatch. Suivant le paramétrage du latch on obtient les comportements suivants :

  • CyclicLatch taille 100, time out 30 secondes. Les traces, ci-dessous, montre que le latch a été déclenché sur ‘Overflow’ donc plus de 100 messages dans la file, au bout d’environs 2.5 secondes
main Consumer.Consumer()
MyBasket.start()
Thread Consumer #0 Consumer.run()
Thread Consumer #0 Waiting for latch of 100  & 5 seconds timeout
Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@45a877[Count = 0]
Thread Consumer #0 OverFlow, 2.4278326s
Thread Consumer #0 messages #100
Thread Consumer #0 Waiting for latch of 100  & 5 seconds timeout
Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@126b249[Count = 0]
Thread Consumer #0 OverFlow, 2.5149238s
Thread Consumer #0 messages #100
Thread Consumer #0 Waiting for latch of 100  & 5 seconds timeout
Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@182f0db[Count = 0]
Thread Consumer #0 OverFlow, 2.5150535s
Thread Consumer #0 messages #100
Thread Consumer #0 Waiting for latch of 100  & 5 seconds timeou
Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@192d342[Count = 0]
Thread Consumer #0 OverFlow, 2.5142925s
Thread Consumer #0 messages #100
Thread Consumer #0 Waiting for latch of 100  & 5 seconds timeout
10062 main done
MyBasket.stop()
  • CyclicLatch taille 100, time out 2 secondes. Les traces, ci-dessous, montrent qui le latch a été déclenché maintenant sur timeout avec une file qui contient environ 80 messages.
main Consumer.Consumer()
MyBasket.start()
Thread Consumer #0 Consumer.run()
Thread Consumer #0 Waiting for latch of 100  & 2 seconds timeout
Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@45a877[Count = 20]
Thread Consumer #0 TimeOut,  2.0013237s
Thread Consumer #0 messages #80
Thread Consumer #0 Waiting for latch of 100  & 2 seconds timeout
Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@126b249[Count = 20]
Thread Consumer #0 TimeOut,  2.000778s
Thread Consumer #0 messages #80
Thread Consumer #0 Waiting for latch of 100  & 2 seconds timeout
Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@182f0db[Count = 20]
Thread Consumer #0 TimeOut,  2.0010734s
Thread Consumer #0 messages #80
Thread Consumer #0 Waiting for latch of 100  & 2 seconds timeout
Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@192d342[Count = 20]
Thread Consumer #0 TimeOut,  2.0014079s
Thread Consumer #0 messages #80
Thread Consumer #0 Waiting for latch of 100  & 2 seconds timeout
Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@6b97fd[Count = 20]
Thread Consumer #0 TimeOut,  2.001409s
Thread Consumer #0 messages #80
Thread Consumer #0 Waiting for latch of 100  & 2 seconds timeout
10078 main done
MyBasket.stop()

Conclusion

La programmation concurrente est vraiment passionnante, mais il faut absolument devenir paranoïaque. En effet, il faut envisager tous les cas possibles: chaque instruction doit être envisagée comme pouvant être interrompue. Le simple conseil que je vous donnerais est de commencer par synchroniser toutes les sections critiques (mot-clé synchronized). Ensuite, et après réflexion et bench, vous pourrez passer à des verrouillages de plus bas niveau si nécessaire (les JVM modernes ont fait d’énormes progrès dans ce domaine!)

Depuis que j’ai implémenté cette classe pour un projet et un besoin précis, je trouve régulièrement un nouveau cas d’utilisation. Le dernier en date est l’affichage dynamique dans une application Swing de graphes JFreeChart d’un très grand nombre de valeurs en fonction du temps.

Et vous, que ferez-vous du CyclicLatch ?

Publié par

Commentaire

3 réponses pour " fr.xebia.concurrent.CyclicLatch "

  1. Publié par , Il y a 11 ans

    Je pense que c’est beaucoup d’honneur d’associer le CyclicLatch à une implémentation de la JSR-166

  2. Publié par , Il y a 11 ans

    @Dominique

    Vous faites certainement référence à l’un des futurs outils de concurrence proposés pour le jdk7 : le Phaser dont nous en avions fait une rapide présentation lors d’une de nos revues de presse. Si le CyclicLatch de Benoit en reprend quelques principes de fonctionnement du Phaser, il n’en reprend pas son implémentation. Pour mémoire, l’une des grandes promesses de cette nouvelle version de JSR (JSR-166y) concerne la simplification de traitements parallèles dans des environnements multi-cores avec l’arrivée de la nouvelle API Fork and Join.

    @Benoit
    Je suis certain que Doug Lea est jaloux de ton CyclicLatch :)

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Nous recrutons

Être un Xebian, c'est faire partie d'un groupe de passionnés ; C'est l'opportunité de travailler et de partager avec des pairs parmi les plus talentueux.