IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)

Tutoriel Vert.x 3 : Discuter via l'event bus

Thierry

Vert.x est une API asynchrone très proche du modèle d'acteurs. Vert.x est polyglotte, simple, scalable (élastique) et hautement concurrente. Vert.x est bien adapté aux architectures en microservices, mais s'intègre aussi parfaitement dans une WebApp plus classique. Dans ce deuxième article d'une série consacrée à Vert.x 3, nous allons voir comment faire communiquer les verticles les uns avec les autres. 3 commentaires Donner une note à l´article (5)

Article lu   fois.

L'auteur

Profil ProSite personnelICAUDA

Liens sociaux

Viadeo Twitter Facebook Share on Google+   

I. Introduction

Vert.x est un framework (sous licence Apache 2.0) destiné à la plate-forme Java qui a la particularité d'être polyglotte. Il a été conçu pour faciliter la création d'applications asynchrones et événementielles, tout en fournissant des performances (vitesse, mémoire, threads) excellentes. De par ses caractéristiques, Vert.x est notamment un bon choix pour l'écriture de microservices.

Dans l'article précédent, nous avons fait connaissance avec le framework. Nous avons notamment découvert les verticles et comment les lancer. Nous allons maintenant voir comment les faire communiquer via le bus.

Vous trouverez les codes source et les fichiers de configuration dans le fichier vertx-02.zip.

I-A. Série d'articles

Cet article fait partie d'une série consacrée à Vert.x 3 :

I-B. Versions

Pour écrire cet article, j'ai utilisé les logiciels suivants :

  • Vert.x 3.2.1 ;
  • Java JDK 1.8.0_45 ;
  • Maven 3.2 ;
  • Eclipse 4.5.

II. Quelques verticles pour commencer

Pour faire communiquer des verticles, encore faut-il en avoir plusieurs. Je vous propose donc d'en créer quelques-uns à titre d'illustration :

DogVerticle.java
Sélectionnez
public class DogVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        System.out.println("Start of Dog Verticle");
    }

    @Override
    public void stop() throws Exception {
        System.out.println("Stop of Dog Verticle");
    }
}
CatVerticle.java
Sélectionnez
public class CatVerticle extends AbstractVerticle{
    @Override
    public void start() throws Exception {
        System.out.println("Start of Cat Verticle");
    }
    
    
    @Override
    public void stop() throws Exception {
        System.out.println("Stop of Cat Verticle");
    }
}
ShopVerticle.java
Sélectionnez
public class ShopVerticle extends AbstractVerticle{
    @Override
    public void start() throws Exception {
        System.out.println("Start of Shop Verticle");
    }
    
    
    @Override
    public void stop() throws Exception {
        System.out.println("Stop of Shop Verticle");
    }
}

Dans le tutoriel précédent, nous avions créé un verticle équipé d'un main. Cette fois, pour que ce soit encore plus parlant, nous allons lancer ces verticles directement depuis un main d'une classe qui n'est pas un verticle :

AnimalLauncher.java
Sélectionnez
public class AnimalLauncher {

    public static void main(String[] args) {
        System.out.println("Start of Animal Launcher");

        // Pas besoin d'etendre AbstractVerticle pour faire un main...
        final Vertx vertx = Vertx.vertx();

        // Lequel vous semble le plus simple, ecrire le nom du package ou passer par class.name ?
        // vertx.deployVerticle("com.masociete.tutovertx.ShopVerticle");

        vertx.deployVerticle(ShopVerticle.class.getName());

        // pause
        Thread.sleep(500);
        
        vertx.deployVerticle(DogVerticle.class.getName());
        vertx.deployVerticle(CatVerticle.class.getName());

        System.out.println("End of Animal Launcher");
    }
}

Lors du premier tutoriel, j'avais indiqué le nom complet des verticles à lancer. Cette fois, j'utilise « class.name », ce qui revient strictement au même. Dites-moi ce que vous en pensez dans les commentaires. Lequel préférez-vous ?

Pour l'exemple, j'ai envie de lancer plusieurs instances de DogVerticle, ce qui est relativement simple à faire.

AnimalLauncher.java
Sélectionnez
final DeploymentOptions options = new DeploymentOptions() //
        .setInstances(5);

vertx.deployVerticle(DogVerticle.class.getName(), options);

Nous verrons dans un prochain article comment Vert.x gère les différentes instances des verticles, et notamment l'event loop.

Quand on lance les verticles en ligne de commande, surtout séparément, on peut en spécifier le nombre, ce qui sera plus pratique.

Les println vous permettent de voir comment tout cela s'enchaîne. Voici ce que ça donne lorsqu'on lance AnimalLauncher. Notez que l'ordre des logs peut varier d'un lancement à l'autre.

Console de AnimalLauncher
Sélectionnez
Start of Animal Launcher
Start of Shop Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
End of Animal Launcher
Start of Cat Verticle

Enfin, histoire d'avoir des choses un minimum intéressantes à envoyer, je vous propose de créer deux beans :

Dog.java
Sélectionnez
public class Dog {

    private String name;
    private int age;
    private String race;
    ...
Cat.java
Sélectionnez
public class Cat {

    private String nickname;
    private boolean milkLover;
    ...

III. Envoyer et recevoir des messages

III-A. Envoyer

Pour envoyer un message sur l'event bus, c'est très simple puisqu'il suffit de spécifier une adresse (attention une adresse n'est pas un topic), comme on le ferait avec un système de queue classique :

DogVerticle.java
Sélectionnez
final String address = "HELLO";
final String message = "I am ready";

L'envoi n'est plus qu'une formalité :

DogVerticle.java
Sélectionnez
final EventBus bus = vertx.eventBus();
bus.send(address, message);

Idem pour les chats, qui vont envoyer leurs messages à la même adresse :

CatVerticle.java
Sélectionnez
public class CatVerticle extends AbstractVerticle{
    @Override
    public void start() throws Exception {
        System.out.println("Start of Cat Verticle");
        
        final String message = "A new cat on the roofs...";
        
        vertx.eventBus().send("HELLO", message);
    }
}

III-B. Recevoir

Recevoir un message n'est pas bien plus compliqué. Il suffit de surveiller l'adresse :

ShopVerticle.java
Sélectionnez
public class ShopVerticle extends AbstractVerticle{
    @Override
    public void start() throws Exception {
        System.out.println("Start of Shop Verticle");
        
        final String address = "HELLO";
        
        final EventBus bus = vertx.eventBus();
        final MessageConsumer<String> consumer = bus.consumer(address);
        consumer.handler(message -> {
            System.out.println("incoming message: " + message.body());
        });
    }
}

Si vous le préférez, vous pouvez directement passer la lambda du handler au consumer :

ShopVerticle.java
Sélectionnez
vertx.eventBus().<String> consumer(address, message -> {
    System.out.println("incoming message: " + message.body());
});

Voici ce que ça donne au niveau de la console. L'ordre des logs peut varier d'un lancement à l'autre :

Console de AnimalLauncher
Sélectionnez
Start of Animal Launcher
Start of Shop Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
End of Animal Launcher
Start of Cat Verticle
incoming message: I am ready
incoming message: A new cat on the roofs...
incoming message: I am ready
incoming message: I am ready
incoming message: I am ready
incoming message: I am ready

III-C. Écrire de façon périodique

Pour aller plus loin, imaginons que DogVerticle envoie un message (sur une autre adresse) toutes les trois secondes. Il suffit de demander à l'objet vertx d'exécuter un handler :

DogVerticle.java
Sélectionnez
final int period = 3000;
final String address2 = "MARKET";
final String message2 = "Dog power wouaf";
vertx.setPeriodic(period, (l) -> bus.send(address2, message2));

On veut aussi écouter sur cette nouvelle adresse :

ShopVerticle.java
Sélectionnez
final String address2 = "MARKET";
final MessageConsumer<String> consumer2 = bus.consumer(address2);
consumer2.handler(message -> {
    System.out.println("Msg: " + message.body());
});

Voici ce que ça donne, sachant qu'on a conservé les autres logs :

Console de AnimalLauncher
Sélectionnez
Start of Animal Launcher
Start of Shop Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
End of Animal Launcher
Start of Cat Verticle
incoming message: I am ready
incoming message: I am ready
incoming message: I am ready
incoming message: A new cat on the roofs...
incoming message: I am ready
incoming message: I am ready
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
...

III-D. Publier

Lorsqu'on envoie un message, on peut soit l'envoyer en mode point à point avec la méthode send, soit le publier à l'aide de la méthode publish. L'usage de send fait qu'un seul consommateur (consumer) va le recevoir. L'usage de publish fait que tous les consommateurs abonnés vont le recevoir.

Pour le mettre en évidence, ajoutons DoctorVerticle qui va écouter sur l'adresse dog.address :

DoctorVerticle.java
Sélectionnez
public class DoctorVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        System.out.println("Start of Doctor Verticle");

