Request Discovered Services and Replicate Logs

Let’s build on our service discovery to add replication in our service so that we store multiple copies of the log data when we have multiple servers in a cluster. Replication makes our service more resilient to failures. For example, if a node’s disk fails and we can’t recover its data, replication can save our butts because it ensures that there’s a copy saved on another disk.

In the next chapter, we’ll coordinate the servers so our replication will have a defined leader-follower relationship, but for now we simply want the servers to replicate each other when they discover each other and not worry about whether they should, like the scientists from Jurassic Park. Our goal for the rest of this chapter is to build something simple that makes use of our service’s discovery and sets us up for our coordinated replication in the next chapter.

Discovery alone isn’t useful—so what if a bunch of computers discover each other and they just sit there doing nothing? Discovery is important because the discovery events trigger other processes in our service like replication and consensus. When servers discover other servers, we want to trigger the servers to replicate. We need a component in our service that handles when a server joins (or leaves) the cluster and begins (or ends) replicating from it.

Our replication will be pull-based, with the replication component consuming from each discovered server and producing a copy to the local server. In pull-based replication, the consumer periodically polls the data source to check if it has new data to consume. In push-based replication, the data source pushes the data to its replicas. (In the next chapter we’ll integrate Raft into our service, and Raft is push-based.)
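To make the pull model concrete, here is a minimal in-memory sketch. The source and replica types and the poll method are hypothetical names invented for this illustration; they are not part of our service, which pulls over a gRPC stream instead.

```go
package main

import "fmt"

// source is a hypothetical in-memory log standing in for a remote server.
type source struct {
	records []string
}

// readFrom returns every record at or after offset; a pull-based
// consumer calls this whenever it wants to check for new data.
func (s *source) readFrom(offset int) []string {
	if offset >= len(s.records) {
		return nil
	}
	return s.records[offset:]
}

// replica keeps a local copy and remembers how far it has consumed.
type replica struct {
	offset  int
	records []string
}

// poll pulls whatever is new from src and appends it locally.
// It returns the number of records copied in this round.
func (r *replica) poll(src *source) int {
	batch := src.readFrom(r.offset)
	r.records = append(r.records, batch...)
	r.offset += len(batch)
	return len(batch)
}

func main() {
	src := &source{records: []string{"a", "b"}}
	rep := &replica{}

	fmt.Println(rep.poll(src)) // first poll copies both records
	src.records = append(src.records, "c")
	fmt.Println(rep.poll(src)) // second poll copies only the new record
	fmt.Println(rep.records)
}
```

The consumer owns the offset and decides when to ask for more, which is the property that lets differently paced consumers share one source.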

Pull-based systems’ flexibility can be great for log and message systems where the consumers and workloads differ, for example, when one client stream processes its data and runs continuously while another batch processes its data and runs every twenty-four hours. When replicating between servers, the servers are homogeneous and we want to replicate the newest data with as little latency as possible, so pull-based and push-based systems behave about the same. But it’ll be easier to write our own pull-based replication, and doing so will highlight why we need consensus.

To add replication to our cluster, we need a replication component that acts as a membership handler, reacting when a server joins or leaves the cluster. When a server joins the cluster, the component will connect to the server and run a loop that consumes from the discovered server and produces to the local server.

In the internal/log directory, create a new file named replicator.go to contain our replication code, beginning with this snippet:

ServerSideServiceDiscovery/internal/log/replicator.go
 package log

 import (
 	"context"
 	"sync"

 	"go.uber.org/zap"
 	"google.golang.org/grpc"

 	api "github.com/travisjeffery/proglog/api/v1"
 )

 type Replicator struct {
 	DialOptions []grpc.DialOption
 	LocalServer api.LogClient

 	logger *zap.Logger

 	mu      sync.Mutex
 	servers map[string]chan struct{}
 	closed  bool
 	close   chan struct{}
 }

The replicator connects to other servers with the gRPC client, and we need to configure the client so it can authenticate with the servers. The DialOptions field is how we pass in the options to configure the client. The servers field is a map of server names to channels, which the replicator uses to stop replicating from a server when the server fails or leaves the cluster. The replicator calls Produce on the LocalServer client to save a copy of the messages it consumes from the other servers.

Next, put the following Join method below the replicator struct:

ServerSideServiceDiscovery/internal/log/replicator.go
 func (r *Replicator) Join(name, addr string) error {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	r.init()

 	if r.closed {
 		return nil
 	}

 	if _, ok := r.servers[name]; ok {
 		// already replicating so skip
 		return nil
 	}
 	r.servers[name] = make(chan struct{})

 	go r.replicate(addr, r.servers[name])

 	return nil
 }

The Join(name, addr string) method adds the given server to the list of servers to replicate and kicks off the replicate goroutine to run the actual replication logic. If the replicator is closed or is already replicating from that server, Join does nothing.

Now put the replicate(addr string, leave chan struct{}) method, containing the replication logic, below the previous snippet:

ServerSideServiceDiscovery/internal/log/replicator.go
 func (r *Replicator) replicate(addr string, leave chan struct{}) {
 	cc, err := grpc.Dial(addr, r.DialOptions...)
 	if err != nil {
 		r.logError(err, "failed to dial", addr)
 		return
 	}
 	defer cc.Close()

 	client := api.NewLogClient(cc)

 	// Cancel the context when replicate returns so the receiving
 	// goroutine below can't stay blocked forever.
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	stream, err := client.ConsumeStream(ctx,
 		&api.ConsumeRequest{
 			Offset: 0,
 		},
 	)
 	if err != nil {
 		r.logError(err, "failed to consume", addr)
 		return
 	}

 	records := make(chan *api.Record)
 	go func() {
 		for {
 			recv, err := stream.Recv()
 			if err != nil {
 				r.logError(err, "failed to receive", addr)
 				return
 			}
 			// Stop instead of blocking on the send once the
 			// replicate loop has gone away.
 			select {
 			case records <- recv.Record:
 			case <-ctx.Done():
 				return
 			}
 		}
 	}()

You saw most of this code before when we tested our stream consumer and producer. Here we create a client and open up a stream to consume all logs on the server.

Append the following snippet to finish implementing replicate:

ServerSideServiceDiscovery/internal/log/replicator.go
 	for {
 		select {
 		case <-r.close:
 			return
 		case <-leave:
 			return
 		case record := <-records:
 			_, err = r.LocalServer.Produce(ctx,
 				&api.ProduceRequest{
 					Record: record,
 				},
 			)
 			if err != nil {
 				r.logError(err, "failed to produce", addr)
 				return
 			}
 		}
 	}
 }

The loop consumes the logs from the discovered server in a stream and then produces to the local server to save a copy. We replicate messages from the other server until that server fails or leaves the cluster and the replicator closes the channel for that server, which breaks the loop and ends the replicate goroutine. The replicator closes the channel when Serf receives an event saying that the other server left the cluster, and then this server calls the Leave method that we’re about to add.

Write the Leave(name string) method beneath your replicate method with the following code:

ServerSideServiceDiscovery/internal/log/replicator.go
 func (r *Replicator) Leave(name string) error {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	r.init()
 	if _, ok := r.servers[name]; !ok {
 		return nil
 	}
 	close(r.servers[name])
 	delete(r.servers, name)
 	return nil
 }

This Leave(name string) method handles the server leaving the cluster by removing the server from the list of servers to replicate and closing the server’s associated channel. Closing the channel signals to the receiver in the replicate goroutine to stop replicating from that server.
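The join and leave bookkeeping can be tried in isolation with a stripped-down sketch. The registry type below is a hypothetical stand-in for the replicator: its worker goroutine only waits on the channel instead of replicating, and the WaitGroup exists just so the demo can confirm the worker exited.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical, stripped-down stand-in for the replicator:
// it tracks one stop channel per server name.
type registry struct {
	mu      sync.Mutex
	servers map[string]chan struct{}
	wg      sync.WaitGroup // lets the demo wait for workers to exit
}

// join starts a worker for name unless one is already running.
func (r *registry) join(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.servers == nil {
		r.servers = make(map[string]chan struct{})
	}
	if _, ok := r.servers[name]; ok {
		return false
	}
	leave := make(chan struct{})
	r.servers[name] = leave
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		<-leave // a real worker would replicate until this unblocks
	}()
	return true
}

// leave closes the server's channel, which unblocks its worker.
func (r *registry) leave(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch, ok := r.servers[name]
	if !ok {
		return false
	}
	close(ch)
	delete(r.servers, name)
	return true
}

func main() {
	var r registry
	fmt.Println(r.join("node-1"))  // true: worker started
	fmt.Println(r.join("node-1"))  // false: already tracked
	fmt.Println(r.leave("node-1")) // true: worker told to stop
	fmt.Println(r.leave("node-1")) // false: nothing to stop
	r.wg.Wait()                    // returns once the worker has exited
}
```

Closing a channel, rather than sending on it, means the signal reaches the receiver even if it isn't waiting at that moment, and deleting the map entry under the same lock keeps a second leave from closing the channel twice.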

Next, add the following init helper below your Leave method:

ServerSideServiceDiscovery/internal/log/replicator.go
 func (r *Replicator) init() {
 	if r.logger == nil {
 		r.logger = zap.L().Named("replicator")
 	}
 	if r.servers == nil {
 		r.servers = make(map[string]chan struct{})
 	}
 	if r.close == nil {
 		r.close = make(chan struct{})
 	}
 }

We use this init helper to lazily initialize the logger, the server map, and the close channel. You should use lazy initialization to give your structs a useful zero value[42] because having a useful zero value reduces the API’s size and complexity while maintaining the same functionality. Without a useful zero value, we’d either have to export a replicator constructor function for the user to call or export the servers field on the replicator struct for the user to set. Either way, there’d be more API for the user to learn and more code for them to write before they can use our struct.
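As a small illustration of the idea, the sketch below uses a hypothetical counter type (not part of our service) whose zero value works without a constructor because its map is created on first use.

```go
package main

import "fmt"

// counter is a hypothetical type whose zero value is ready to use:
// the map is created on first use rather than in a constructor.
type counter struct {
	counts map[string]int
}

// init lazily allocates internal state; it is safe to call repeatedly.
func (c *counter) init() {
	if c.counts == nil {
		c.counts = make(map[string]int)
	}
}

// add increments the count for key and returns the new value.
func (c *counter) add(key string) int {
	c.init()
	c.counts[key]++
	return c.counts[key]
}

func main() {
	var c counter // no constructor call needed
	c.add("joined")
	fmt.Println(c.add("joined"))
}
```

Without the init call, the first write to the nil map would panic, which is exactly the failure a constructor would otherwise exist to prevent.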

Append the following snippet to implement the Close method:

ServerSideServiceDiscovery/internal/log/replicator.go
 func (r *Replicator) Close() error {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	r.init()

 	if r.closed {
 		return nil
 	}
 	r.closed = true
 	close(r.close)
 	return nil
 }

Close closes the replicator so it doesn’t replicate new servers that join the cluster and it stops replicating existing servers by causing the replicate goroutines to return.

We have one last helper to add to handle errors. Add this logError(err error, msg, addr string) method at the bottom of the file:

ServerSideServiceDiscovery/internal/log/replicator.go
 func (r *Replicator) logError(err error, msg, addr string) {
 	r.logger.Error(
 		msg,
 		zap.String("addr", addr),
 		zap.Error(err),
 	)
 }

With this method, we just log the errors because we have no other use for them and because doing so keeps the code short and simple. If your users need access to the errors, one technique is to export an error channel and send the errors into it for your users to receive and handle.
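One way the error-channel technique could look is sketched below. The worker type, its Errors accessor, and the drop-when-full policy are assumptions made for this example; our replicator doesn't do this.

```go
package main

import (
	"errors"
	"fmt"
)

// worker is a hypothetical component that reports failures on a channel
// instead of only logging them.
type worker struct {
	errs chan error
}

// newWorker allocates a buffered error channel so that reporting does
// not block while the buffer has room.
func newWorker(buffer int) *worker {
	return &worker{errs: make(chan error, buffer)}
}

// Errors exposes a receive-only view of the channel to callers.
func (w *worker) Errors() <-chan error {
	return w.errs
}

// report sends err if there is room and drops it otherwise, so a slow
// or absent reader can never stall the worker. It reports whether the
// error was delivered.
func (w *worker) report(err error) bool {
	select {
	case w.errs <- err:
		return true
	default:
		return false
	}
}

func main() {
	w := newWorker(1)
	w.report(errors.New("failed to dial"))
	fmt.Println(<-w.Errors())
}
```

Dropping errors when nobody is listening is one possible trade-off. A component that must never lose an error would need a different policy, such as blocking on the send or growing a queue.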

That’s it for our replicator. In terms of components, we now have our replicator, membership, log, and server. Each service instance must set up and connect these components for the service to work. For simpler, short-running programs, I’ll make a run package that exports a Run function that’s responsible for running the program. Rob Pike’s Ivy project[43] works this way. For more complex, long-running services, I’ll make an agent package that exports an Agent type that manages the different components and processes that make up the service. HashiCorp’s Consul[44] works this way. Let’s write an Agent for our service and then test our log, server, membership, and replicator end-to-end.

Create an internal/agent directory with a file named agent.go inside that begins with this code:

ServerSideServiceDiscovery/internal/agent/agent.go
 package agent

 import (
 	"crypto/tls"
 	"fmt"
 	"net"
 	"sync"

 	"go.uber.org/zap"

 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"

 	api "github.com/travisjeffery/proglog/api/v1"
 	"github.com/travisjeffery/proglog/internal/auth"
 	"github.com/travisjeffery/proglog/internal/discovery"
 	"github.com/travisjeffery/proglog/internal/log"
 	"github.com/travisjeffery/proglog/internal/server"
 )

 type Agent struct {
 	Config

 	log        *log.Log
 	server     *grpc.Server
 	membership *discovery.Membership
 	replicator *log.Replicator

 	shutdown     bool
 	shutdowns    chan struct{}
 	shutdownLock sync.Mutex
 }

An Agent runs on every service instance, setting up and connecting all the different components. The struct references each component (log, server, membership, replicator) that the Agent manages.

After the Agent, add its Config struct:

ServerSideServiceDiscovery/internal/agent/agent.go
 type Config struct {
 	ServerTLSConfig *tls.Config
 	PeerTLSConfig   *tls.Config
 	DataDir         string
 	BindAddr        string
 	RPCPort         int
 	NodeName        string
 	StartJoinAddrs  []string
 	ACLModelFile    string
 	ACLPolicyFile   string
 }

 func (c Config) RPCAddr() (string, error) {
 	host, _, err := net.SplitHostPort(c.BindAddr)
 	if err != nil {
 		return "", err
 	}
 	return fmt.Sprintf("%s:%d", host, c.RPCPort), nil
 }

The Agent sets up the components, so its Config comprises the parameters that the Agent passes through to them. The RPCAddr method builds the address for the gRPC server by combining the host from BindAddr with RPCPort.
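To see what RPCAddr produces, the sketch below lifts the same logic into a free function, rpcAddr, a name chosen only for this example. The addresses and port numbers are made-up sample values.

```go
package main

import (
	"fmt"
	"net"
)

// rpcAddr mirrors the RPCAddr logic shown above as a free function:
// keep the host from bindAddr and swap in rpcPort.
func rpcAddr(bindAddr string, rpcPort int) (string, error) {
	host, _, err := net.SplitHostPort(bindAddr)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%s:%d", host, rpcPort), nil
}

func main() {
	addr, err := rpcAddr("127.0.0.1:8401", 8400)
	fmt.Println(addr, err) // 127.0.0.1:8400 <nil>

	_, err = rpcAddr("127.0.0.1", 8400)
	fmt.Println(err != nil) // true: the bind address has no port
}
```

One caveat is that formatting with "%s:%d" doesn't re-add the brackets that an IPv6 host needs. If IPv6 bind addresses matter to you, net.JoinHostPort handles that case.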

Below Config, place this Agent creator function:

ServerSideServiceDiscovery/internal/agent/agent.go
 func New(config Config) (*Agent, error) {
 	a := &Agent{
 		Config:    config,
 		shutdowns: make(chan struct{}),
 	}
 	setup := []func() error{
 		a.setupLogger,
 		a.setupLog,
 		a.setupServer,
 		a.setupMembership,
 	}
 	for _, fn := range setup {
 		if err := fn(); err != nil {
 			return nil, err
 		}
 	}
 	return a, nil
 }

New(Config) creates an Agent and runs a set of methods to set up and run the agent’s components. After we run New, we expect to have a running, functioning service. We’ve seen most of this setup code before when testing our components, so we’ll cover it quickly.
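The pattern of running setup steps from a slice can be sketched on its own. The runSteps helper and errStep value below are hypothetical names for this example; the point is that the steps run in order and the first failure stops the rest.

```go
package main

import (
	"errors"
	"fmt"
)

// errStep is a sample error used by the failing step below.
var errStep = errors.New("step failed")

// runSteps calls each step in order and stops at the first failure,
// returning how many steps completed along with the error.
func runSteps(steps []func() error) (int, error) {
	for i, step := range steps {
		if err := step(); err != nil {
			return i, err
		}
	}
	return len(steps), nil
}

func main() {
	var order []string
	steps := []func() error{
		func() error { order = append(order, "logger"); return nil },
		func() error { return errStep },
		func() error { order = append(order, "server"); return nil },
	}
	done, err := runSteps(steps)
	fmt.Println(done, err, order)
}
```

Because later steps never run after a failure, each step can assume everything before it succeeded, which is why the order of the slice matters.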

First, set up the logger with this setupLogger method. Put setupLogger under New:

ServerSideServiceDiscovery/internal/agent/agent.go
 func (a *Agent) setupLogger() error {
 	logger, err := zap.NewDevelopment()
 	if err != nil {
 		return err
 	}
 	zap.ReplaceGlobals(logger)
 	return nil
 }

Then, we set up the log with this setupLog method. Put setupLog under the previous snippet:

ServerSideServiceDiscovery/internal/agent/agent.go
 func (a *Agent) setupLog() error {
 	var err error
 	a.log, err = log.NewLog(
 		a.Config.DataDir,
 		log.Config{},
 	)
 	return err
 }

Now we set up the server with setupServer. Add setupServer after setupLog:

ServerSideServiceDiscovery/internal/agent/agent.go
 func (a *Agent) setupServer() error {
 	authorizer := auth.New(
 		a.Config.ACLModelFile,
 		a.Config.ACLPolicyFile,
 	)
 	serverConfig := &server.Config{
 		CommitLog:  a.log,
 		Authorizer: authorizer,
 	}
 	var opts []grpc.ServerOption
 	if a.Config.ServerTLSConfig != nil {
 		creds := credentials.NewTLS(a.Config.ServerTLSConfig)
 		opts = append(opts, grpc.Creds(creds))
 	}
 	var err error
 	a.server, err = server.NewGRPCServer(serverConfig, opts...)
 	if err != nil {
 		return err
 	}
 	rpcAddr, err := a.RPCAddr()
 	if err != nil {
 		return err
 	}
 	ln, err := net.Listen("tcp", rpcAddr)
 	if err != nil {
 		return err
 	}
 	go func() {
 		if err := a.server.Serve(ln); err != nil {
 			_ = a.Shutdown()
 		}
 	}()
 	return err
 }

Then we set up the membership with setupMembership. Place setupMembership after setupServer:

ServerSideServiceDiscovery/internal/agent/agent.go
 func (a *Agent) setupMembership() error {
 	rpcAddr, err := a.Config.RPCAddr()
 	if err != nil {
 		return err
 	}
 	var opts []grpc.DialOption
 	if a.Config.PeerTLSConfig != nil {
 		opts = append(opts, grpc.WithTransportCredentials(
 			credentials.NewTLS(a.Config.PeerTLSConfig),
 		),
 		)
 	}
 	conn, err := grpc.Dial(rpcAddr, opts...)
 	if err != nil {
 		return err
 	}
 	client := api.NewLogClient(conn)
 	a.replicator = &log.Replicator{
 		DialOptions: opts,
 		LocalServer: client,
 	}
 	a.membership, err = discovery.New(a.replicator, discovery.Config{
 		NodeName: a.Config.NodeName,
 		BindAddr: a.Config.BindAddr,
 		Tags: map[string]string{
 			"rpc_addr": rpcAddr,
 		},
 		StartJoinAddrs: a.Config.StartJoinAddrs,
 	})
 	return err
 }

setupMembership sets up a Replicator with the gRPC dial options needed to connect to other servers and a client for the local server, so the replicator can connect to other servers, consume their data, and produce a copy of the data to the local server. Then we create a Membership, passing in the replicator as its handler so that the membership notifies the replicator when servers join and leave the cluster.

That’s all of the agent’s setup code. If we called New now, we’d have a running agent. At some point we’ll want to shut down the agent, so put this Shutdown method at the bottom of the file:

ServerSideServiceDiscovery/internal/agent/agent.go
 func (a *Agent) Shutdown() error {
 	a.shutdownLock.Lock()
 	defer a.shutdownLock.Unlock()
 	if a.shutdown {
 		return nil
 	}
 	a.shutdown = true
 	close(a.shutdowns)

 	shutdown := []func() error{
 		a.membership.Leave,
 		a.replicator.Close,
 		func() error {
 			a.server.GracefulStop()
 			return nil
 		},
 		a.log.Close,
 	}
 	for _, fn := range shutdown {
 		if err := fn(); err != nil {
 			return err
 		}
 	}
 	return nil
 }

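This ensures that the agent will shut down once even if people call Shutdown multiple times. Then we shut down the agent and its components in order: we leave the membership so that other servers see that this server has left the cluster, close the replicator so it stops replicating, gracefully stop the server, and close the log.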
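As a rough illustration of the once-only behavior, the sketch below uses a hypothetical service type that guards its teardown with a mutex and a flag, as Shutdown does. The stopped slice exists only so the demo can show that the steps ran a single time.

```go
package main

import (
	"fmt"
	"sync"
)

// service is a hypothetical component with a once-only shutdown.
type service struct {
	mu       sync.Mutex
	shutdown bool
	done     chan struct{}
	stopped  []string // records the order in which steps ran
}

func newService() *service {
	return &service{done: make(chan struct{})}
}

// Shutdown runs the teardown steps at most once; later calls return nil
// without doing anything.
func (s *service) Shutdown() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.shutdown {
		return nil
	}
	s.shutdown = true
	close(s.done) // closing twice would panic, hence the guard above

	for _, name := range []string{"membership", "replicator", "server", "log"} {
		s.stopped = append(s.stopped, name)
	}
	return nil
}

func main() {
	s := newService()
	_ = s.Shutdown()
	_ = s.Shutdown() // no-op
	fmt.Println(len(s.stopped))
}
```

The flag matters for more than tidiness: closing an already closed channel panics, so a second unguarded call would crash the process.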

We’ve integrated Serf into our service, so we can now run multiple instances of our service that discover and then replicate each other’s data. Let’s write a test to check that our service discovery and replication work and to prevent us from introducing a regression when we build consensus in Chapter 8, Coordinate Your Services with Consensus.