Building event-driven microservices

In Chapter 9, Asynchronous API Design, we learned about asynchronous programming. An asynchronous API can be achieved by events. The service and client can talk to each other using events. They don't have to wait until one party finishes their job.

An event generator is an entity that generates events. An event consumer consumes the events from other parties. Publish/Subscribe is an architectural pattern that is possible with events. Go Micro supports Publish/Subscribe by using a message broker interface.

See the following diagram to understand the event flow:

A Go Micro client can Subscribe to a topic. A Go microservice can Publish messages into that Topic. Events flow from right to left in this case.

It comes with an inbuilt HTTP message broker and can be easily replaced with widely used message brokers such as RabbitMQ or Kafka. In our discussion, we stick to the default HTTP broker.

We will illustrate Publish/Subscribe with an example. Let's say a microservice should push weather alerts every 5 seconds. Instead of the client calling the service API, the service can publish those changes to a topic where a client can subscribe. The client consumes those alerts and does the processing.

In all the projects we are working on, we should install Go Micro using the dep tool and running the following code:

> dep init
> dep ensure -add "github.com/micro/go-micro"

We are going to create an asyncServer and an asyncClient. The asyncServer generates weather events, and the client consumes them. Let's look at the steps for this example here:

  1. Create a project directory, like this:
> mkdir -p $GOPATH/src/github.com/git-user/chapter11/asyncService
> mkdir $GOPATH/src/github.com/git-user/chapter11/asyncServer/proto

> mkdir -p $GOPATH/src/github.com/git-user/chapter11/asyncClient
> mkdir $GOPATH/src/github.com/git-user/chapter11/asyncClient/proto
  1. Create a weather.proto file in the proto directory of both asyncService and asyncClient. It holds a structure and RPC methods for communication. This file defines an Event that is a weather alert, and the code can be seen as follows:
syntax = "proto3";

// Example message
message Event {
// city name
string city = 1;
// unix timestamp
int64 timestamp = 2;
// temperaure in Celsius
int64 temperature = 3;
}

It has three fields, as follows:

A service should publish this event to a topic called alerts.

  1. Now, compile the .proto file in both the service and client, and get auto-generated Go files, as follows:
> protoc -I=. --micro_out=. --go_out=. proto/weather.proto
For brevity, we are skipping the imports in this example, so please access the chapter repo for the complete code.
  1. Coming to the service, the main.go file should declare a microservice and a publisher. A publisher is created by the micro.NewPublisher method. It takes the topic name alerts and a service.Client() as its arguments, as follows:
func main() {
// Create a new service. Optionally include some options here.
service := micro.NewService(
micro.Name("weather"),
)
p := micro.NewPublisher("alerts", service.Client())
...
}
  1. Next, we create a dummy ticker that publishes a weather alert every 15 seconds. We achieve that by using the time.Tick built-in Go method. We spawn a go-routine that loops forever, listens to a tick, and publishes an event into the topic using the publisher.Publish method. The Publish method takes a context object and an event with data as arguments, as can be seen in the following code block:
    go func() {
for now := range time.Tick(15 * time.Second) {
log.Println("Publishing weather alert to Topic: alerts")
p.Publish(context.TODO(), &proto.Event{
City: "Munich",
Timestamp: now.UTC().Unix(),
Temperature: 2,
})
}
}()
  1. After this, finally, we have to run the service by calling the service.Run method, like this:
    // Run the server
if err := service.Run(); err != nil {
log.Println(err)
}
  1. Both service and go-routine run in parallel. When you run this service, you see this output:
> go build && ./asyncService

2019/12/22 21:31:03 log.go:18: Transport [http] Listening on [::]:60243
2019/12/22 21:31:03 log.go:18: Broker [http] Connected to [::]:60244
2019/12/22 21:31:03 log.go:18: Registry [mdns] Registering node: weather-83982bda-5e9e-445b-9ce2-5439d1560d1f
2019-12-22 21:31:18.379616 I | Publishing event to Topic: alerts
2019-12-22 21:31:33.376924 I | Publishing event to Topic: alerts
  1. Now, the service is pushing events, but there is no client to consume them. Let's update the main.go file asyncClient with consuming logic. In the client, we should declare a handler function to process the event. The handler is executed whenever there is an incoming event. It prints out the event in our case, as can be seen here:
// ProcessEvent processes a weather alert
func ProcessEvent(ctx context.Context, event *proto.Event) error {
log.Println("Got alert:", event)
return nil
}
  1. After defining the handler function to process events, we can attach the client with the topic. The micro.RegisterSubscriber function attaches a ProcessEvent handler function to the alerts topic, like this:
func main() {
// Create a new service
service := micro.NewService(micro.Name("weather_client"))
// Initialise the client and parse command line flags
service.Init()
micro.RegisterSubscriber("alerts", service.Server(),
ProcessEvent)

if err := service.Run(); err != nil {
log.Fatal(err)
}
}
  1. If we run this program, it consumes the alerts published by the service we defined previously, as follows:
> go build && ./asyncClient

2019/12/22 21:48:07 log.go:18: Transport [http] Listening on [::]:60445
2019/12/22 21:48:07 log.go:18: Broker [http] Connected to [::]:60446
2019/12/22 21:48:07 log.go:18: Registry [mdns] Registering node: weather_client-73496273-31ca-4bed-84dc-60df07a1570d
2019/12/22 21:48:07 log.go:18: Subscribing weather_client-73496273-31ca-4bed-84dc-60df07a1570d to topic: alerts
2019-12-22 21:48:18.436189 I | Got event: city:"Munich" timestamp:1577047698 temperature:2
2019-12-22 21:48:33.431529 I | Got event: city:"Munich" timestamp:1577047713 temperature:2

This is how asynchronous behavior is achieved in microservices. The border between clients and services can blur, as anyone can publish or subscribe. In a distributed system, services are clients to other services. So, Go Micro provides a lightweight and flexible approach to creating microservices.

In the next section, we will discuss the logging and instrumentation of microservices.