        final String dogAddress = "dog.address";
        final EventBus bus = vertx.eventBus();
        bus.consumer(dogAddress, message -> {
            System.out.println("Dog event: " + message.body());
        });
    }

    @Override
    public void stop() throws Exception {
        System.out.println("Stop of Doctor Verticle");
    }
}

Disons qu'on en lance deux :

AnimalLauncher.java
Sélectionnez
final DeploymentOptions doctorOptions = new DeploymentOptions() //
        .setInstances(2);
vertx.deployVerticle(DoctorVerticle.class.getName(), doctorOptions);

Et pour bien faire, disons que CatVerticle va également écouter sur cette adresse :

CatVerticle.java
Sélectionnez
public class CatVerticle extends AbstractVerticle{
    @Override
    public void start() throws Exception {
        System.out.println("Start of Cat Verticle");
        
        final String message = "A new cat on the roofs...";

        final EventBus bus = vertx.eventBus();
        
        bus.send("HELLO", message);

        final String dogAddress = "dog.address";
        bus.consumer(dogAddress, msg -> {
            System.out.println("I do not like dogs");
        });
    }

Enfin, on va demander au DogVerticle d'écrire sur cette adresse :

DogVerticle.java
Sélectionnez
bus.send("dog.address", "Grrr");

Voyons maintenant ce que ça donne au niveau de la console. Le résultat change d'une exécution à l'autre :

Console de AnimalLauncher
Sélectionnez
I do not like dogs
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr

Comment comprendre ces logs ? On a lancé cinq DogVerticle qui vont chacun envoyer un message. La méthode send se contente d'envoyer au premier consommateur disponible qui l'écoute.

Image non disponible
send

Remplaçons maintenant l'utilisation de send par publish :

DogVerticle.java
Sélectionnez
bus.publish("dog.address", "Grrr");

Cette fois, il y aura beaucoup plus de logs :

Console de AnimalLauncher
Sélectionnez
Dog event: Grrr
Dog event: Grrr
I do not like dogs
I do not like dogs
Dog event: Grrr
Dog event: Grrr
I do not like dogs
I do not like dogs
Dog event: Grrr
I do not like dogs
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr

Comment comprendre ces logs ? On a donc cinq DogVerticle qui publient chacun un message. On a deux consommateurs lancés par DoctorVerticle et un consommateur lancé par CatVerticle qui écoutent. Les consommateurs de DoctorVerticle vont donc recevoir cinq messages chacun, soit dix en tout, et celui de CatVerticle va recevoir cinq messages. On aura donc quinze messages reçus en tout.

Image non disponible
publish

IV. Répondre aux messages

Quand on envoie un message avec Vert.x sur le bus, on peut aussi indiquer un callback, qui sera appelé si une réponse est reçue :

DogVerticle.java
Sélectionnez
vertx.setPeriodic(period, (l) -> bus.send(address2, message2, ar -> {
        System.out.println("Received number: " + ar.result().body());
}));

Pour envoyer une réponse au message, ce n'est pas beaucoup plus difficile :

ShopVerticle.java
Sélectionnez
final String address2 = "MARKET";
final Random random = new Random();
final AtomicInteger ai = new AtomicInteger(0);
final MessageConsumer<String> consumer2 = bus.consumer(address2, message -> {
    System.out.println("Msg: " + message.body());
    final int index = ai.incrementAndGet();
    message.reply("Hi " + index + " : " + random.nextInt(99));
});
Console de AnimalLauncher
Sélectionnez
...
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
Received number: Hi 2 : 40
Msg: Dog power wouaf
Received number: Hi 3 : 9
Received number: Hi 4 : 81
Received number: Hi 1 : 76
Msg: Dog power wouaf
Received number: Hi 5 : 16

Regardons de plus près la définition de la méthode send (ici la plus complète) et plus spécifiquement son dernier argument :

EventBus.java
Sélectionnez
/**
 * Like {@link #send(String, Object, DeliveryOptions)} but specifying a {@code replyHandler} that will be called if the recipient
 * subsequently replies to the message.
 *
 * @param address  the address to send it to
 * @param message  the message, may be {@code null}
 * @param options  delivery options
 * @param replyHandler  reply handler will be called when any reply from the recipient is received, may be {@code null}
 * @return a reference to this, so the API can be used fluently
 */
@Fluent
<T> EventBus send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler);

Vous remarquez que le handler attend une réponse asynchrone, ce qui mérite un peu d'explications. Vertx garantit que les messages que vos verticles envoient seront bien reçus dans l'ordre dans lequel ils ont été émis, sans pour autant pouvoir dire que la méthode send est synchrone. Hormis quelques rares exceptions comme les workers (dont on parlera dans un prochain article) Vert.x travaille de manière non bloquante. La réponse est quant à elle attendue de manière asynchrone et sans ordre. On voit d'ailleurs dans la console que les réponses arrivent dans le désordre. Cette mécanique est une des grandes forces de Vert.x. C'est en partie grâce à elle que le framework peut afficher de si bonnes performances, notamment en élasticité (scalabilité).

Les fonctionnements synchrones et asynchrones reposent sur des concepts différents. Passer d'un raisonnement synchrone à asynchrone demande une petite gymnastique, mais cela en vaut la peine. Pour commencer, demandez-vous simplement si vous avez besoin que vos traitements soient synchrones. La plupart du temps, la réponse sera non…

Bien entendu, il est possible que, pour une raison ou une autre, quelque chose se passe mal. On retournera donc une erreur. Par exemple, disons qu'on veut que ça échoue quand l'index est un multiple de 3. On fera appel à fail :

ShopVerticle.java
Sélectionnez
final MessageConsumer<String> consumer2 = bus.consumer(address2, message -> {
    System.out.println("Msg: " + message.body());
    final int index = ai.incrementAndGet();
    if (index % 3 != 0) {
        message.reply("Hi " + index + " : " + random.nextInt(99));
    } else {
        System.out.println("Fail fail fail...");
        message.fail(123, "Failed because " + index + " is a multiple of 3.");
    }
});

Au niveau du callback, on va prendre en compte les éventuelles erreurs à l'aide de succeded et failed. Ici j'ai utilisé les deux pour bien illustrer le fonctionnement, mais il va de soi que les deux s'excluent mutuellement :

