Consommer et produire des messages

Nous avons vu précédemment comment faire transiter des messages d’une source à une destination. Nous allons voir maintenant comment produire et consommer ces messages.

Produire des messages

Créer une nouvelle classe PriceProducer dans le package fr.pantheonsorbonne.ufr27.miage.camel.

package fr.pantheonsorbonne.ufr27.miage.camel;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSContext;
import jakarta.jms.Session;
import org.eclipse.microprofile.config.inject.ConfigProperty;


import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

//cette classe est un singleton
//elle implémente l'interface Runnable qui spécifie qu'elle peut être exécutée par un scheduler
@ApplicationScoped
public class PriceProducer implements Runnable {

    //nous récupérons à l'aide de CDI une fabrique de connexions JMS
    @Inject
    ConnectionFactory connectionFactory;

    @ConfigProperty(name = "quarkus.artemis.username")
    String userName;

    //générateur de nombre aléatoire
    private final Random random = new Random();

    //planificateur d'exécution de tache
    private final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);

    //cette méthode est appellées lorsque l'initialisation de quarkus est terminée
    void onStart(@Observes StartupEvent ev) {
        //on planifie l'exécution de la méthode run() de cette classe:
        // - immédiatement (initialDelay=0
        // - toute les 5s (period = 5L, unit = secondes)
        scheduler.scheduleAtFixedRate(this, 0L, 5L, TimeUnit.SECONDS);
    }

    //cette méthode est appellées lorsque quarkus s'arrète
    void onStop(@Observes ShutdownEvent ev) {
        scheduler.shutdown();
    }

    //cette méthode est exécutée en boucle par le scheduler
    @Override
    public void run() {
        //syntaxe try-with-resource
        //on crée un nouveau contexte JMS en spécifiant que les sessions sont cloturées automatiquement lors que les messages sont consommés.
        try (JMSContext context = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
            //on crée un producteur et on y envoie un message dans une nouvelle queue "prices"
            //le message est une chaine de caractères, contenant un entier tiré aléatoirement entre 1 et 100.
            context.createProducer().send(context.createQueue("M1.prices-"+userName), Integer.toString(random.nextInt(100)));
        }
    }
}

Cette classe va produire des messages contenant des entiers aléatoires sur une queue de message appelée M1.prices- suivie de votre nom d’utilisateur qui sera récupéré depuis le fichier application.properties à l’aide de l’annotation @ConfigProperty(name = "quarkus.artemis.username")

Activité

Faites en sorte que les prix soient sauvegardés sur votre disque dur dans le répertoire data/prices

solution
package fr.pantheonsorbonne.ufr27.miage.camel;

import org.apache.camel.builder.RouteBuilder;

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.inject.ConfigProperty;

@ApplicationScoped
public class CamelTutorial extends RouteBuilder {


    @ConfigProperty(name = "quarkus.artemis.username")
    String userName;

    @Override
    public void configure() {
        from("sjms2:M1.prices-"+userName).to("file:data/prices");
    }
}

Consommer des messages

Pour consommer des messages, supprimez d’abord toutes les routes de la classe CamelTutorial.

Créer la nouvelle class PriceConsumer dans le même package que précédemment:

package fr.pantheonsorbonne.ufr27.miage.camel;


import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.jms.*;
import org.eclipse.microprofile.config.inject.ConfigProperty;


@ApplicationScoped
public class PriceConsumer implements Runnable {


    @Inject
    ConnectionFactory connectionFactory;

    @ConfigProperty(name = "quarkus.artemis.username")
    String userName;

    //indique si la classe est configurée pour recevoir les messages en boucle
    boolean running;

    //cette méthode démarre un nouveau thread exécutant l'instance en cours, jusqu'à ce que la variable running soit false.
    void onStart(@Observes StartupEvent ev) {
        running = true;
        new Thread(this).start();
    }


    void onStop(@Observes ShutdownEvent ev) {
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            try (JMSContext context = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
                //reçoit un message à partir de la queue queue/prices
                Message mess = context.createConsumer(context.createQueue("M1.prices-" + userName)).receive();
                //converti ce message en int
                int price = Integer.parseInt(mess.getBody(String.class));
                //affiche le résultat dans la console
                System.out.println("from the consumer: " + price);

            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

Puis exécuter votre projet avec votre IDE ou en ligne de commande.

Le consommateur va lire les messages à partir de la même que de message que précédemment, directement fournis par le consommateur.

Activité

Mettre à jour de consommateur pour lire les messages à partir de la queue prices-vat qui va contenir les prix avec la TVA, et mettez à jour la classe CamelTutorial pour router tous les messages de la queue prices vers une nouvelle queue prices-vat.

solution

Dans price consumer:

Message mess = context.createConsumer(context.createQueue("M1.prices-vat-"+username)).receive();

Dans CamelTutorial

from("sjms2:M1.prices-"+username).to("sjms2:M1.prices-vat-"+username);