I. Introduction▲
Vert.x est un framework (sous licence Apache 2.0) destiné à la plate-forme Java qui a la particularité d'être polyglotte. Il a été conçu pour faciliter la création d'applications asynchrones et événementielles, tout en fournissant des performances (vitesse, mémoire, threads) excellentes. De par ses caractéristiques, Vert.x est notamment un bon choix pour l'écriture de microservices.
Dans l'article précédent, nous avons fait connaissance avec le framework. Nous avons notamment découvert les verticles et comment les lancer. Nous allons maintenant voir comment les faire communiquer via le bus.
Vous trouverez les codes source et les fichiers de configuration dans le fichier vertx-02.zip.
I-A. Série d'articles▲
Cet article fait partie d'une série consacrée à Vert.x 3 :
- créer et lancer un verticle ;
- Discuter via l'event bus ;
- bases de données (bientôt) ;
- Web services (bientôt) ;
- When-RX (bientôt) ;
- In-memory data grid (bientôt) ;
- sessions (bientôt) ;
- sécurité (bientôt).
I-B. Versions▲
Pour écrire cet article, j'ai utilisé les logiciels suivants :
- Vert.x 3.2.1 ;
- Java JDK 1.8.0_45 ;
- Maven 3.2 ;
- Eclipse 4.5.
II. Quelques verticles pour commencer▲
Pour faire communiquer des verticles, encore faut-il en avoir plusieurs. Je vous propose donc d'en créer quelques-uns à titre d'illustration :
public
class
DogVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Dog Verticle"
);
}
@Override
public
void
stop
(
) throws
Exception {
System.out.println
(
"Stop of Dog Verticle"
);
}
}
public
class
CatVerticle extends
AbstractVerticle{
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Cat Verticle"
);
}
@Override
public
void
stop
(
) throws
Exception {
System.out.println
(
"Stop of Cat Verticle"
);
}
}
public
class
ShopVerticle extends
AbstractVerticle{
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Shop Verticle"
);
}
@Override
public
void
stop
(
) throws
Exception {
System.out.println
(
"Stop of Shop Verticle"
);
}
}
Dans le tutoriel précédent, nous avions créé un verticle équipé d'un main. Cette fois, pour que ce soit encore plus parlant, nous allons lancer ces verticles directement depuis un main d'une classe qui n'est pas un verticle :
public
class
AnimalLauncher {
public
static
void
main
(
String[] args) {
System.out.println
(
"Start of Animal Launcher"
);
// Pas besoin d'etendre AbstractVerticle pour faire un main...
final
Vertx vertx =
Vertx.vertx
(
);
// Lequel vous semble le plus simple, ecrire le nom du package ou passer par class.name ?
// vertx.deployVerticle("com.masociete.tutovertx.ShopVerticle");
vertx.deployVerticle
(
ShopVerticle.class
.getName
(
));
// pause
Thread.sleep
(
500
);
vertx.deployVerticle
(
DogVerticle.class
.getName
(
));
vertx.deployVerticle
(
CatVerticle.class
.getName
(
));
System.out.println
(
"End of Animal Launcher"
);
}
}
Lors du premier tutoriel, j'avais indiqué le nom complet des verticles à lancer. Cette fois, j'utilise « class.name », ce qui revient strictement au même. Dites-moi ce que vous en pensez dans les commentaires. Lequel préférez-vous ?
Pour l'exemple, j'ai envie de lancer plusieurs instances de DogVerticle, ce qui est relativement simple à faire.
final
DeploymentOptions options =
new
DeploymentOptions
(
) //
.setInstances
(
5
);
vertx.deployVerticle
(
DogVerticle.class
.getName
(
), options);
Nous verrons dans un prochain article comment Vert.x gère les différentes instances des verticles, et notamment l'event loop.
Quand on lance les verticles en ligne de commande, surtout séparément, on peut en spécifier le nombre, ce qui sera plus pratique.
Les println vous permettent de voir comment tout cela s'enchaîne. Voici ce que ça donne lorsqu'on lance AnimalLauncher. Notez que l'ordre des logs peut varier d'un lancement à l'autre.
Start of Animal Launcher
Start of Shop Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
End of Animal Launcher
Start of Cat Verticle
Enfin, histoire d'avoir des choses un minimum intéressantes à envoyer, je vous propose de créer deux beans :
public
class
Dog {
private
String name;
private
int
age;
private
String race;
...
public
class
Cat {
private
String nickname;
private
boolean
milkLover;
...
III. Envoyer et recevoir des messages▲
III-A. Envoyer▲
Pour envoyer un message sur l'event bus, c'est très simple puisqu'il suffit de spécifier une adresse (attention une adresse n'est pas un topic), comme on le ferait avec un système de queue classique :
final
String address =
"HELLO"
;
final
String message =
"I am ready"
;
L'envoi n'est plus qu'une formalité :
final
EventBus bus =
vertx.eventBus
(
);
bus.send
(
address, message);
Idem pour les chats, qui vont envoyer leurs messages à la même adresse :
public
class
CatVerticle extends
AbstractVerticle{
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Cat Verticle"
);
final
String message =
"A new cat on the roofs..."
;
vertx.eventBus
(
).send
(
"HELLO"
, message);
}
}
III-B. Recevoir▲
Recevoir un message n'est pas bien plus compliqué. Il suffit de surveiller l'adresse :
public
class
ShopVerticle extends
AbstractVerticle{
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Shop Verticle"
);
final
String address =
"HELLO"
;
final
EventBus bus =
vertx.eventBus
(
);
final
MessageConsumer<
String>
consumer =
bus.consumer
(
address);
consumer.handler
(
message ->
{
System.out.println
(
"incoming message: "
+
message.body
(
));
}
);
}
}
Si vous le préférez, vous pouvez directement passer la lambda du handler au consumer :
vertx.eventBus
(
).<
String>
consumer
(
address, message ->
{
System.out.println
(
"incoming message: "
+
message.body
(
));
}
);
Voici ce que ça donne au niveau de la console. L'ordre des logs peut varier d'un lancement à l'autre :
Start of Animal Launcher
Start of Shop Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
End of Animal Launcher
Start of Cat Verticle
incoming message: I am ready
incoming message: A new cat on the roofs...
incoming message: I am ready
incoming message: I am ready
incoming message: I am ready
incoming message: I am ready
III-C. Écrire de façon périodique▲
Pour aller plus loin, imaginons que DogVerticle envoie un message (sur une autre adresse) toutes les trois secondes. Il suffit de demander à l'objet vertx d'exécuter un handler :
final
int
period =
3000
;
final
String address2 =
"MARKET"
;
final
String message2 =
"Dog power wouaf"
;
vertx.setPeriodic
(
period, (
l) ->
bus.send
(
address2, message2));
On veut aussi écouter sur cette nouvelle adresse :
final
String address2 =
"MARKET"
;
final
MessageConsumer<
String>
consumer2 =
bus.consumer
(
address2);
consumer2.handler
(
message ->
{
System.out.println
(
"Msg: "
+
message.body
(
));
}
);
Voici ce que ça donne, sachant qu'on a conservé les autres logs :
Start of Animal Launcher
Start of Shop Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
End of Animal Launcher
Start of Cat Verticle
incoming message: I am ready
incoming message: I am ready
incoming message: I am ready
incoming message: A new cat on the roofs...
incoming message: I am ready
incoming message: I am ready
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
...
III-D. Publier▲
Lorsqu'on envoie un message, on peut soit l'envoyer en mode point à point avec la méthode send, soit le publier à l'aide de la méthode publish. L'usage de send fait qu'un seul consommateur (consumer) va le recevoir. L'usage de publish fait que tous les consommateurs abonnés vont le recevoir.
Pour le mettre en évidence, ajoutons DoctorVerticle qui va écouter sur l'adresse dog.address :
public
class
DoctorVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Doctor Verticle"
);
final
String dogAddress =
"dog.address"
;
final
EventBus bus =
vertx.eventBus
(
);
bus.consumer
(
dogAddress, message ->
{
System.out.println
(
"Dog event: "
+
message.body
(
));
}
);
}
@Override
public
void
stop
(
) throws
Exception {
System.out.println
(
"Stop of Doctor Verticle"
);
}
}
Disons qu'on en lance deux :
final
DeploymentOptions doctorOptions =
new
DeploymentOptions
(
) //
.setInstances
(
2
);
vertx.deployVerticle
(
DoctorVerticle.class
.getName
(
), doctorOptions);
Et pour bien faire, disons que CatVerticle va également écouter sur cette adresse :
public
class
CatVerticle extends
AbstractVerticle{
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Cat Verticle"
);
final
String message =
"A new cat on the roofs..."
;
final
EventBus bus =
vertx.eventBus
(
);
bus.send
(
"HELLO"
, message);
final
String dogAddress =
"dog.address"
;
bus.consumer
(
dogAddress, msg ->
{
System.out.println
(
"I do not like dogs"
);
}
);
}
Enfin, on va demander au DogVerticle d'écrire sur cette adresse :
bus.send
(
"dog.address"
, "Grrr"
);
Voyons maintenant ce que ça donne au niveau de la console. Le résultat change d'une exécution à l'autre :
I do
not like dogs
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Comment comprendre ces logs ? On a lancé cinq DogVerticle qui vont chacun envoyer un message. La méthode send se contente d'envoyer au premier consommateur disponible qui l'écoute.
Remplaçons maintenant l'utilisation de send par publish :
bus.publish
(
"dog.address"
, "Grrr"
);
Cette fois, il y aura beaucoup plus de logs :
Dog event: Grrr
Dog event: Grrr
I do
not like dogs
I do
not like dogs
Dog event: Grrr
Dog event: Grrr
I do
not like dogs
I do
not like dogs
Dog event: Grrr
I do
not like dogs
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Comment comprendre ces logs ? On a donc cinq DogVerticle qui publient chacun un message. On a deux consommateurs lancés par DoctorVerticle et un consommateur lancé par CatVerticle qui écoutent. Les consommateurs de DoctorVerticle vont donc recevoir cinq messages chacun, soit dix en tout, et celui de CatVerticle va recevoir cinq messages. On aura donc quinze messages reçus en tout.
IV. Répondre aux messages▲
Quand on envoie un message avec Vert.x sur le bus, on peut aussi indiquer un callback, qui sera appelé si une réponse est reçue :
vertx.setPeriodic
(
period, (
l) ->
bus.send
(
address2, message2, ar ->
{
System.out.println
(
"Received number: "
+
ar.result
(
).body
(
));
}
));
Pour envoyer une réponse au message, ce n'est pas beaucoup plus difficile :
final
String address2 =
"MARKET"
;
final
Random random =
new
Random
(
);
final
AtomicInteger ai =
new
AtomicInteger
(
0
);
final
MessageConsumer<
String>
consumer2 =
bus.consumer
(
address2, message ->
{
System.out.println
(
"Msg: "
+
message.body
(
));
final
int
index =
ai.incrementAndGet
(
);
message.reply
(
"Hi "
+
index +
" : "
+
random.nextInt
(
99
));
}
);
...
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
Received number: Hi 2
: 40
Msg: Dog power wouaf
Received number: Hi 3
: 9
Received number: Hi 4
: 81
Received number: Hi 1
: 76
Msg: Dog power wouaf
Received number: Hi 5
: 16
Regardons de plus près la définition de la méthode send (ici la plus complète) et plus spécifiquement son dernier argument :
/**
* Like
{@link
#send(String, Object, DeliveryOptions)
}
but specifying a
{@code
replyHandler
}
that will be called if the recipient
* subsequently replies to the message.
*
*
@param
address
the address to send it to
*
@param
message
the message, may be
{@code
null
}
*
@param
options
delivery options
*
@param
replyHandler
reply handler will be called when any reply from the recipient is received, may be
{@code
null
}
*
@return
a reference to this, so the API can be used fluently
*/
@Fluent
<
T>
EventBus send
(
String address, Object message, DeliveryOptions options, Handler<
AsyncResult<
Message<
T>>>
replyHandler);
Vous remarquez que le handler attend une réponse asynchrone, ce qui mérite un peu d'explications. Vertx garantit que les messages que vos verticles envoient seront bien reçus dans l'ordre dans lequel ils ont été émis, sans pour autant pouvoir dire que la méthode send est synchrone. Hormis quelques rares exceptions comme les workers (dont on parlera dans un prochain article) Vert.x travaille de manière non bloquante. La réponse est quant à elle attendue de manière asynchrone et sans ordre. On voit d'ailleurs dans la console que les réponses arrivent dans le désordre. Cette mécanique est une des grandes forces de Vert.x. C'est en partie grâce à elle que le framework peut afficher de si bonnes performances, notamment en élasticité (scalabilité).
Les fonctionnements synchrones et asynchrones reposent sur des concepts différents. Passer d'un raisonnement synchrone à asynchrone demande une petite gymnastique, mais cela en vaut la peine. Pour commencer, demandez-vous simplement si vous avez besoin que vos traitements soient synchrones. La plupart du temps, la réponse sera non…
Bien entendu, il est possible que, pour une raison ou une autre, quelque chose se passe mal. On retournera donc une erreur. Par exemple, disons qu'on veut que ça échoue quand l'index est un multiple de 3. On fera appel à fail :
final
MessageConsumer<
String>
consumer2 =
bus.consumer
(
address2, message ->
{
System.out.println
(
"Msg: "
+
message.body
(
));
final
int
index =
ai.incrementAndGet
(
);
if
(
index %
3
!=
0
) {
message.reply
(
"Hi "
+
index +
" : "
+
random.nextInt
(
99
));
}
else
{
System.out.println
(
"Fail fail fail..."
);
message.fail
(
123
, "Failed because "
+
index +
" is a multiple of 3."
);
}
}
);
Au niveau du callback, on va prendre en compte les éventuelles erreurs à l'aide de succeded et failed. Ici j'ai utilisé les deux pour bien illustrer le fonctionnement, mais il va de soi que les deux s'excluent mutuellement :
vertx.setPeriodic
(
period, (
l) ->
bus.send
(
address2, message2, ar ->
{
if
(
ar.succeeded
(
)) {
System.out.println
(
"Received number: "
+
ar.result
(
).body
(
));
}
else
if
(
ar.failed
(
)) {
// Le else suffit ici en vrai
System.out.println
(
"It failed: "
+
ar.cause
(
).getMessage
(
));
}
}
));
...
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Received number: Hi 1
: 33
Received number: Hi 2
: 50
Msg: Dog power wouaf
Msg: Dog power wouaf
Received number: Hi 4
: 74
It failed: Failed because 3
is a multiple of 3
.
Received number: Hi 5
: 67
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
It failed: Failed because 6
is a multiple of 3
.
Received number: Hi 7
: 0
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Received number: Hi 8
: 65
Msg: Dog power wouaf
It failed: Failed because 9
is a multiple of 3
.
Received number: Hi 10
: 71
V. Synchroniser les lancements▲
Vous avez sûrement remarqué la petite pause (Thread.sleep) entre le lancement de ShopVerticle et celui des DogVerticle et CatVerticle. Cette pause est nécessaire pour que tout ait le temps de se mettre en place avant que DogVerticle et CatVerticle n'envoient leurs messages. Maintenant que vous avez fait un peu plus connaissance avec Vert.x, on va faire un peu mieux. Une des idées que vous pourriez avoir serait d'envoyer des messages périodiquement jusqu'à ce que les autres verticles répondent. Cette stratégie pourrait marcher, mais elle oblige à polluer vos verticles avec du code technique dédié.
Vert.x permet également de surveiller le déploiement de vos verticles. La méthode deployVerticle appelle automatiquement un callback lorsque c'est bon :
//vertx.deployVerticle(ShopVerticle.class.getName());
vertx.deployVerticle
(
ShopVerticle.class
.getName
(
), ar ->
{
System.out.println
(
"Shop Verticle Deployed"
);
}
);
On peut donc se servir de ce call back pour conditionner les autres lancements :
vertx.deployVerticle
(
ShopVerticle.class
.getName
(
), sar ->
{
System.out.println
(
"Shop Verticle Deployed"
);
vertx.deployVerticle
(
DoctorVerticle.class
.getName
(
), doctorOptions);
}
);
On reproduit simplement pour tout le monde :
public
class
AnimalLauncherAllInOne {
public
static
void
main
(
String[] args) throws
InterruptedException {
System.out.println
(
"Start of Animal Launcher"
);
final
DeploymentOptions doctorOptions =
new
DeploymentOptions
(
) //
.setInstances
(
2
);
final
DeploymentOptions dogOptions =
new
DeploymentOptions
(
) //
.setInstances
(
5
);
final
Vertx vertx =
Vertx.vertx
(
);
vertx.deployVerticle
(
ShopVerticle.class
.getName
(
), sar ->
{
System.out.println
(
"Shop Verticle Deployed"
);
vertx.deployVerticle
(
DoctorVerticle.class
.getName
(
), doctorOptions, dar ->
{
System.out.println
(
"Doctor Verticle Deployed"
);
vertx.deployVerticle
(
DogVerticle.class
.getName
(
), dogOptions);
vertx.deployVerticle
(
CatVerticle.class
.getName
(
));
}
);
}
);
System.out.println
(
"End of Animal Launcher"
);
}
}
Personnellement, je trouve que ça devient vite le bazar. Je préfère définir séparément les handlers :
final
Vertx vertx =
Vertx.vertx
(
);
final
Handler<
AsyncResult<
String>>
doctorCompletionHandler =
dar ->
{
System.out.println
(
"Doctor Verticle Deployed"
);
vertx.deployVerticle
(
DogVerticle.class
.getName
(
), dogOptions);
vertx.deployVerticle
(
CatVerticle.class
.getName
(
));
}
;
final
Handler<
AsyncResult<
String>>
shopCompletionHandler =
sar ->
{
System.out.println
(
"Shop Verticle Deployed"
);
vertx.deployVerticle
(
DoctorVerticle.class
.getName
(
), doctorOptions, doctorCompletionHandler);
}
;
vertx.deployVerticle
(
ShopVerticle.class
.getName
(
), shopCompletionHandler);
Les handlers sont bien séparés comme ça. En revanche, il faut les définir dans l'ordre inverse de leur utilisation. Dites-moi dans les commentaires la version que vous préférez.
VI. Envoyer des données▲
VI-A. JSON▲
Envoyer des messages simples ne devrait pas vous suffire très longtemps. Sans surprise, vous aurez déjà deviné que le format privilégié pour échanger des structures plus complexes avec Vert.x est JSON. Envoyer des messages au format JSON n'est pas plus difficile qu'en texte simple. En fait, c'est quasiment la même chose. Et bien entendu, vous n'allez pas construire une String, mais un JsonObject :
public
class
CatVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
final
EventBus bus =
vertx.eventBus
(
);
...
final
AtomicInteger ai =
new
AtomicInteger
(
1
);
vertx.setPeriodic
(
3000
, (
l) ->
{
final
Cat cat =
new
Cat
(
);
cat.setNickname
(
"Kitty-"
+
ai.incrementAndGet
(
));
cat.setMilkLover
(
true
);
final
JsonObject json =
new
JsonObject
(
) //
.put
(
"nickname"
, cat.getNickname
(
)) //
.put
(
"milkLover"
, cat.isMilkLover
(
));
bus.send
(
"json.address"
, json);
}
);
}
Le AtomicInteger sert juste à générer des noms (nickname) de chats différents.
C'est au niveau de la réception qu'il va y avoir quelques petites différences, à commencer par le fait que vous n'allez plus avoir un message de String mais un message de JsonObject :
public
class
ShopVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
final
EventBus bus =
vertx.eventBus
(
);
...
bus.<
JsonObject>
consumer
(
"json.address"
, message ->
{
final
JsonObject body =
message.body
(
);
final
String nickname =
body.getString
(
"nickname"
);
System.out.println
(
"Nickname json: "
+
nickname);
}
);
}
L'affichage qui en résulte n'est pas impressionnant :
Nickname: Kitty-1
Nickname: Kitty-2
Nickname: Kitty-3
Nickname: Kitty-4
...
Facile ? Un peu trop même… Reste qu'il faut manipuler les champs du JSON, ce qui risque de vous agacer rapidement. Si c'est le cas, passez vite au chapitre suivant.
VI-B. Objet▲
L'idéal serait de pouvoir communiquer directement avec les objets du domaine sur le bus. Au niveau de l'envoi, c'est relativement simple :
public
class
CatVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
final
EventBus bus =
vertx.eventBus
(
);
vertx.setPeriodic
(
3000
, (
l) ->
{
final
Cat cat =
new
Cat
(
);
...
bus.send
(
"object.address"
, cat);
}
);
}
C'est même si simple que ça va lancer une exception, expliquant qu'aucun codec n'a été indiqué pour cet objet du modèle :
avr. 25
, 2016
11
:08
:04
PM io.vertx.core.impl.ContextImpl
GRAVE: Unhandled exception
java.lang.IllegalArgumentException: No message codec for
type: class com.masociete.tutovertx.domain.Cat
On doit donc créer un codec. Les codecs permettent à Vertx de transformer vos objets en flux (en JSON dans l'exemple suivant) lors de l'envoi et d'un flux en objet lors de la réception. En première approche, on peut voir cela comme du mapping. Le codec qui manipulera le JSON donc, mais ça ne sera fait que là :
public
class
CatMessageCodec implements
MessageCodec<
Cat, Cat>
{
@Override
public
void
encodeToWire
(
Buffer buffer, Cat cat) {
// Easiest ways is using JSON object
final
JsonObject jsonToEncode =
new
JsonObject
(
);
jsonToEncode.put
(
"nickname"
, cat.getNickname
(
));
jsonToEncode.put
(
"milkLover"
, cat.isMilkLover
(
));
// Encode object to string
final
String jsonToStr =
jsonToEncode.encode
(
);
// Length of JSON: is NOT characters count
final
int
length =
jsonToStr.getBytes
(
).length;
// Write data into given buffer
buffer.appendInt
(
length);
buffer.appendString
(
jsonToStr);
}
@Override
public
Cat decodeFromWire
(
int
position, Buffer buffer) {
// My custom message starting from this *position* of buffer
int
_pos =
position;
// Length of JSON
final
int
length =
buffer.getInt
(
_pos);
// Get JSON string by it`s length
// Jump 4 because getInt() == 4 bytes
final
String jsonStr =
buffer.getString
(
_pos +=
4
, _pos +=
length);
final
JsonObject contentJson =
new
JsonObject
(
jsonStr);
// Get fields
final
String nickname =
contentJson.getString
(
"nickname"
);
final
boolean
milkLover =
contentJson.getBoolean
(
"milkLover"
);
// We can finally create the cat object
final
Cat cat =
new
Cat
(
);
cat.setNickname
(
nickname);
cat.setMilkLover
(
milkLover);
return
cat;
}
@Override
public
Cat transform
(
Cat cat) {
return
cat;
}
@Override
public
String name
(
) {
// Each codec must have a unique name.
// This is used to identify a codec when sending a message and for unregistering codecs.
return
this
.getClass
(
).getSimpleName
(
);
}
@Override
public
byte
systemCodecID
(
) {
// Always -1
return
-
1
;
}
}
Vous vous demandez sans doute à quoi correspond ce « +4 » dans le code de decodeFromWire. Il vient du fait que les int sont encodés avec 32 bits en Java, soit 4 octets, et donc 4 bytes…
Vous arriverez certainement à faire mieux avec votre bibliothèque JSON favorite.
Il suffit ensuite d'enregistrer le codec au niveau du bus et de l'indiquer lors de l'envoi :
public
class
CatVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
final
EventBus bus =
vertx.eventBus
(
);
final
CatMessageCodec catCodec =
new
CatMessageCodec
(
);
bus.registerCodec
(
catCodec);
final
DeliveryOptions options =
new
DeliveryOptions
(
).setCodecName
(
catCodec.name
(
));
vertx.setPeriodic
(
3000
, (
l) ->
{
final
Cat cat =
new
Cat
(
);
...
bus.send
(
"object.address"
, cat, options);
}
);
}
On peut aussi indiquer au bus que le codec sera systématiquement utilisé, par défaut, pour cet objet. Dans ce cas, on ne doit plus le préciser au moment de l'envoi :
final
CatMessageCodec catCodec =
new
CatMessageCodec
(
);
//bus.registerCodec(catCodec);
bus.registerDefaultCodec
(
Cat.class
, catCodec);
vertx.setPeriodic
(
3000
, (
l) ->
{
final
Cat cat =
new
Cat
(
);
...
bus.send
(
"object.address"
, cat);
}
);
Le plus difficile est fait. On peut maintenant recevoir directement l'objet du domaine :
bus.<
Cat>
consumer
(
"object.address"
, message ->
{
final
Cat cat =
message.body
(
);
final
String nickname =
cat.getNickname
(
);
System.out.println
(
"Nickname object: "
+
nickname);
}
);
Il n'est pas nécessaire d'enregistrer de nouveau le codec sur le bus pour consommer les messages puisque l'émetteur l'a déjà enregistré.
C'est tout de même plus sympa de travailler avec des objets qu'avec un JSON…
Ces exemples partent du principe que l'émetteur et le consommateur vont travailler avec les mêmes objets, ce qui n'est absolument pas nécessaire et, surtout, ce qui a peu de chances de vraiment se produire. Pour l'illustrer, disons qu'on continue d'envoyer des Chats (Cat) ; mais qu'on ne souhaite recevoir que des animaux (Animal). Créons un bean très simple :
public
class
Animal {
private
String name;
...
Créons aussi le codec adapté :
public
class
CatAnimalMessageCodec implements
MessageCodec<
Cat, Animal>
{
@Override
public
void
encodeToWire
(
Buffer buffer, Cat cat) {
// Easiest ways is using JSON object
final
JsonObject jsonToEncode =
new
JsonObject
(
);
jsonToEncode.put
(
"name"
, cat.getNickname
(
));
...
}
@Override
public
Animal decodeFromWire
(
int
position, Buffer buffer) {
...
// Get fields
final
String name =
contentJson.getString
(
"name"
);
// We can finally create the animal object
final
Animal animal =
new
Animal
(
);
animal.setName
(
name);
return
animal;
}
@Override
public
Animal transform
(
Cat cat) {
final
Animal animal =
new
Animal
(
);
animal.setName
(
cat.getNickname
(
));
return
animal;
}
Il ne reste plus qu'à envoyer en utilisant ce nouveau codec :
final
CatAnimalMessageCodec catAnimalCodec =
new
CatAnimalMessageCodec
(
);
bus.registerCodec
(
catAnimalCodec);
final
DeliveryOptions options2 =
new
DeliveryOptions
(
).setCodecName
(
catAnimalCodec.name
(
));
final
Cat cat =
new
Cat
(
..);
bus.send
(
"object2.address"
, cat, options2);
Lors de la réception, on peut désormais utiliser directement l'autre objet :
bus.<
Animal>
consumer
(
"object2.address"
, message ->
{
final
Animal animal =
message.body
(
);
final
String name =
animal.getName
(
);
System.out.println
(
"Name object: "
+
name);
}
);
VII. Cluster▲
Jusque là, tous les verticles étaient lancés depuis la même classe main, et donc dans la même JVM. Or on imagine bien que ce n'est pas ce que vous avez envie d'envoyer en prod. Ce serait mieux de les lancer séparément. Je vous propose la répartition suivante. DogVerticle et CatVerticle restent dans AnimalLauncher. DoctorVerticle va dans DoctorLauncher qu'on crée pour l'occasion. Enfin, ShopVerticle va dans ShopLauncher qu'on crée également pour l'occasion. Voici ce que ça donne :
public
class
AnimalLauncher {
public
static
void
main
(
String[] args) throws
InterruptedException {
System.out.println
(
"Start of Animal Launcher"
);
final
Vertx vertx =
Vertx.vertx
(
);
// Dog
final
DeploymentOptions dogOptions =
new
DeploymentOptions
(
) //
.setInstances
(
5
);
vertx.deployVerticle
(
DogVerticle.class
.getName
(
), dogOptions);
// Cat
vertx.deployVerticle
(
CatVerticle.class
.getName
(
));
System.out.println
(
"End of Animal Launcher"
);
}
}
public
class
DoctorLauncher {
public
static
void
main
(
String[] args) {
System.out.println
(
"Start of Doctor Launcher"
);
final
Vertx vertx =
Vertx.vertx
(
);
final
DeploymentOptions doctorOptions =
new
DeploymentOptions
(
) //
.setInstances
(
2
);
vertx.deployVerticle
(
DoctorVerticle.class
.getName
(
), doctorOptions);
System.out.println
(
"End of Doctor Launcher"
);
}
}
public
class
ShopLauncher {
public
static
void
main
(
String[] args) {
System.out.println
(
"Start of Shop Launcher"
);
final
Vertx vertx =
Vertx.vertx
(
);
vertx.deployVerticle
(
ShopVerticle.class
.getName
(
));
System.out.println
(
"End of Shop Launcher"
);
}
}
C'est tout de suite plus lisible. Bien entendu, on va les lancer dans le bon ordre : Shop, puis Doctor, puis Animal. Voici ce qu'on voit apparaître dans les trois consoles une fois que tout est lancé (en respectant les temps de pause) :
Start of Shop Launcher
End of Shop Launcher
Start of Shop Verticle
Start of Doctor Launcher
End of Doctor Launcher
Start of Doctor Verticle
Start of Doctor Verticle
Start of Animal Launcher
End of Animal Launcher
Start of Dog Verticle
Start of Cat Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
I do
not like dogs
I do
not like dogs
I do
not like dogs
I do
not like dogs
I do
not like dogs
It failed: No handlers for
address MARKET
It failed: No handlers for
address MARKET
It failed: No handlers for
address MARKET
It failed: No handlers for
address MARKET
It failed: No handlers for
address MARKET
It failed: No handlers for
address MARKET
It failed: No handlers for
address MARKET
It failed: No handlers for
address MARKET
It failed: No handlers for
address MARKET
It failed: No handlers for
address MARKET
...
On voit tout de suite que quelque chose cloche. Il n'y a que les messages échangés entre DogVerticle et CatVerticle qui continuent de fonctionner. Les autres messages n'arrivent pas et voici pourquoi. Chaque appel à Vertx.vertx() crée un nouveau bus. Or les différents bus, lancés dans des JVM distinctes, ne sont pas reliés.
Il faut donc passer les bus en mode cluster. La première chose que je vous invite à faire est d'ajouter une dépendance à « vertx-hazelcast » dans votre pom.xml :
<dependency>
<groupId>
io.vertx</groupId>
<artifactId>
vertx-hazelcast</artifactId>
<version>
${vertx.version}</version>
</dependency>
Je peux vous proposer deux façons de traiter la suite, chacune ayant ses avantages et ses inconvénients.
VII-A. Programmatiquement▲
Le plus simple, à ce niveau est de demander à Vert.x de nous fournir un bus en mode cluster :
public
class
ShopLauncher {
public
static
void
main
(
String[] args) {
System.out.println
(
"Start of Shop Launcher"
);
// final Vertx vertx = Vertx.vertx();
final
ClusterManager mgr =
new
HazelcastClusterManager
(
);
final
VertxOptions options =
new
VertxOptions
(
).setClusterManager
(
mgr);
Vertx.clusteredVertx
(
options, res ->
{
if
(
res.succeeded
(
)) {
System.out.println
(
"res ok shop"
);
final
Vertx vertx =
res.result
(
);
vertx.deployVerticle
(
ShopVerticle.class
.getName
(
));
}
else
{
System.out.println
(
"FAIL !!!"
);
}
}
);
System.out.println
(
"End of Shop Launcher"
);
}
}
Faites de même avec les autres launchers :
public
class
DoctorLauncher {
public
static
void
main
(
String[] args) {
System.out.println
(
"Start of Doctor Launcher"
);
// final Vertx vertx = Vertx.vertx();
final
DeploymentOptions doctorOptions =
new
DeploymentOptions
(
) //
.setInstances
(
2
);
final
ClusterManager mgr =
new
HazelcastClusterManager
(
);
final
VertxOptions options =
new
VertxOptions
(
).setClusterManager
(
mgr);
Vertx.clusteredVertx
(
options, res ->
{
if
(
res.succeeded
(
)) {
System.out.println
(
"res ok doctor"
);
final
Vertx vertx =
res.result
(
);
vertx.deployVerticle
(
DoctorVerticle.class
.getName
(
), doctorOptions);
}
else
{
System.out.println
(
"FAIL !!!"
);
}
}
);
System.out.println
(
"End of Doctor Launcher"
);
}
}
public
class
AnimalLauncher {
public
static
void
main
(
String[] args) throws
InterruptedException {
System.out.println
(
"Start of Animal Launcher"
);
final
ClusterManager mgr =
new
HazelcastClusterManager
(
);
final
DeploymentOptions dogOptions =
new
DeploymentOptions
(
) //
.setInstances
(
5
);
final
VertxOptions options =
new
VertxOptions
(
).setClusterManager
(
mgr);
Vertx.clusteredVertx
(
options, res ->
{
if
(
res.succeeded
(
)) {
System.out.println
(
"res ok animal"
);
final
Vertx vertx =
res.result
(
);
vertx.deployVerticle
(
DogVerticle.class
.getName
(
), dogOptions);
vertx.deployVerticle
(
CatVerticle.class
.getName
(
));
}
else
{
System.out.println
(
"FAIL !!!"
);
}
}
);
System.out.println
(
"End of Animal Launcher"
);
}
}
Vous avez également besoin de configurer Hazelcast (dont on parlera dans un prochain article) à l'aide du fichier « cluster.xml ». Prenez celui qui est fourni dans le zip associé à cet article. Il fera largement l'affaire. Voyons ce que ça donne quand on relance.
Ici on utilise Hazelcast en tant que Cluster manager. Hazelcast est bien plus que ça. On aurait aussi pu utiliser jGroup ou Ignite.
Start of Shop Launcher
End of Shop Launcher
avr. 28
, 2016
12
:24
:57
PM com.hazelcast.config.AbstractXmlConfigHelper
AVERTISSEMENT: Name of the hazelcast schema location incorrect using default
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL
] [dev] [3
.5
.2
] Interfaces is enabled, trying to pick one address matching to one of: [127
.0
.0
.1
]
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL
] [dev] [3
.5
.2
] Picked Address[127
.0
.0
.1
]:5701
, using socket ServerSocket[addr
=
/0
:0
:0
:0
:0
:0
:0
:0
,localport
=
5701
], bind any local
is true
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.spi.OperationService
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Backpressure is disabled
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.spi.impl.operationexecutor.classic.ClassicOperationExecutor
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Starting with 4
generic operation threads and 8
partition operation threads.
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.system
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Hazelcast 3
.5
.2
(
20150826
- ba8dbba) starting at Address[127
.0
.0
.1
]:5701
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.system
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Copyright (
c) 2008
-2015
, Hazelcast, Inc. All Rights Reserved.
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.instance.Node
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Creating TcpIpJoiner
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.core.LifecycleService
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Address[127
.0
.0
.1
]:5701
is STARTING
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Connecting to /127
.0
.0
.1
:5703
, timeout: 0
, bind-any: true
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Connecting to /127
.0
.0
.1
:5702
, timeout: 0
, bind-any: true
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Could not connect to: /127
.0
.0
.1
:5703
. Reason: SocketException[Connection refused to address /127
.0
.0
.1
:5703
]
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Could not connect to: /127
.0
.0
.1
:5702
. Reason: SocketException[Connection refused to address /127
.0
.0
.1
:5702
]
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.cluster.impl.TcpIpJoiner
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Address[127
.0
.0
.1
]:5703
is added to the blacklist.
avr. 28
, 2016
12
:24
:58
PM com.hazelcast.cluster.impl.TcpIpJoiner
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Address[127
.0
.0
.1
]:5702
is added to the blacklist.
avr. 28
, 2016
12
:24
:59
PM com.hazelcast.cluster.impl.TcpIpJoiner
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
]
Members [1
] {
Member [127
.0
.0
.1
]:5701
this
}
avr. 28
, 2016
12
:24
:59
PM com.hazelcast.core.LifecycleService
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Address[127
.0
.0
.1
]:5701
is STARTED
avr. 28
, 2016
12
:24
:59
PM com.hazelcast.partition.InternalPartitionService
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Initializing cluster partition table first arrangement...
res ok shop
Start of Shop Verticle
avr. 28
, 2016
12
:25
:08
PM com.hazelcast.nio.tcp.SocketAcceptor
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Accepting socket connection from /127
.0
.0
.1
:62836
avr. 28
, 2016
12
:25
:08
PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Established socket connection between /127
.0
.0
.1
:5701
avr. 28
, 2016
12
:25
:09
PM com.hazelcast.cluster.ClusterService
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
]
Members [2
] {
Member [127
.0
.0
.1
]:5701
this
Member [127
.0
.0
.1
]:5702
}
avr. 28
, 2016
12
:25
:09
PM com.hazelcast.partition.InternalPartitionService
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Re-partitioning cluster data... Migration queue size: 135
avr. 28
, 2016
12
:25
:10
PM com.hazelcast.partition.InternalPartitionService
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] All migration tasks have been completed, queues are empty.
Incoming message: I am the Doctor
Incoming message: I am the Doctor
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.nio.tcp.SocketAcceptor
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Accepting socket connection from /127
.0
.0
.1
:62849
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Established socket connection between /127
.0
.0
.1
:5701
avr. 28
, 2016
12
:25
:17
PM com.hazelcast.cluster.ClusterService
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
]
Members [3
] {
Member [127
.0
.0
.1
]:5701
this
Member [127
.0
.0
.1
]:5702
Member [127
.0
.0
.1
]:5703
}
avr. 28
, 2016
12
:25
:17
PM com.hazelcast.partition.InternalPartitionService
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] Re-partitioning cluster data... Migration queue size: 90
avr. 28
, 2016
12
:25
:19
PM com.hazelcast.partition.InternalPartitionService
INFOS: [127
.0
.0
.1
]:5701
[dev] [3
.5
.2
] All migration tasks have been completed, queues are empty.
Incoming message: I am ready
Incoming message: I am ready
Incoming message: I am ready
Incoming message: I am ready
Incoming message: I am ready
Incoming message: A new cat on the roofs...
Msg: Dog power wouaf
avr. 28
, 2016
12
:25
:24
PM io.vertx.core.impl.ContextImpl
GRAVE: Unhandled exception
java.lang.IllegalStateException: No message codec registered with name CatMessageCodec
at io.vertx.core.eventbus.impl.clustered.ClusteredMessage.readFromWire
(
ClusteredMessage.java:148
)
at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus$1
.handle
(
ClusteredEventBus.java:256
)
at io.vertx.core.eventbus.impl.clustered.ClusteredEventBus$1
.handle
(
ClusteredEventBus.java:248
)
at io.vertx.core.parsetools.impl.RecordParserImpl.parseFixed
(
RecordParserImpl.java:203
)
Start of Doctor Launcher
End of Doctor Launcher
avr. 28
, 2016
12
:25
:07
PM com.hazelcast.config.AbstractXmlConfigHelper
AVERTISSEMENT: Name of the hazelcast schema location incorrect using default
avr. 28
, 2016
12
:25
:07
PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL
] [dev] [3
.5
.2
] Interfaces is enabled, trying to pick one address matching to one of: [127
.0
.0
.1
]
avr. 28
, 2016
12
:25
:07
PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL
] [dev] [3
.5
.2
] Picked Address[127
.0
.0
.1
]:5702
, using socket ServerSocket[addr
=
/0
:0
:0
:0
:0
:0
:0
:0
,localport
=
5702
], bind any local
is true
avr. 28
, 2016
12
:25
:07
PM com.hazelcast.spi.OperationService
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Backpressure is disabled
avr. 28
, 2016
12
:25
:07
PM com.hazelcast.spi.impl.operationexecutor.classic.ClassicOperationExecutor
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Starting with 4
generic operation threads and 8
partition operation threads.
avr. 28
, 2016
12
:25
:08
PM com.hazelcast.system
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Hazelcast 3
.5
.2
(
20150826
- ba8dbba) starting at Address[127
.0
.0
.1
]:5702
avr. 28
, 2016
12
:25
:08
PM com.hazelcast.system
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Copyright (
c) 2008
-2015
, Hazelcast, Inc. All Rights Reserved.
avr. 28
, 2016
12
:25
:08
PM com.hazelcast.instance.Node
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Creating TcpIpJoiner
avr. 28
, 2016
12
:25
:08
PM com.hazelcast.core.LifecycleService
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Address[127
.0
.0
.1
]:5702
is STARTING
avr. 28
, 2016
12
:25
:08
PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Connecting to /127
.0
.0
.1
:5701
, timeout: 0
, bind-any: true
avr. 28
, 2016
12
:25
:08
PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Connecting to /127
.0
.0
.1
:5703
, timeout: 0
, bind-any: true
avr. 28
, 2016
12
:25
:08
PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Could not connect to: /127
.0
.0
.1
:5703
. Reason: SocketException[Connection refused to address /127
.0
.0
.1
:5703
]
avr. 28
, 2016
12
:25
:08
PM com.hazelcast.cluster.impl.TcpIpJoiner
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Address[127
.0
.0
.1
]:5703
is added to the blacklist.
avr. 28
, 2016
12
:25
:08
PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Established socket connection between /127
.0
.0
.1
:62836
avr. 28
, 2016
12
:25
:09
PM com.hazelcast.cluster.ClusterService
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
]
Members [2
] {
Member [127
.0
.0
.1
]:5701
Member [127
.0
.0
.1
]:5702
this
}
avr. 28
, 2016
12
:25
:11
PM com.hazelcast.core.LifecycleService
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Address[127
.0
.0
.1
]:5702
is STARTED
res ok doctor
Start of Doctor Verticle
Start of Doctor Verticle
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.nio.tcp.SocketAcceptor
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Accepting socket connection from /127
.0
.0
.1
:62848
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
] Established socket connection between /127
.0
.0
.1
:5702
avr. 28
, 2016
12
:25
:17
PM com.hazelcast.cluster.ClusterService
INFOS: [127
.0
.0
.1
]:5702
[dev] [3
.5
.2
]
Members [3
] {
Member [127
.0
.0
.1
]:5701
Member [127
.0
.0
.1
]:5702
this
Member [127
.0
.0
.1
]:5703
}
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Dog event: Grrr
Start of Animal Launcher
End of Animal Launcher
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.config.AbstractXmlConfigHelper
AVERTISSEMENT: Name of the hazelcast schema location incorrect using default
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL
] [dev] [3
.5
.2
] Interfaces is enabled, trying to pick one address matching to one of: [127
.0
.0
.1
]
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.instance.DefaultAddressPicker
INFOS: [LOCAL
] [dev] [3
.5
.2
] Picked Address[127
.0
.0
.1
]:5703
, using socket ServerSocket[addr
=
/0
:0
:0
:0
:0
:0
:0
:0
,localport
=
5703
], bind any local
is true
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.spi.OperationService
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
] Backpressure is disabled
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.spi.impl.operationexecutor.classic.ClassicOperationExecutor
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
] Starting with 4
generic operation threads and 8
partition operation threads.
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.system
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
] Hazelcast 3
.5
.2
(
20150826
- ba8dbba) starting at Address[127
.0
.0
.1
]:5703
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.system
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
] Copyright (
c) 2008
-2015
, Hazelcast, Inc. All Rights Reserved.
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.instance.Node
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
] Creating TcpIpJoiner
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.core.LifecycleService
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
] Address[127
.0
.0
.1
]:5703
is STARTING
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
] Connecting to /127
.0
.0
.1
:5702
, timeout: 0
, bind-any: true
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.nio.tcp.SocketConnector
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
] Connecting to /127
.0
.0
.1
:5701
, timeout: 0
, bind-any: true
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
] Established socket connection between /127
.0
.0
.1
:62848
avr. 28
, 2016
12
:25
:16
PM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
] Established socket connection between /127
.0
.0
.1
:62849
avr. 28
, 2016
12
:25
:17
PM com.hazelcast.cluster.ClusterService
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
]
Members [3
] {
Member [127
.0
.0
.1
]:5701
Member [127
.0
.0
.1
]:5702
Member [127
.0
.0
.1
]:5703
this
}
avr. 28
, 2016
12
:25
:19
PM com.hazelcast.core.LifecycleService
INFOS: [127
.0
.0
.1
]:5703
[dev] [3
.5
.2
] Address[127
.0
.0
.1
]:5703
is STARTED
res ok animal
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Dog Verticle
Start of Cat Verticle
I do
not like dogs
I do
not like dogs
I do
not like dogs
I do
not like dogs
I do
not like dogs
Received number: Hi 1
: 23
Received number: Hi 4
: 88
Received number: Hi 2
: 23
Received number: Hi 5
: 62
It failed: Failed because 3
is a multiple of 3
.
C'est tout de suite plus bavard. Si vous connaissez déjà HazelcastSite Web d'Hazelcast, vous reconnaissez la structure de ces logs. Ici, il y a deux choses à voir en plus du fonctionnement de nos verticles. La première, c'est que Hazelcast trouve tout seul les membres du cluster. Dans une configuration d'entreprise, on pourra affiner ce comportement.
La deuxième chose est que les codecs ne sont pas partagés sur le bus en cluster. Il faut donc changer le verticle pour que le codec soit chargé des deux côtés :
public
class
ShopVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
...
final
CatMessageCodec catCodec =
new
CatMessageCodec
(
);
final
CatAnimalMessageCodec catAnimalCodec =
new
CatAnimalMessageCodec
(
);
bus.registerCodec
(
catCodec);
bus.registerCodec
(
catAnimalCodec);
...
Tout devrait être bon dans la console :
...
Incoming message: I am ready
Incoming message: I am ready
Incoming message: I am ready
Incoming message: I am ready
Incoming message: A new cat on the roofs...
Incoming message: I am ready
Msg: Dog power wouaf
Msg: Dog power wouaf
Nickname object: Kitty-1
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
Msg: Dog power wouaf
Name object: Kitty-1
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
Nickname object: Kitty-2
Name object: Kitty-2
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Nickname object: Kitty-3
Name object: Kitty-3
Msg: Dog power wouaf
Msg: Dog power wouaf
Msg: Dog power wouaf
Fail fail fail...
Msg: Dog power wouaf
Msg: Dog power wouaf
...
Si un cluster Hazelcast est déjà disponible sur votre plate-forme, vous pouvez bien entendu l'utiliser. Il suffira de bien configurer cluster.xml.
VII-B. Par argument▲
À mon sens, le plus simple et le plus pratique est toutefois de travailler avec des verticles à la place des mains :
public
class
ShopLauncherAsVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Shop Launcher"
);
// final Vertx vertx = Vertx.vertx();
vertx.deployVerticle
(
ShopVerticle.class
.getName
(
));
System.out.println
(
"End of Shop Launcher"
);
}
}
public
class
DoctorLauncherAsVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Doctor Launcher"
);
// final Vertx vertx = Vertx.vertx();
final
DeploymentOptions doctorOptions =
new
DeploymentOptions
(
) //
.setInstances
(
2
);
vertx.deployVerticle
(
DoctorVerticle.class
.getName
(
), doctorOptions);
System.out.println
(
"End of Doctor Launcher"
);
}
}
public
class
AnimalLauncherAsVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Animal Launcher"
);
// final Vertx vertx = Vertx.vertx();
final
DeploymentOptions dogOptions =
new
DeploymentOptions
(
) //
.setInstances
(
5
);
vertx.deployVerticle
(
DogVerticle.class
.getName
(
), dogOptions);
vertx.deployVerticle
(
CatVerticle.class
.getName
(
));
System.out.println
(
"End of Animal Launcher"
);
}
}
Il suffit ensuite de lancer ces verticles avec l'option « -cluster » pour que tout fonctionne. N'oubliez pas d'ajouter les codecs et cluster.xml…
On peut également utiliser l'argument « -ha », pour « high availability », qui indique à Vert.x de relancer les verticles en cas de mort. Dans ce cas, il n'est pas utile d'ajouter « -cluster ».
Si vous lancez les verticles depuis Eclipse (ou un autre IDE), utilisez la classe « io.vertx.core.Launcher » et indiquez « run com.masociete.tutovertx.ShopLauncherAsVerticle -cluster » comme argument de programme.
La classe « Starter » que je vous avais indiquée à l'occasion du tutoriel précédent est désormais dépréciée (Deprecated) au profit de « Launcher ».
VIII. Conclusions▲
Le sujet de cet article peut sembler simple, voire trivial, mais il ne l'est clairement pas. En arrivant jusqu'à cette conclusion, vous avez déjà fait un pas de géant dans votre découverte de Vert.x (et pas seulement). Vous savez maintenant comment envoyer (en point à point) et comment publier des messages sur le bus. Vous savez que le bus peut être local ou distribué (clusterisé), notamment grâce à Hazelcast. Vous savez comment écouter sur le bus et répondre de manière asynchrone et non bloquante. Vous ne savez pas seulement comment envoyer des messages simples, mais également des structures organisées en JSON ainsi que des objets et des grappes.
Dans le prochain article, nous mettrons tout cela en œuvre afin de travailler avec des bases de données. Nous verrons que certains drivers, comme celui de Cassandra, sont asynchrones, tandis que d'autres sont bloquants, à l'image de JDBC.
Vos retours nous aident à améliorer nos publications. N'hésitez donc pas à commenter cet article sur le forum. 3 commentaires
IX. Remerciements▲
D'abord, j'adresse mes remerciements à l'équipe Vert.x et aux créateurs de modules complémentaires, pour avoir développé une bibliothèque aussi utile et pour la maintenir. Je n'oublie pas tous les contributeurs qui participent, notamment sur le forum.
Je souhaite également remercier Clément Escoffier qui a volontiers accepté de relire cet article. Enfin, je tiens à remercier l'équipe de Developpez.com et plus particulièrement à Logan Mauzaize, Mickael Baron et Claude Leloup.
X. Annexes▲
X-A. Liens▲
Vert.x : http://vertx.io/
Hazelcast : http://hazelcast.org/
Mes autres articles : https://thierry-leriche-dessirier.developpez.com/
X-B. Codes complets▲
package
com.masociete.tutovertx;
import
io.vertx.core.AsyncResult;
import
io.vertx.core.DeploymentOptions;
import
io.vertx.core.Handler;
import
io.vertx.core.Vertx;
public
class
AnimalLauncherAllInOne {
public
static
void
main
(
String[] args) throws
InterruptedException {
System.out.println
(
"Start of Animal Launcher"
);
final
DeploymentOptions doctorOptions =
new
DeploymentOptions
(
) //
.setInstances
(
2
);
final
DeploymentOptions dogOptions =
new
DeploymentOptions
(
) //
.setInstances
(
5
);
final
Vertx vertx =
Vertx.vertx
(
);
final
Handler<
AsyncResult<
String>>
doctorCompletionHandler =
dar ->
{
System.out.println
(
"Doctor Verticle Deployed"
);
vertx.deployVerticle
(
DogVerticle.class
.getName
(
), dogOptions);
vertx.deployVerticle
(
CatVerticle.class
.getName
(
));
}
;
final
Handler<
AsyncResult<
String>>
shopCompletionHandler =
sar ->
{
System.out.println
(
"Shop Verticle Deployed"
);
vertx.deployVerticle
(
DoctorVerticle.class
.getName
(
), doctorOptions, doctorCompletionHandler);
}
;
vertx.deployVerticle
(
ShopVerticle.class
.getName
(
), shopCompletionHandler);
System.out.println
(
"End of Animal Launcher"
);
}
}
package
com.masociete.tutovertx;
import
io.vertx.core.DeploymentOptions;
import
io.vertx.core.Vertx;
import
io.vertx.core.VertxOptions;
import
io.vertx.core.spi.cluster.ClusterManager;
import
io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
public
class
AnimalLauncher {
public
static
void
main
(
String[] args) throws
InterruptedException {
System.out.println
(
"Start of Animal Launcher"
);
final
ClusterManager mgr =
new
HazelcastClusterManager
(
);
final
DeploymentOptions dogOptions =
new
DeploymentOptions
(
) //
.setInstances
(
5
);
final
VertxOptions options =
new
VertxOptions
(
).setClusterManager
(
mgr);
Vertx.clusteredVertx
(
options, res ->
{
if
(
res.succeeded
(
)) {
System.out.println
(
"res ok animal"
);
final
Vertx vertx =
res.result
(
);
vertx.deployVerticle
(
DogVerticle.class
.getName
(
), dogOptions);
vertx.deployVerticle
(
CatVerticle.class
.getName
(
));
}
else
{
System.out.println
(
"FAIL !!!"
);
}
}
);
System.out.println
(
"End of Animal Launcher"
);
}
}
package
com.masociete.tutovertx;
import
io.vertx.core.DeploymentOptions;
import
io.vertx.core.Vertx;
import
io.vertx.core.VertxOptions;
import
io.vertx.core.spi.cluster.ClusterManager;
import
io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
public
class
DoctorLauncher {
public
static
void
main
(
String[] args) {
System.out.println
(
"Start of Doctor Launcher"
);
final
DeploymentOptions doctorOptions =
new
DeploymentOptions
(
) //
.setInstances
(
2
);
final
ClusterManager mgr =
new
HazelcastClusterManager
(
);
final
VertxOptions options =
new
VertxOptions
(
).setClusterManager
(
mgr);
Vertx.clusteredVertx
(
options, res ->
{
if
(
res.succeeded
(
)) {
System.out.println
(
"res ok doctor"
);
final
Vertx vertx =
res.result
(
);
vertx.deployVerticle
(
DoctorVerticle.class
.getName
(
), doctorOptions);
}
else
{
System.out.println
(
"FAIL !!!"
);
}
}
);
System.out.println
(
"End of Doctor Launcher"
);
}
}
package
com.masociete.tutovertx;
import
io.vertx.core.Vertx;
import
io.vertx.core.VertxOptions;
import
io.vertx.core.spi.cluster.ClusterManager;
import
io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
public
class
ShopLauncher {
public
static
void
main
(
String[] args) {
System.out.println
(
"Start of Shop Launcher"
);
final
ClusterManager mgr =
new
HazelcastClusterManager
(
);
final
VertxOptions options =
new
VertxOptions
(
).setClusterManager
(
mgr);
Vertx.clusteredVertx
(
options, res ->
{
if
(
res.succeeded
(
)) {
System.out.println
(
"res ok shop"
);
final
Vertx vertx =
res.result
(
);
vertx.deployVerticle
(
ShopVerticle.class
.getName
(
));
}
else
{
System.out.println
(
"FAIL !!!"
);
}
}
);
System.out.println
(
"End of Shop Launcher"
);
}
}
package
com.masociete.tutovertx;
import
io.vertx.core.AbstractVerticle;
import
io.vertx.core.eventbus.EventBus;
public
class
DogVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Dog Verticle"
);
final
String address =
"HELLO"
;
final
String message =
"I am ready"
;
final
EventBus bus =
vertx.eventBus
(
);
bus.send
(
address, message);
Thread.sleep
(
1000
);
//bus.send("dog.address", "Grrr");
bus.publish
(
"dog.address"
, "Grrr"
);
final
int
period =
3000
;
final
String address2 =
"MARKET"
;
final
String message2 =
"Dog power wouaf"
;
vertx.setPeriodic
(
period, (
l) ->
bus.send
(
address2, message2, ar ->
{
if
(
ar.succeeded
(
)) {
System.out.println
(
"Received number: "
+
ar.result
(
).body
(
));
}
else
if
(
ar.failed
(
)) {
System.out.println
(
"It failed: "
+
ar.cause
(
).getMessage
(
));
}
}
));
}
@Override
public
void
stop
(
) throws
Exception {
System.out.println
(
"Stop of Dog Verticle"
);
}
}
package
com.masociete.tutovertx;
import
java.util.concurrent.atomic.AtomicInteger;
import
com.masociete.tutovertx.domain.Cat;
import
io.vertx.core.AbstractVerticle;
import
io.vertx.core.eventbus.DeliveryOptions;
import
io.vertx.core.eventbus.EventBus;
import
io.vertx.core.json.JsonObject;
public
class
CatVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Cat Verticle"
);
final
String message =
"A new cat on the roofs..."
;
final
EventBus bus =
vertx.eventBus
(
);
bus.send
(
"HELLO"
, message);
final
String dogAddress =
"dog.address"
;
bus.consumer
(
dogAddress, msg ->
{
System.out.println
(
"I do not like dogs"
);
}
);
Thread.sleep
(
1000
);
final
CatMessageCodec catCodec =
new
CatMessageCodec
(
);
final
CatAnimalMessageCodec catAnimalCodec =
new
CatAnimalMessageCodec
(
);
try
{
bus.registerCodec
(
catCodec);
}
catch
(
IllegalStateException e) {
System.out.println
(
"Codec allreday registered..."
);
}
try
{
bus.registerCodec
(
catAnimalCodec);
}
catch
(
IllegalStateException e) {
System.out.println
(
"Codec allreday registered..."
);
}
// bus.registerDefaultCodec(Cat.class, catCodec);
final
DeliveryOptions options =
new
DeliveryOptions
(
).setCodecName
(
catCodec.name
(
));
final
DeliveryOptions options2 =
new
DeliveryOptions
(
).setCodecName
(
catAnimalCodec.name
(
));
final
AtomicInteger ai =
new
AtomicInteger
(
0
);
vertx.setPeriodic
(
3000
, (
l) ->
{
final
Cat cat =
new
Cat
(
);
cat.setNickname
(
"Kitty-"
+
ai.incrementAndGet
(
));
cat.setMilkLover
(
true
);
final
JsonObject json =
new
JsonObject
(
) //
.put
(
"nickname"
, cat.getNickname
(
)) //
.put
(
"milkLover"
, cat.isMilkLover
(
));
// bus.send("json.address", json);
bus.send
(
"object.address"
, cat, options);
bus.send
(
"object2.address"
, cat, options2);
// bus.send("object.address", cat);
}
);
}
@Override
public
void
stop
(
) throws
Exception {
System.out.println
(
"Stop of Cat Verticle"
);
}
}
package
com.masociete.tutovertx;
import
io.vertx.core.AbstractVerticle;
import
io.vertx.core.eventbus.EventBus;
import
io.vertx.core.eventbus.MessageConsumer;
public
class
DoctorVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Doctor Verticle"
);
final
EventBus bus =
vertx.eventBus
(
);
bus.send
(
"HELLO"
, "I am the Doctor"
);
final
String dogAddress =
"dog.address"
;
final
MessageConsumer<
String>
consumer =
bus.consumer
(
dogAddress);
consumer.handler
(
message ->
{
System.out.println
(
"Dog event: "
+
message.body
(
));
}
);
}
@Override
public
void
stop
(
) throws
Exception {
System.out.println
(
"Stop of Doctor Verticle"
);
}
}
package
com.masociete.tutovertx;
import
java.util.Random;
import
java.util.concurrent.atomic.AtomicInteger;
import
com.masociete.tutovertx.domain.Animal;
import
com.masociete.tutovertx.domain.Cat;
import
io.vertx.core.AbstractVerticle;
import
io.vertx.core.eventbus.EventBus;
import
io.vertx.core.eventbus.MessageConsumer;
import
io.vertx.core.json.JsonObject;
public
class
ShopVerticle extends
AbstractVerticle {
@Override
public
void
start
(
) throws
Exception {
System.out.println
(
"Start of Shop Verticle"
);
final
String address =
"HELLO"
;
final
EventBus bus =
vertx.eventBus
(
);
final
CatMessageCodec catCodec =
new
CatMessageCodec
(
);
final
CatAnimalMessageCodec catAnimalCodec =
new
CatAnimalMessageCodec
(
);
bus.registerCodec
(
catCodec);
bus.registerCodec
(
catAnimalCodec);
final
MessageConsumer<
String>
consumer =
bus.consumer
(
address);
consumer.handler
(
message ->
{
System.out.println
(
"Incoming message: "
+
message.body
(
));
}
);
final
String address2 =
"MARKET"
;
final
Random random =
new
Random
(
);
final
AtomicInteger ai =
new
AtomicInteger
(
0
);
bus.consumer
(
address2, message ->
{
System.out.println
(
"Msg: "
+
message.body
(
));
final
int
index =
ai.incrementAndGet
(
);
if
(
index %
3
!=
0
) {
message.reply
(
"Hi "
+
index +
" : "
+
random.nextInt
(
99
));
}
else
{
System.out.println
(
"Fail fail fail..."
);
message.fail
(
123
, "Failed because "
+
index +
" is a multiple of 3."
);
}
}
);
bus.<
JsonObject>
consumer
(
"json.address"
, message ->
{
final
JsonObject body =
message.body
(
);
final
String nickname =
body.getString
(
"nickname"
);
System.out.println
(
"Nickname json: "
+
nickname);
}
);
bus.<
Cat>
consumer
(
"object.address"
, message ->
{
final
Cat cat =
message.body
(
);
final
String nickname =
cat.getNickname
(
);
System.out.println
(
"Nickname object: "
+
nickname);
}
);
bus.<
Animal>
consumer
(
"object2.address"
, message ->
{
final
Animal animal =
message.body
(
);
final
String name =
animal.getName
(
);
System.out.println
(
"Name object: "
+
name);
}
);
}
@Override
public
void
stop
(
) throws
Exception {
System.out.println
(
"Stop of Shop Verticle"
);
}
}
package
com.masociete.tutovertx;
import
com.masociete.tutovertx.domain.Cat;
import
io.vertx.core.buffer.Buffer;
import
io.vertx.core.eventbus.MessageCodec;
import
io.vertx.core.json.JsonObject;
public
class
CatMessageCodec implements
MessageCodec<
Cat, Cat>
{
@Override
public
void
encodeToWire
(
Buffer buffer, Cat cat) {
// Easiest ways is using JSON object
final
JsonObject jsonToEncode =
new
JsonObject
(
);
jsonToEncode.put
(
"nickname"
, cat.getNickname
(
));
jsonToEncode.put
(
"milkLover"
, cat.isMilkLover
(
));
// Encode object to string
final
String jsonToStr =
jsonToEncode.encode
(
);
// Length of JSON: is NOT characters count
final
int
length =
jsonToStr.getBytes
(
).length;
// Write data into given buffer
buffer.appendInt
(
length);
buffer.appendString
(
jsonToStr);
}
@Override
public
Cat decodeFromWire
(
int
position, Buffer buffer) {
// My custom message starting from this *position* of buffer
int
_pos =
position;
// Length of JSON
final
int
length =
buffer.getInt
(
_pos);
// Get JSON string by it`s length
// Jump 4 because getInt() == 4 bytes
final
String jsonStr =
buffer.getString
(
_pos +=
4
, _pos +=
length);
final
JsonObject contentJson =
new
JsonObject
(
jsonStr);
// Get fields
final
String nickname =
contentJson.getString
(
"nickname"
);
final
boolean
milkLover =
contentJson.getBoolean
(
"milkLover"
);
// We can finally create the cat object
final
Cat cat =
new
Cat
(
);
cat.setNickname
(
nickname);
cat.setMilkLover
(
milkLover);
return
cat;
}
@Override
public
Cat transform
(
Cat cat) {
return
cat;
}
@Override
public
String name
(
) {
// Each codec must have a unique name.
// This is used to identify a codec when sending a message and for unregistering codecs.
return
this
.getClass
(
).getSimpleName
(
);
}
@Override
public
byte
systemCodecID
(
) {
// Always -1
return
-
1
;
}
}