DogVerticle.java
Sélectionnez
vertx.setPeriodic(period, (l) -> bus.send(address2, message2, ar -> {
    if (ar.succeeded()) {
        System.out.println("Received number: " + ar.result().body());
    } else if (ar.failed()) { // Le else suffit ici en vrai
        System.out.println("It failed: " + ar.cause().getMessage());
    }
}));
Console de AnimalLauncher
Sélectionnez
...
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Received number: Hi 1 : 33
Received number: Hi 2 : 50
Msg: Dog power wouaf
Msg: Dog power wouaf
Received number: Hi 4 : 74
It failed: Failed because 3 is a multiple of 3.
Received number: Hi 5 : 67
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
It failed: Failed because 6 is a multiple of 3.
Received number: Hi 7 : 0
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Received number: Hi 8 : 65
Msg: Dog power wouaf
It failed: Failed because 9 is a multiple of 3.
Received number: Hi 10 : 71

V. Synchroniser les lancements

Vous avez sûrement remarqué la petite pause (Thread.sleep) entre le lancement de ShopVerticle et celui des DogVerticle et CatVerticle. Cette pause est nécessaire pour que tout ait le temps de se mettre en place avant que DogVerticle et CatVerticle n'envoient leurs messages. Maintenant que vous avez fait un peu plus connaissance avec Vert.x, on va faire un peu mieux. Une des idées que vous pourriez avoir serait d'envoyer des messages périodiquement jusqu'à ce que les autres verticles répondent. Cette stratégie pourrait marcher, mais elle oblige à polluer vos verticles avec du code technique dédié.

Vert.x permet également de surveiller le déploiement de vos verticles. La méthode deployVerticle appelle automatiquement un callback lorsque c'est bon :

AnimalLauncher.java
Sélectionnez
//vertx.deployVerticle(ShopVerticle.class.getName());
vertx.deployVerticle(ShopVerticle.class.getName(), ar -> {
    System.out.println("Shop Verticle Deployed");
});

On peut donc se servir de ce call back pour conditionner les autres lancements :

AnimalLauncher.java
Sélectionnez
vertx.deployVerticle(ShopVerticle.class.getName(), sar -> {
    System.out.println("Shop Verticle Deployed");
    
    vertx.deployVerticle(DoctorVerticle.class.getName(), doctorOptions);
});

On reproduit simplement pour tout le monde :

AnimalLauncher.java
Sélectionnez
public class AnimalLauncherAllInOne {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start of Animal Launcher");

        final DeploymentOptions doctorOptions = new DeploymentOptions() //
                .setInstances(2);

        final DeploymentOptions dogOptions = new DeploymentOptions() //
                .setInstances(5);

        final Vertx vertx = Vertx.vertx();

        vertx.deployVerticle(ShopVerticle.class.getName(), sar -> {
            System.out.println("Shop Verticle Deployed");

            vertx.deployVerticle(DoctorVerticle.class.getName(), doctorOptions, dar -> {
                System.out.println("Doctor Verticle Deployed");

                vertx.deployVerticle(DogVerticle.class.getName(), dogOptions);
                vertx.deployVerticle(CatVerticle.class.getName());
            });
        });

        System.out.println("End of Animal Launcher");
    }
}

Personnellement, je trouve que ça devient vite le bazar. Je préfère définir séparément les handlers :

AnimalLauncher.java
Sélectionnez
final Vertx vertx = Vertx.vertx();

final Handler<AsyncResult<String>> doctorCompletionHandler = dar -> {
    System.out.println("Doctor Verticle Deployed");
    vertx.deployVerticle(DogVerticle.class.getName(), dogOptions);
    vertx.deployVerticle(CatVerticle.class.getName());
};

final Handler<AsyncResult<String>> shopCompletionHandler = sar -> {
    System.out.println("Shop Verticle Deployed");
    vertx.deployVerticle(DoctorVerticle.class.getName(), doctorOptions, doctorCompletionHandler);
};

vertx.deployVerticle(ShopVerticle.class.getName(), shopCompletionHandler);

Les handlers sont bien séparés comme ça. En revanche, il faut les définir dans l'ordre inverse de leur utilisation. Dites-moi dans les commentaires la version que vous préférez.

VI. Envoyer des données

VI-A. JSON

Envoyer des messages simples ne devrait pas vous suffire très longtemps. Sans surprise, vous aurez déjà deviné que le format privilégié pour échanger des structures plus complexes avec Vert.x est JSON. Envoyer des messages au format JSON n'est pas plus difficile qu'en texte simple. En fait, c'est quasiment la même chose. Et bien entendu, vous n'allez pas construire une String, mais un JsonObject :

CatVerticle.java
Sélectionnez
public class CatVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        
        final EventBus bus = vertx.eventBus();
        ...

        final AtomicInteger ai = new AtomicInteger(1);
        vertx.setPeriodic(3000, (l) -> {
            final Cat cat = new Cat();
            cat.setNickname("Kitty-" + ai.incrementAndGet());
            cat.setMilkLover(true);

            final JsonObject json = new JsonObject() //
                    .put("nickname", cat.getNickname()) //
                    .put("milkLover", cat.isMilkLover());
            
            bus.send("json.address", json);
        });
    }

Le AtomicInteger sert juste à générer des noms (nickname) de chats différents.

C'est au niveau de la réception qu'il va y avoir quelques petites différences, à commencer par le fait que vous n'allez plus avoir un message de String mais un message de JsonObject :

ShopVerticle.java
Sélectionnez
public class ShopVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        
        final EventBus bus = vertx.eventBus();
        ...
        
        bus.<JsonObject> consumer("json.address", message -> {
            final JsonObject body = message.body();
            final String nickname = body.getString("nickname");
            System.out.println("Nickname json: " + nickname);
        });
    }

L'affichage qui en résulte n'est pas impressionnant :

Console de AnimalLauncher
Sélectionnez
Nickname: Kitty-1
Nickname: Kitty-2
Nickname: Kitty-3
Nickname: Kitty-4
...

Facile ? Un peu trop même… Reste qu'il faut manipuler les champs du JSON, ce qui risque de vous agacer rapidement. Si c'est le cas, passez vite au chapitre suivant.

VI-B. Objet

L'idéal serait de pouvoir communiquer directement avec les objets du domaine sur le bus. Au niveau de l'envoi, c'est relativement simple :

CatVerticle.java
Sélectionnez
public class CatVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        
        final EventBus bus = vertx.eventBus();
        
        vertx.setPeriodic(3000, (l) -> {
            final Cat cat = new Cat();
            ...

            bus.send("object.address", cat);
        });
    }

C'est même si simple que ça va lancer une exception, expliquant qu'aucun codec n'a été indiqué pour cet objet du modèle :

Console de AnimalLauncher
Sélectionnez
avr. 25, 2016 11:08:04 PM io.vertx.core.impl.ContextImpl
GRAVE: Unhandled exception
java.lang.IllegalArgumentException: No message codec for type: class com.masociete.tutovertx.domain.Cat

