The message sender we will demonstrate is based on the HTTP server implementation we demonstrated in the Microservices section. It does not have to be done this way. In real-life code, a verticle typically sends messages automatically: to get data it needs, to provide data some other verticle wants, to notify another verticle, to store data in the database, or for any other reason. But for demonstration purposes, we have decided that the sender will listen on a port for messages, and we will send it messages manually (using the curl command) or automatically, via some periodic service described in the Microservices section. That is why the message sender looks a bit more complex than the message consumer:
package com.packt.javapath.ch18demo.reactivesystem;

import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.http.HttpServer;
// Assumed source of StringUtils: Apache Commons Lang 3
import org.apache.commons.lang3.StringUtils;

public class EventBusSend extends AbstractVerticle {
    private int port;
    private String address, name;

    public EventBusSend(int port, String address) {
        this.port = port;
        this.address = address;
        this.name = this.getClass().getSimpleName() +
                "(port " + port + ", send to " + address + ")";
    }

    public void start() throws Exception {
        System.out.println(name + " starts...");
        HttpServer server = vertx.createHttpServer();
        // Forward the "msg" parameter of every HTTP request to the event bus
        server.requestStream().toObservable().subscribe(request -> {
            String msg = request.getParam("msg");
            request.response().setStatusCode(200).end();
            vertx.eventBus().rxSend(address, msg).subscribe(reply -> {
                System.out.println(name + " got reply:\n " + reply.body());
            },
            e -> {
                // No consumer is left on the address, so the sender is not needed
                if (StringUtils.contains(e.toString(), "NO_HANDLERS")) {
                    vertx.undeploy(deploymentID());
                    System.out.println(name + " undeployed.");
                } else {
                    e.printStackTrace();
                }
            });
        });
        server.rxListen(port).subscribe();
        System.out.println(Thread.currentThread().getName()
                + " is waiting on port " + port + "...");
    }
}
Most of the preceding code is related to the HTTP server functionality. Only the following few lines send the message (received by the HTTP server):
vertx.eventBus().rxSend(address, msg).subscribe(reply -> {
    System.out.println(name + " got reply:\n " + reply.body());
}, e -> {
    if (StringUtils.contains(e.toString(), "NO_HANDLERS")) {
        vertx.undeploy(deploymentID());
        System.out.println(name + " undeployed.");
    } else {
        e.printStackTrace();
    }
});
After the message is sent, the sender subscribes to the possible reply and prints it (if the reply was received). If an error happens (an exception is thrown during message sending), we check whether the exception (converted to a String value) contains the literal NO_HANDLERS, and undeploy the sender if so. It took us a while to figure out how to identify the case when no consumers are registered at the address this sender is dedicated to. If there are no consumers (most likely because all of them were undeployed), then there is no need for the sender, so we undeploy it.
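To look at the two callbacks in isolation, here is a minimal sketch that does not use Vert.x at all. The class MiniBus, its methods, and the text of its failure message are stand-ins invented for this illustration; only the check for the literal NO_HANDLERS mirrors the sender shown above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class SendWithReplyDemo {

    // Hypothetical stand-in for an event bus; this is NOT the Vert.x API.
    static class MiniBus {
        // One handler per address is enough for this sketch.
        private final Map<String, Function<String, String>> handlers = new HashMap<>();

        void register(String address, Function<String, String> handler) {
            handlers.put(address, handler);
        }

        void unregister(String address) {
            handlers.remove(address);
        }

        // Completes with the consumer's reply, or fails when nobody listens.
        CompletableFuture<String> send(String address, String msg) {
            Function<String, String> handler = handlers.get(address);
            if (handler == null) {
                // Made-up message text; it only needs to contain NO_HANDLERS.
                return CompletableFuture.failedFuture(new IllegalStateException(
                        "NO_HANDLERS: nobody listens on address " + address));
            }
            return CompletableFuture.completedFuture(handler.apply(msg));
        }
    }

    // Mirrors the sender's two callbacks and returns what it would report.
    static String sendAndReport(MiniBus bus, String address, String msg) {
        StringBuilder log = new StringBuilder();
        // The future is already completed, so the callback runs immediately.
        bus.send(address, msg).whenComplete((reply, e) -> {
            if (e == null) {
                log.append("got reply: ").append(reply);
            } else if (e.toString().contains("NO_HANDLERS")) {
                log.append("undeployed"); // the real sender calls vertx.undeploy(...)
            } else {
                log.append("error: ").append(e);
            }
        });
        return log.toString();
    }

    public static void main(String[] args) {
        MiniBus bus = new MiniBus();
        bus.register("One", msg -> "got message: " + msg);
        System.out.println(sendAndReport(bus, "One", "Hello!"));
        bus.unregister("One"); // simulate the last consumer going away
        System.out.println(sendAndReport(bus, "One", "Hello!"));
    }
}
```

The first call takes the reply branch and the second, made after the only handler is removed, takes the NO_HANDLERS branch. Matching on the exception text is fragile, so it is worth checking whether the exception type in your Vert.x version exposes the failure reason directly.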
It is a good practice to clean up and undeploy all of the verticles that are not needed anymore. If you run the verticles in an IDE, chances are that all of them are stopped as soon as you stop the main process (the one that created the verticles) in your IDE. If not, run the jcmd command and see whether any Java processes hosting Vert.x verticles are still running. The first number on each listed line is the process ID. Identify the processes that you do not need anymore and use the kill -9 <process ID> command to stop them.
Now, let's deploy two message consumers, and send them messages via our message sender:
String address = "One";
Vertx vertx = vertx();
RxHelper.deployVerticle(vertx, new MsgConsumer("1",address));
RxHelper.deployVerticle(vertx, new MsgConsumer("2",address));
RxHelper.deployVerticle(vertx, new EventBusSend(8082, address));
After you run the preceding code, the terminal shows the following messages:
MsgConsumer(1,One) starts...
MsgConsumer(2,One) starts...
EventBusSend(port 8082, send to One) starts...
vert.x-eventloop-thread-1 is waiting on address One...
vert.x-eventloop-thread-0 is waiting on address One...
vert.x-eventloop-thread-2 is waiting on port 8082...
Notice the different event loops running to support each verticle.
Now, let's send a few messages, using the following commands from a terminal window:
curl 'localhost:8082?msg=Hello!'
curl 'localhost:8082?msg=Hi!'
curl 'localhost:8082?msg=How+are+you?'
curl 'localhost:8082?msg=Just+saying...'
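The + characters in the last two commands stand for spaces. If you prefer not to encode a message by hand, the JDK can do it for you. The following sketch uses java.net.URLEncoder and java.net.URLDecoder; the class name QueryEncodingDemo and its two helper methods are made up for this illustration. Note that URLEncoder applies form-style encoding, which is stricter than what we typed: besides turning spaces into +, it also escapes characters such as ! and ?.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class QueryEncodingDemo {

    // Encodes a value so that it can be placed after "msg=" in a query string.
    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    // Reverses the encoding; accepts both "+" and "%20" as a space.
    static String decode(String encoded) {
        return URLDecoder.decode(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String[] messages = {"Hello!", "How are you?", "Just saying..."};
        for (String msg : messages) {
            String encoded = encode(msg);
            // Print the encoded form and confirm that it decodes back.
            System.out.println(encoded + " -> " + decode(encoded));
        }
    }
}
```

For example, encode("How are you?") returns How+are+you%3F, and decoding either How+are+you%3F or How%20are%20you%3F gives back the original text.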
The plus sign (+) is necessary because a URL cannot contain spaces. Such characters have to be encoded, which means, among other things, replacing each space in the query string with + or %20. In response to the preceding commands, we will see the following messages:
MsgConsumer(2,One) got message: Hello!
EventBusSend(port 8082, send to One) got reply:
MsgConsumer(2,One) got message: Hello!
MsgConsumer(1,One) got message: Hi!
EventBusSend(port 8082, send to One) got reply:
MsgConsumer(1,One) got message: Hi!
MsgConsumer(2,One) got message: How are you?
EventBusSend(port 8082, send to One) got reply:
MsgConsumer(2,One) got message: How are you?
MsgConsumer(1,One) got message: Just saying...
EventBusSend(port 8082, send to One) got reply:
MsgConsumer(1,One) got message: Just saying...
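The alternation between consumers 2 and 1 in this output is what round-robin delivery looks like. As a rough model of the idea (not of how Vert.x implements it internally), the following sketch keeps a list of consumer names and a cursor that advances by one on every send. The class RoundRobinDemo and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinDemo {
    private final List<String> consumers = new ArrayList<>();
    private int next = 0; // index of the consumer that gets the next message

    void register(String consumer) {
        consumers.add(consumer);
    }

    // Picks the next consumer in turn and returns the line it would print.
    String send(String msg) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers registered");
        }
        String chosen = consumers.get(next);
        next = (next + 1) % consumers.size(); // wrap around after the last one
        return chosen + " got message: " + msg;
    }

    public static void main(String[] args) {
        RoundRobinDemo bus = new RoundRobinDemo();
        bus.register("MsgConsumer(2,One)");
        bus.register("MsgConsumer(1,One)");
        String[] messages = {"Hello!", "Hi!", "How are you?", "Just saying..."};
        for (String msg : messages) {
            System.out.println(bus.send(msg));
        }
    }
}
```

In this model, the consumer that registered first gets the first message. In the real run, the verticles are deployed asynchronously, so which consumer comes first may differ from one run to another.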
As expected, the consumers received messages by turns, according to the round-robin algorithm. Now, let's undeploy all of the verticles:
curl localhost:8082?msg=undeploy
curl localhost:8082?msg=undeploy
curl localhost:8082?msg=undeploy
Here are the messages displayed in response to the preceding commands:
MsgConsumer(1,One) got message: undeploy
MsgConsumer(1,One) undeployed.
EventBusSend(port 8082, send to One) got reply:
MsgConsumer(1,One) undeployed.
MsgConsumer(2,One) got message: undeploy
MsgConsumer(2,One) undeployed.
EventBusSend(port 8082, send to One) got reply:
MsgConsumer(2,One) undeployed.
EventBusSend(port 8082, send to One) undeployed.
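According to the preceding messages, all of our verticles are undeployed: the first two messages undeployed the two consumers, and the third one found no handlers at the address, so the sender undeployed itself. If we submit the undeploy message again, we will see: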
curl localhost:8082?msg=undeploy
curl: (7) Failed to connect to localhost port 8082: Connection refused
That is because the sender is undeployed, and there is no longer an HTTP server listening on port 8082 of the localhost.
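The same check can be made from Java. The sketch below is only an illustration (the class PortProbe and the method isListening are hypothetical names): it tries to open a TCP connection to the loopback address and treats any I/O failure, including "Connection refused", as "nothing is listening".

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortProbe {

    // Returns true when something accepts TCP connections on the local port.
    static boolean isListening(int port) {
        try (Socket socket = new Socket()) {
            InetSocketAddress target =
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
            socket.connect(target, 1000); // give up after one second
            return true;
        } catch (IOException e) {
            // Covers ConnectException ("Connection refused") and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Port 8082 listening: " + isListening(8082));
    }
}
```

Run after the sender has been undeployed, this should report false for port 8082, provided that no other program has taken the port in the meantime.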