On doit donc créer un codec. Les codecs permettent à Vertx de transformer vos objets en flux (en JSON dans l'exemple suivant) lors de l'envoi et d'un flux en objet lors de la réception. En première approche, on peut voir cela comme du mapping. Le codec qui manipulera le JSON donc, mais ça ne sera fait que là :

CatMessageCodec.java
Sélectionnez
public class CatMessageCodec implements MessageCodec<Cat, Cat> {

    @Override
    public void encodeToWire(Buffer buffer, Cat cat) {
        // Easiest ways is using JSON object
        final JsonObject jsonToEncode = new JsonObject();
        jsonToEncode.put("nickname", cat.getNickname());
        jsonToEncode.put("milkLover", cat.isMilkLover());

        // Encode object to string
        final String jsonToStr = jsonToEncode.encode();

        // Length of JSON: is NOT characters count
        final int length = jsonToStr.getBytes().length;

        // Write data into given buffer
        buffer.appendInt(length);
        buffer.appendString(jsonToStr);
    }

    @Override
    public Cat decodeFromWire(int position, Buffer buffer) {
        // My custom message starting from this *position* of buffer
        int _pos = position;

        // Length of JSON
        final int length = buffer.getInt(_pos);

        // Get JSON string by it`s length
        // Jump 4 because getInt() == 4 bytes
        final String jsonStr = buffer.getString(_pos += 4, _pos += length);
        final JsonObject contentJson = new JsonObject(jsonStr);

        // Get fields
        final String nickname = contentJson.getString("nickname");
        final boolean milkLover = contentJson.getBoolean("milkLover");

        // We can finally create the cat object
        final Cat cat = new Cat();
        cat.setNickname(nickname);
        cat.setMilkLover(milkLover);

        return cat;
    }

    @Override
    public Cat transform(Cat cat) {
        return cat;
    }

    @Override
    public String name() {
        // Each codec must have a unique name.
        // This is used to identify a codec when sending a message and for unregistering codecs.
        return this.getClass().getSimpleName();
    }

    @Override
    public byte systemCodecID() {
        // Always -1
        return -1;
    }
}

Vous vous demandez sans doute à quoi correspond ce « +4 » dans le code de decodeFromWire. Il vient du fait que les int sont encodés avec 32 bits en Java, soit 4 octets, et donc 4 bytes…

Vous arriverez certainement à faire mieux avec votre bibliothèque JSON favorite.

Il suffit ensuite d'enregistrer le codec au niveau du bus et de l'indiquer lors de l'envoi :

CatVerticle.java
Sélectionnez
public class CatVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        
        final EventBus bus = vertx.eventBus();
        
        final CatMessageCodec catCodec = new CatMessageCodec();
        bus.registerCodec(catCodec);
        
        final DeliveryOptions options = new DeliveryOptions().setCodecName(catCodec.name());
        
        vertx.setPeriodic(3000, (l) -> {
            final Cat cat = new Cat();
            ...

            bus.send("object.address", cat, options);
        });
    }

On peut aussi indiquer au bus que le codec sera systématiquement utilisé, par défaut, pour cet objet. Dans ce cas, on ne doit plus le préciser au moment de l'envoi :

CatVerticle.java
Sélectionnez
final CatMessageCodec catCodec = new CatMessageCodec();
//bus.registerCodec(catCodec);
bus.registerDefaultCodec(Cat.class, catCodec);

vertx.setPeriodic(3000, (l) -> {
    final Cat cat = new Cat();
    ...

    bus.send("object.address", cat);
});

Le plus difficile est fait. On peut maintenant recevoir directement l'objet du domaine :

ShopVerticle.java
Sélectionnez
bus.<Cat> consumer("object.address", message -> {
    final Cat cat = message.body();
    final String nickname = cat.getNickname();
    System.out.println("Nickname object: " + nickname);
});

Il n'est pas nécessaire d'enregistrer de nouveau le codec sur le bus pour consommer les messages puisque l'émetteur l'a déjà enregistré.

C'est tout de même plus sympa de travailler avec des objets qu'avec un JSON…

Ces exemples partent du principe que l'émetteur et le consommateur vont travailler avec les mêmes objets, ce qui n'est absolument pas nécessaire et, surtout, ce qui a peu de chances de vraiment se produire. Pour l'illustrer, disons qu'on continue d'envoyer des Chats (Cat) ; mais qu'on ne souhaite recevoir que des animaux (Animal). Créons un bean très simple :

Animal.java
Sélectionnez
public class Animal {

    private String name;
    ...

Créons aussi le codec adapté :

CatAnimalMessageCodec.java
Sélectionnez
public class CatAnimalMessageCodec implements MessageCodec<Cat, Animal> {

    @Override
    public void encodeToWire(Buffer buffer, Cat cat) {
        // Easiest ways is using JSON object
        final JsonObject jsonToEncode = new JsonObject();
        jsonToEncode.put("name", cat.getNickname());

        ...
    }

    @Override
    public Animal decodeFromWire(int position, Buffer buffer) {
        ...

        // Get fields
        final String name = contentJson.getString("name");

        // We can finally create the animal object
        final Animal animal = new Animal();
        animal.setName(name);

        return animal;
    }

    @Override
    public Animal transform(Cat cat) {
        final Animal animal = new Animal();
        animal.setName(cat.getNickname());
        return animal;
    }

Il ne reste plus qu'à envoyer en utilisant ce nouveau codec :

CatVerticle.java
Sélectionnez
final CatAnimalMessageCodec catAnimalCodec = new CatAnimalMessageCodec();
bus.registerCodec(catAnimalCodec);
final DeliveryOptions options2 = new DeliveryOptions().setCodecName(catAnimalCodec.name());
final Cat cat = new Cat(..);                
bus.send("object2.address", cat, options2);

Lors de la réception, on peut désormais utiliser directement l'autre objet :

ShopVerticle.java
Sélectionnez
bus.<Animal> consumer("object2.address", message -> {
    final Animal animal = message.body();
    final String name = animal.getName();
    System.out.println("Name object: " + name);
});

VII. Cluster

Jusque là, tous les verticles étaient lancés depuis la même classe main, et donc dans la même JVM. Or on imagine bien que ce n'est pas ce que vous avez envie d'envoyer en prod. Ce serait mieux de les lancer séparément. Je vous propose la répartition suivante. DogVerticle et CatVerticle restent dans AnimalLauncher. DoctorVerticle va dans DoctorLauncher qu'on crée pour l'occasion. Enfin, ShopVerticle va dans ShopLauncher qu'on crée également pour l'occasion. Voici ce que ça donne :

AnimalLauncher.java
Sélectionnez
public class AnimalLauncher {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start of Animal Launcher");

        final Vertx vertx = Vertx.vertx();

        // Dog
        final DeploymentOptions dogOptions = new DeploymentOptions() //
                .setInstances(5);
        vertx.deployVerticle(DogVerticle.class.getName(), dogOptions);
        
        // Cat
        vertx.deployVerticle(CatVerticle.class.getName());

        System.out.println("End of Animal Launcher");
    }
}
DoctorLauncher.java
Sélectionnez
public class DoctorLauncher {

    public static void main(String[] args) {
        System.out.println("Start of Doctor Launcher");

        final Vertx vertx = Vertx.vertx();

        final DeploymentOptions doctorOptions = new DeploymentOptions() //
                .setInstances(2);
        vertx.deployVerticle(DoctorVerticle.class.getName(), doctorOptions);

        System.out.println("End of Doctor Launcher");
    }
}
DoctorLauncher.java
Sélectionnez
public class ShopLauncher {

    public static void main(String[] args) {
        System.out.println("Start of Shop Launcher");

        final Vertx vertx = Vertx.vertx();
        
        vertx.deployVerticle(ShopVerticle.class.getName());
        
        System.out.println("End of Shop Launcher");
    }
}

C'est tout de suite plus lisible. Bien entendu, on va les lancer dans le bon ordre : Shop, puis Doctor, puis Animal. Voici ce qu'on voit apparaître dans les trois consoles une fois que tout est lancé (en respectant les temps de pause) :

Console de ShopLauncher
Sélectionnez
Start of Shop Launcher
End of Shop Launcher
Start of Shop Verticle
Console de DoctorLauncher
Sélectionnez
Start of Doctor Launcher
End of Doctor Launcher
Start of Doctor Verticle
Start of Doctor Verticle
Console de AnimalLauncher
Sélectionnez
Start of Animal Launcher
End of Animal Launcher
Start of Dog Verticle
Start of Cat Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
I do not like dogs
I do not like dogs
I do not like dogs
I do not like dogs
I do not like dogs
It failed: No handlers for address MARKET
It failed: No handlers for address MARKET
It failed: No handlers for address MARKET
It failed: No handlers for address MARKET
It failed: No handlers for address MARKET
It failed: No handlers for address MARKET
It failed: No handlers for address MARKET
It failed: No handlers for address MARKET
It failed: No handlers for address MARKET
It failed: No handlers for address MARKET
...

On voit tout de suite que quelque chose cloche. Il n'y a que les messages échangés entre DogVerticle et CatVerticle qui continuent de fonctionner. Les autres messages n'arrivent pas et voici pourquoi. Chaque appel à Vertx.vertx() crée un nouveau bus. Or les différents bus, lancés dans des JVM distinctes, ne sont pas reliés.

Il faut donc passer les bus en mode cluster. La première chose que je vous invite à faire est d'ajouter une dépendance à « vertx-hazelcast » dans votre pom.xml :

pom.xml
Sélectionnez
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-hazelcast</artifactId>
    <version>${vertx.version}</version>
</dependency>

Je peux vous proposer deux façons de traiter la suite, chacune ayant ses avantages et ses inconvénients.

VII-A. Programmatiquement

Le plus simple, à ce niveau est de demander à Vert.x de nous fournir un bus en mode cluster :

ShopLauncher.java
Sélectionnez
public class ShopLauncher {

    public static void main(String[] args) {
        System.out.println("Start of Shop Launcher");

        // final Vertx vertx = Vertx.vertx();
        
        final ClusterManager mgr = new HazelcastClusterManager();
        final VertxOptions options = new VertxOptions().setClusterManager(mgr);
        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                System.out.println("res ok shop");
                final Vertx vertx = res.result();
                vertx.deployVerticle(ShopVerticle.class.getName());
            } else {
                System.out.println("FAIL !!!");
            }
        });

        System.out.println("End of Shop Launcher");
    }
}

Faites de même avec les autres launchers :

DoctorLauncher.java
Sélectionnez
public class DoctorLauncher {

    public static void main(String[] args) {
        System.out.println("Start of Doctor Launcher");

        // final Vertx vertx = Vertx.vertx();

        final DeploymentOptions doctorOptions = new DeploymentOptions() //
                .setInstances(2);

        final ClusterManager mgr = new HazelcastClusterManager();

        final VertxOptions options = new VertxOptions().setClusterManager(mgr);
        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                System.out.println("res ok doctor");
                final Vertx vertx = res.result();
                vertx.deployVerticle(DoctorVerticle.class.getName(), doctorOptions);
            } else {
                System.out.println("FAIL !!!");
            }
        });

        System.out.println("End of Doctor Launcher");
    }
}
AnimalLauncher.java
Sélectionnez
public class AnimalLauncher {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start of Animal Launcher");
        
        final ClusterManager mgr = new HazelcastClusterManager();

        final DeploymentOptions dogOptions = new DeploymentOptions() //
                .setInstances(5);

        final VertxOptions options = new VertxOptions().setClusterManager(mgr);
        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                System.out.println("res ok animal");
                final Vertx vertx = res.result();
                vertx.deployVerticle(DogVerticle.class.getName(), dogOptions);
                vertx.deployVerticle(CatVerticle.class.getName());
            } else {
                System.out.println("FAIL !!!");
            }
        });        

        System.out.println("End of Animal Launcher");
    }
}

Vous avez également besoin de configurer Hazelcast (dont on parlera dans un prochain article) à l'aide du fichier « cluster.xml ». Prenez celui qui est fourni dans le zip associé à cet article. Il fera largement l'affaire. Voyons ce que ça donne quand on relance.

Ici on utilise Hazelcast en tant que Cluster manager. Hazelcast est bien plus que ça. On aurait aussi pu utiliser jGroup ou Ignite.

Console de ShopLauncher
Sélectionnez
Start of Shop Launcher
End of Shop Launcher
avr. 28, 2016 12:24:57 PM com.hazelcast.config.AbstractXmlConfigHelper
AVERTISSEMENT: Name of the hazelcast schema location incorrect using default
avr. 28, 2016 12:24:58 PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL] [dev] [3.5.2] Interfaces is enabled, trying to pick one address matching to one of: [127.0.0.1]
avr. 28, 2016 12:24:58 PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL] [dev] [3.5.2] Picked Address[127.0.0.1]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
avr. 28, 2016 12:24:58 PM com.hazelcast.spi.OperationService
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Backpressure is disabled
avr. 28, 2016 12:24:58 PM com.hazelcast.spi.impl.operationexecutor.classic.ClassicOperationExecutor
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Starting with 4 generic operation threads and 8 partition operation threads.
avr. 28, 2016 12:24:58 PM com.hazelcast.system
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Hazelcast 3.5.2 (20150826 - ba8dbba) starting at Address[127.0.0.1]:5701
avr. 28, 2016 12:24:58 PM com.hazelcast.system
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved.
avr. 28, 2016 12:24:58 PM com.hazelcast.instance.Node
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Creating TcpIpJoiner
avr. 28, 2016 12:24:58 PM com.hazelcast.core.LifecycleService
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Address[127.0.0.1]:5701 is STARTING
avr. 28, 2016 12:24:58 PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Connecting to /127.0.0.1:5703, timeout: 0, bind-any: true
avr. 28, 2016 12:24:58 PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Connecting to /127.0.0.1:5702, timeout: 0, bind-any: true
avr. 28, 2016 12:24:58 PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Could not connect to: /127.0.0.1:5703. Reason: SocketException[Connection refused to address /127.0.0.1:5703]
avr. 28, 2016 12:24:58 PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Could not connect to: /127.0.0.1:5702. Reason: SocketException[Connection refused to address /127.0.0.1:5702]
avr. 28, 2016 12:24:58 PM com.hazelcast.cluster.impl.TcpIpJoiner
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Address[127.0.0.1]:5703 is added to the blacklist.
avr. 28, 2016 12:24:58 PM com.hazelcast.cluster.impl.TcpIpJoiner
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Address[127.0.0.1]:5702 is added to the blacklist.
avr. 28, 2016 12:24:59 PM com.hazelcast.cluster.impl.TcpIpJoiner
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] 


Members [1] {
    Member [127.0.0.1]:5701 this
}

avr. 28, 2016 12:24:59 PM com.hazelcast.core.LifecycleService
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Address[127.0.0.1]:5701 is STARTED
avr. 28, 2016 12:24:59 PM com.hazelcast.partition.InternalPartitionService
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Initializing cluster partition table first arrangement...
res ok shop
Start of Shop Verticle
avr. 28, 2016 12:25:08 PM com.hazelcast.nio.tcp.SocketAcceptor
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Accepting socket connection from /127.0.0.1:62836
avr. 28, 2016 12:25:08 PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Established socket connection between /127.0.0.1:5701
avr. 28, 2016 12:25:09 PM com.hazelcast.cluster.ClusterService
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] 

Members [2] {
    Member [127.0.0.1]:5701 this
    Member [127.0.0.1]:5702
}

avr. 28, 2016 12:25:09 PM com.hazelcast.partition.InternalPartitionService
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Re-partitioning cluster data... Migration queue size: 135
avr. 28, 2016 12:25:10 PM com.hazelcast.partition.InternalPartitionService
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] All migration tasks have been completed, queues are empty.
Incoming message: I am the Doctor
Incoming message: I am the Doctor
avr. 28, 2016 12:25:16 PM com.hazelcast.nio.tcp.SocketAcceptor
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Accepting socket connection from /127.0.0.1:62849
avr. 28, 2016 12:25:16 PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Established socket connection between /127.0.0.1:5701
avr. 28, 2016 12:25:17 PM com.hazelcast.cluster.ClusterService
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] 

Members [3] {
    Member [127.0.0.1]:5701 this
    Member [127.0.0.1]:5702
    Member [127.0.0.1]:5703
}

avr. 28, 2016 12:25:17 PM com.hazelcast.partition.InternalPartitionService
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] Re-partitioning cluster data... Migration queue size: 90
avr. 28, 2016 12:25:19 PM com.hazelcast.partition.InternalPartitionService
INFOS: [127.0.0.1]:5701 [dev] [3.5.2] All migration tasks have been completed, queues are empty.
Incoming message: I am ready
Incoming message: I am ready
Incoming message: I am ready
Incoming message: I am ready
Incoming message: I am ready
Incoming message: A new cat on the roofs...
Msg: Dog power wouaf
avr. 28, 2016 12:25:24 PM io.vertx.core.impl.ContextImpl
GRAVE: Unhandled exception
java.lang.IllegalStateException: No message codec registered with name CatMessageCodec
    at io.vertx.core.eventbus.impl.clustered.ClusteredMessage.readFromWire(ClusteredMessage.java:148)
    at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus$1.handle(ClusteredEventBus.java:256)
    at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus$1.handle(ClusteredEventBus.java:248)
    at io.vertx.core.parsetools.impl.RecordParserImpl.parseFixed(RecordParserImpl.java:203)
Console de DoctorLauncher
Sélectionnez
Start of Doctor Launcher
End of Doctor Launcher
avr. 28, 2016 12:25:07 PM com.hazelcast.config.AbstractXmlConfigHelper
AVERTISSEMENT: Name of the hazelcast schema location incorrect using default
avr. 28, 2016 12:25:07 PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL] [dev] [3.5.2] Interfaces is enabled, trying to pick one address matching to one of: [127.0.0.1]
avr. 28, 2016 12:25:07 PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL] [dev] [3.5.2] Picked Address[127.0.0.1]:5702, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5702], bind any local is true
avr. 28, 2016 12:25:07 PM com.hazelcast.spi.OperationService
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Backpressure is disabled
avr. 28, 2016 12:25:07 PM com.hazelcast.spi.impl.operationexecutor.classic.ClassicOperationExecutor
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Starting with 4 generic operation threads and 8 partition operation threads.
avr. 28, 2016 12:25:08 PM com.hazelcast.system
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Hazelcast 3.5.2 (20150826 - ba8dbba) starting at Address[127.0.0.1]:5702
avr. 28, 2016 12:25:08 PM com.hazelcast.system
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved.
avr. 28, 2016 12:25:08 PM com.hazelcast.instance.Node
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Creating TcpIpJoiner
avr. 28, 2016 12:25:08 PM com.hazelcast.core.LifecycleService
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Address[127.0.0.1]:5702 is STARTING
avr. 28, 2016 12:25:08 PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Connecting to /127.0.0.1:5701, timeout: 0, bind-any: true
avr. 28, 2016 12:25:08 PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Connecting to /127.0.0.1:5703, timeout: 0, bind-any: true
avr. 28, 2016 12:25:08 PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Could not connect to: /127.0.0.1:5703. Reason: SocketException[Connection refused to address /127.0.0.1:5703]
avr. 28, 2016 12:25:08 PM com.hazelcast.cluster.impl.TcpIpJoiner
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Address[127.0.0.1]:5703 is added to the blacklist.
avr. 28, 2016 12:25:08 PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Established socket connection between /127.0.0.1:62836
avr. 28, 2016 12:25:09 PM com.hazelcast.cluster.ClusterService
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] 

Members [2] {
    Member [127.0.0.1]:5701
    Member [127.0.0.1]:5702 this
}

avr. 28, 2016 12:25:11 PM com.hazelcast.core.LifecycleService
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Address[127.0.0.1]:5702 is STARTED
res ok doctor
Start of Doctor Verticle
Start of Doctor Verticle
avr. 28, 2016 12:25:16 PM com.hazelcast.nio.tcp.SocketAcceptor
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Accepting socket connection from /127.0.0.1:62848
avr. 28, 2016 12:25:16 PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] Established socket connection between /127.0.0.1:5702
avr. 28, 2016 12:25:17 PM com.hazelcast.cluster.ClusterService
INFOS: [127.0.0.1]:5702 [dev] [3.5.2] 

Members [3] {
    Member [127.0.0.1]:5701
    Member [127.0.0.1]:5702 this
    Member [127.0.0.1]:5703
}

Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Console de AnimalLauncher
Sélectionnez
Start of Animal Launcher
End of Animal Launcher
avr. 28, 2016 12:25:16 PM com.hazelcast.config.AbstractXmlConfigHelper
AVERTISSEMENT: Name of the hazelcast schema location incorrect using default
avr. 28, 2016 12:25:16 PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL] [dev] [3.5.2] Interfaces is enabled, trying to pick one address matching to one of: [127.0.0.1]
avr. 28, 2016 12:25:16 PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL] [dev] [3.5.2] Picked Address[127.0.0.1]:5703, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5703], bind any local is true
avr. 28, 2016 12:25:16 PM com.hazelcast.spi.OperationService
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] Backpressure is disabled
avr. 28, 2016 12:25:16 PM com.hazelcast.spi.impl.operationexecutor.classic.ClassicOperationExecutor
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] Starting with 4 generic operation threads and 8 partition operation threads.
avr. 28, 2016 12:25:16 PM com.hazelcast.system
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] Hazelcast 3.5.2 (20150826 - ba8dbba) starting at Address[127.0.0.1]:5703
avr. 28, 2016 12:25:16 PM com.hazelcast.system
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] Copyright (c) 2008-2015, Hazelcast, Inc. All Rights Reserved.
avr. 28, 2016 12:25:16 PM com.hazelcast.instance.Node
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] Creating TcpIpJoiner
avr. 28, 2016 12:25:16 PM com.hazelcast.core.LifecycleService
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] Address[127.0.0.1]:5703 is STARTING
avr. 28, 2016 12:25:16 PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] Connecting to /127.0.0.1:5702, timeout: 0, bind-any: true
avr. 28, 2016 12:25:16 PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] Connecting to /127.0.0.1:5701, timeout: 0, bind-any: true
avr. 28, 2016 12:25:16 PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] Established socket connection between /127.0.0.1:62848
avr. 28, 2016 12:25:16 PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] Established socket connection between /127.0.0.1:62849
avr. 28, 2016 12:25:17 PM com.hazelcast.cluster.ClusterService
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] 

Members [3] {
    Member [127.0.0.1]:5701
    Member [127.0.0.1]:5702
    Member [127.0.0.1]:5703 this
}

avr. 28, 2016 12:25:19 PM com.hazelcast.core.LifecycleService
INFOS: [127.0.0.1]:5703 [dev] [3.5.2] Address[127.0.0.1]:5703 is STARTED
res ok animal
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Cat Verticle
I do not like dogs
I do not like dogs
I do not like dogs
I do not like dogs
I do not like dogs
Received number: Hi 1 : 23
Received number: Hi 4 : 88
Received number: Hi 2 : 23
Received number: Hi 5 : 62
It failed: Failed because 3 is a multiple of 3.

C'est tout de suite plus bavard. Si vous connaissez déjà HazelcastSite Web d'Hazelcast, vous reconnaissez la structure de ces logs. Ici, il y a deux choses à voir en plus du fonctionnement de nos verticles. La première, c'est que Hazelcast trouve tout seul les membres du cluster. Dans une configuration d'entreprise, on pourra affiner ce comportement.

La deuxième chose est que les codecs ne sont pas partagés sur le bus en cluster. Il faut donc changer le verticle pour que le codec soit chargé des deux côtés :

ShopVerticle.java
Sélectionnez
public class ShopVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        ...
        final CatMessageCodec catCodec = new CatMessageCodec();
        final CatAnimalMessageCodec catAnimalCodec = new CatAnimalMessageCodec();

        bus.registerCodec(catCodec);
        bus.registerCodec(catAnimalCodec);
        ...

Tout devrait être bon dans la console :

Console de ShopLauncher
Sélectionnez
...
Incoming message: I am ready
Incoming message: I am ready
Incoming message: I am ready
Incoming message: I am ready
Incoming message: A new cat on the roofs...
Incoming message: I am ready
Msg: Dog power wouaf
Msg: Dog power wouaf
Nickname object: Kitty-1
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
Msg: Dog power wouaf
Name object: Kitty-1
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
Nickname object: Kitty-2
Name object: Kitty-2
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Nickname object: Kitty-3
Name object: Kitty-3
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
Msg: Dog power wouaf
...

Si un cluster Hazelcast est déjà disponible sur votre plate-forme, vous pouvez bien entendu l'utiliser. Il suffira de bien configurer cluster.xml.

VII-B. Par argument

À mon sens, le plus simple et le plus pratique est toutefois de travailler avec des verticles à la place des mains :

ShopLauncherAsVerticle.java
Sélectionnez
public class ShopLauncherAsVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        System.out.println("Start of Shop Launcher");

        // final Vertx vertx = Vertx.vertx();

        vertx.deployVerticle(ShopVerticle.class.getName());

        System.out.println("End of Shop Launcher");
    }
}
DoctorLauncherAsVerticle.java
Sélectionnez
public class DoctorLauncherAsVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        System.out.println("Start of Doctor Launcher");

        // final Vertx vertx = Vertx.vertx();

        final DeploymentOptions doctorOptions = new DeploymentOptions() //
                .setInstances(2);
        vertx.deployVerticle(DoctorVerticle.class.getName(), doctorOptions);

        System.out.println("End of Doctor Launcher");
    }
}
AnimalLauncherAsVerticle.java
Sélectionnez
public class AnimalLauncherAsVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        System.out.println("Start of Animal Launcher");

        // final Vertx vertx = Vertx.vertx();

        final DeploymentOptions dogOptions = new DeploymentOptions() //
                .setInstances(5);

        vertx.deployVerticle(DogVerticle.class.getName(), dogOptions);
        vertx.deployVerticle(CatVerticle.class.getName());

        System.out.println("End of Animal Launcher");
    }
}

Il suffit ensuite de lancer ces verticles avec l'option « -cluster » pour que tout fonctionne. N'oubliez pas d'ajouter les codecs et cluster.xml…

On peut également utiliser l'argument « -ha », pour « high availability », qui indique à Vert.x de relancer les verticles en cas de mort. Dans ce cas, il n'est pas utile d'ajouter « -cluster ».

Si vous lancez les verticles depuis Eclipse (ou un autre IDE), utilisez la classe « io.vertx.core.Launcher » et indiquez « run com.masociete.tutovertx.ShopLauncherAsVerticle -cluster » comme argument de programme.

Image non disponible
Image non disponible

La classe « Starter » que je vous avais indiquée à l'occasion du tutoriel précédent est désormais dépréciée (Deprecated) au profit de « Launcher ».

VIII. Conclusions

Le sujet de cet article peut sembler simple, voire trivial, mais il ne l'est clairement pas. En arrivant jusqu'à cette conclusion, vous avez déjà fait un pas de géant dans votre découverte de Vert.x (et pas seulement). Vous savez maintenant comment envoyer (en point à point) et comment publier des messages sur le bus. Vous savez que le bus peut être local ou distribué (clusterisé), notamment grâce à Hazelcast. Vous savez comment écouter sur le bus et répondre de manière asynchrone et non bloquante. Vous ne savez pas seulement comment envoyer des messages simples, mais également des structures organisées en JSON ainsi que des objets et des grappes.

Dans le prochain article, nous mettrons tout cela en œuvre afin de travailler avec des bases de données. Nous verrons que certains drivers, comme celui de Cassandra, sont asynchrones, tandis que d'autres sont bloquants, à l'image de JDBC.

Vos retours nous aident à améliorer nos publications. N'hésitez donc pas à commenter cet article sur le forum. 3 commentaires Donner une note à l´article (5)

IX. Remerciements

D'abord, j'adresse mes remerciements à l'équipe Vert.x et aux créateurs de modules complémentaires, pour avoir développé une bibliothèque aussi utile et pour la maintenir. Je n'oublie pas tous les contributeurs qui participent, notamment sur le forum.

Je souhaite également remercier Clément Escoffier qui a volontiers accepté de relire cet article. Enfin, je tiens à remercier l'équipe de Developpez.com et plus particulièrement à Logan Mauzaize, Mickael Baron et Claude Leloup.

X. Annexes

X-A. Liens

X-B. Codes complets

AnimalLauncherAllInOne.java
Sélectionnez
package com.masociete.tutovertx;

import io.vertx.core.AsyncResult;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;

public class AnimalLauncherAllInOne {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start of Animal Launcher");

        final DeploymentOptions doctorOptions = new DeploymentOptions() //
                .setInstances(2);

        final DeploymentOptions dogOptions = new DeploymentOptions() //
                .setInstances(5);

        final Vertx vertx = Vertx.vertx();

        final Handler<AsyncResult<String>> doctorCompletionHandler = dar -> {
            System.out.println("Doctor Verticle Deployed");
            vertx.deployVerticle(DogVerticle.class.getName(), dogOptions);
            vertx.deployVerticle(CatVerticle.class.getName());
        };

        final Handler<AsyncResult<String>> shopCompletionHandler = sar -> {
            System.out.println("Shop Verticle Deployed");
            vertx.deployVerticle(DoctorVerticle.class.getName(), doctorOptions, doctorCompletionHandler);
        };

        vertx.deployVerticle(ShopVerticle.class.getName(), shopCompletionHandler);

        System.out.println("End of Animal Launcher");
    }
}
AnimalLauncher.java
Sélectionnez
package com.masociete.tutovertx;

import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

public class AnimalLauncher {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start of Animal Launcher");

        final ClusterManager mgr = new HazelcastClusterManager();

        final DeploymentOptions dogOptions = new DeploymentOptions() //
                .setInstances(5);

        final VertxOptions options = new VertxOptions().setClusterManager(mgr);
        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                System.out.println("res ok animal");
                final Vertx vertx = res.result();
                vertx.deployVerticle(DogVerticle.class.getName(), dogOptions);
                vertx.deployVerticle(CatVerticle.class.getName());
            } else {
                System.out.println("FAIL !!!");
            }
        });

        System.out.println("End of Animal Launcher");
    }
}
DoctorLauncher.java
Sélectionnez
package com.masociete.tutovertx;

import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

public class DoctorLauncher {

    public static void main(String[] args) {
        System.out.println("Start of Doctor Launcher");

        final DeploymentOptions doctorOptions = new DeploymentOptions() //
                .setInstances(2);

        final ClusterManager mgr = new HazelcastClusterManager();

        final VertxOptions options = new VertxOptions().setClusterManager(mgr);
        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                System.out.println("res ok doctor");
                final Vertx vertx = res.result();
                vertx.deployVerticle(DoctorVerticle.class.getName(), doctorOptions);
            } else {
                System.out.println("FAIL !!!");
            }
        });

        System.out.println("End of Doctor Launcher");
    }
}
ShopLauncher.java
Sélectionnez
package com.masociete.tutovertx;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

public class ShopLauncher {

    public static void main(String[] args) {
        System.out.println("Start of Shop Launcher");

        final ClusterManager mgr = new HazelcastClusterManager();

        final VertxOptions options = new VertxOptions().setClusterManager(mgr);
        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                System.out.println("res ok shop");
                final Vertx vertx = res.result();
                vertx.deployVerticle(ShopVerticle.class.getName());
            } else {
                System.out.println("FAIL !!!");
            }
        });

        System.out.println("End of Shop Launcher");
    }
}
DogVerticle.java
Sélectionnez
package com.masociete.tutovertx;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;

public class DogVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        System.out.println("Start of Dog Verticle");

        final String address = "HELLO";
        final String message = "I am ready";

        final EventBus bus = vertx.eventBus();
        bus.send(address, message);

        Thread.sleep(1000);
        //bus.send("dog.address", "Grrr");
        bus.publish("dog.address", "Grrr");
        
        
        final int period = 3000;
        final String address2 = "MARKET";
        final String message2 = "Dog power wouaf";
        vertx.setPeriodic(period, (l) -> bus.send(address2, message2, ar -> {
            if (ar.succeeded()) {
                System.out.println("Received number: " + ar.result().body());
            } else if (ar.failed()) {
                System.out.println("It failed: " + ar.cause().getMessage());
            }
        }));
        
        
    }

    @Override
    public void stop() throws Exception {
        System.out.println("Stop of Dog Verticle");
    }
}
CatVerticle.java
Sélectionnez
package com.masociete.tutovertx;

import java.util.concurrent.atomic.AtomicInteger;

import com.masociete.tutovertx.domain.Cat;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonObject;

public class CatVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        System.out.println("Start of Cat Verticle");

        final String message = "A new cat on the roofs...";

        final EventBus bus = vertx.eventBus();

        bus.send("HELLO", message);

        final String dogAddress = "dog.address";
        bus.consumer(dogAddress, msg -> {
            System.out.println("I do not like dogs");
        });

        Thread.sleep(1000);

        final CatMessageCodec catCodec = new CatMessageCodec();
        final CatAnimalMessageCodec catAnimalCodec = new CatAnimalMessageCodec();

        try {
            bus.registerCodec(catCodec);
        } catch (IllegalStateException e) {
            System.out.println("Codec allreday registered...");
        }
        try {
            bus.registerCodec(catAnimalCodec);
        } catch (IllegalStateException e) {
            System.out.println("Codec allreday registered...");
        }

        // bus.registerDefaultCodec(Cat.class, catCodec);

        final DeliveryOptions options = new DeliveryOptions().setCodecName(catCodec.name());
        final DeliveryOptions options2 = new DeliveryOptions().setCodecName(catAnimalCodec.name());

        final AtomicInteger ai = new AtomicInteger(0);
        vertx.setPeriodic(3000, (l) -> {
            final Cat cat = new Cat();
            cat.setNickname("Kitty-" + ai.incrementAndGet());
            cat.setMilkLover(true);

            final JsonObject json = new JsonObject() //
                    .put("nickname", cat.getNickname()) //
                    .put("milkLover", cat.isMilkLover());

            // bus.send("json.address", json);
            bus.send("object.address", cat, options);
            bus.send("object2.address", cat, options2);
            // bus.send("object.address", cat);
        });
    }

    @Override
    public void stop() throws Exception {
        System.out.println("Stop of Cat Verticle");
    }
}
DoctorVerticle.java
Sélectionnez
package com.masociete.tutovertx;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;

public class DoctorVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        System.out.println("Start of Doctor Verticle");

        final EventBus bus = vertx.eventBus();
        bus.send("HELLO", "I am the Doctor");

        final String dogAddress = "dog.address";
        final MessageConsumer<String> consumer = bus.consumer(dogAddress);
        consumer.handler(message -> {
            System.out.println("Dog event: " + message.body());
        });
    }

    @Override
    public void stop() throws Exception {
        System.out.println("Stop of Doctor Verticle");
    }
}
ShopVerticle.java
Sélectionnez
package com.masociete.tutovertx;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import com.masociete.tutovertx.domain.Animal;
import com.masociete.tutovertx.domain.Cat;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;

public class ShopVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        System.out.println("Start of Shop Verticle");

        final String address = "HELLO";
        final EventBus bus = vertx.eventBus();

        final CatMessageCodec catCodec = new CatMessageCodec();
        final CatAnimalMessageCodec catAnimalCodec = new CatAnimalMessageCodec();

        bus.registerCodec(catCodec);
        bus.registerCodec(catAnimalCodec);

        final MessageConsumer<String> consumer = bus.consumer(address);
        consumer.handler(message -> {
            System.out.println("Incoming message: " + message.body());
        });

        final String address2 = "MARKET";
        final Random random = new Random();
        final AtomicInteger ai = new AtomicInteger(0);
        bus.consumer(address2, message -> {
            System.out.println("Msg: " + message.body());
            final int index = ai.incrementAndGet();
            if (index % 3 != 0) {
                message.reply("Hi " + index + " : " + random.nextInt(99));
            } else {
                System.out.println("Fail fail fail...");
                message.fail(123, "Failed because " + index + " is a multiple of 3.");
            }
        });

        bus.<JsonObject> consumer("json.address", message -> {
            final JsonObject body = message.body();
            final String nickname = body.getString("nickname");
            System.out.println("Nickname json: " + nickname);
        });

        bus.<Cat> consumer("object.address", message -> {
            final Cat cat = message.body();
            final String nickname = cat.getNickname();
            System.out.println("Nickname object: " + nickname);
        });

        bus.<Animal> consumer("object2.address", message -> {
            final Animal animal = message.body();
            final String name = animal.getName();
            System.out.println("Name object: " + name);
        });
    }

    @Override
    public void stop() throws Exception {
        System.out.println("Stop of Shop Verticle");
    }
}
CatMessageCodec.java
Sélectionnez
package com.masociete.tutovertx;

import com.masociete.tutovertx.domain.Cat;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.json.JsonObject;

public class CatMessageCodec implements MessageCodec<Cat, Cat> {

    @Override
    public void encodeToWire(Buffer buffer, Cat cat) {
        // Easiest ways is using JSON object
        final JsonObject jsonToEncode = new JsonObject();
        jsonToEncode.put("nickname", cat.getNickname());
        jsonToEncode.put("milkLover", cat.isMilkLover());

        // Encode object to string
        final String jsonToStr = jsonToEncode.encode();

        // Length of JSON: is NOT characters count
        final int length = jsonToStr.getBytes().length;

        // Write data into given buffer
        buffer.appendInt(length);
        buffer.appendString(jsonToStr);
    }

    @Override
    public Cat decodeFromWire(int position, Buffer buffer) {
        // My custom message starting from this *position* of buffer
        int _pos = position;

        // Length of JSON
        final int length = buffer.getInt(_pos);

        // Get JSON string by it`s length
        // Jump 4 because getInt() == 4 bytes
        final String jsonStr = buffer.getString(_pos += 4, _pos += length);
        final JsonObject contentJson = new JsonObject(jsonStr);

        // Get fields
        final String nickname = contentJson.getString("nickname");
        final boolean milkLover = contentJson.getBoolean("milkLover");

        // We can finally create the cat object
        final Cat cat = new Cat();
        cat.setNickname(nickname);
        cat.setMilkLover(milkLover);

        return cat;
    }

    @Override
    public Cat transform(Cat cat) {
        return cat;
    }

    @Override
    public String name() {
        // Each codec must have a unique name.
        // This is used to identify a codec when sending a message and for unregistering codecs.
        return this.getClass().getSimpleName();
    }

    @Override
    public byte systemCodecID() {
        // Always -1
        return -1;
    }
}

Vous avez aimé ce tutoriel ? Alors partagez-le en cliquant sur les boutons suivants : Viadeo Twitter Facebook Share on Google+   

Les sources présentées sur cette page sont libres de droits et vous pouvez les utiliser à votre convenance. Par contre, la page de présentation constitue une œuvre intellectuelle protégée par les droits d'auteur. Copyright © 2016 Thierry-Leriche-Dessirier. Aucune reproduction, même partielle, ne peut être faite de ce site ni de l'ensemble de son contenu : textes, documents, images, etc. sans l'autorisation expresse de l'auteur. Sinon vous encourez selon la loi jusqu'à trois ans de prison et jusqu'à 300 000 € de dommages et intérêts.