Our resolver will need a way to discover the cluster’s servers: it needs each server’s address and whether that server is the leader. In Implement Raft in Our Service, we built Raft into our service, so our service already knows the cluster’s servers and which server is the leader. We can expose this information to the resolver with an endpoint on our gRPC service.
Using an RPC for discovery will be easy because we’ve already built Serf and Raft into our service. Kafka clients discover the cluster’s brokers by requesting a metadata endpoint. Kafka’s metadata endpoint responds with data that’s stored in and coordinated by ZooKeeper, though the Kafka developers plan to remove the dependency on ZooKeeper and build Raft into Kafka to coordinate this data, similar to our service. That will be a big change in how Kafka manages this data, specifically in how it tracks which servers are in the cluster and how it elects leaders; however, little to nothing will have to change in how clients discover the servers, which shows the benefit of using a service endpoint for client-side discovery.
Open the api/v1/log.proto file and update the Log service to include the GetServers endpoint like so:
| service Log { |
|   rpc Produce(ProduceRequest) returns (ProduceResponse) {} |
|   rpc Consume(ConsumeRequest) returns (ConsumeResponse) {} |
|   rpc ConsumeStream(ConsumeRequest) returns (stream ConsumeResponse) {} |
|   rpc ProduceStream(stream ProduceRequest) returns (stream ProduceResponse) |
|     {} |
|   rpc GetServers(GetServersRequest) returns (GetServersResponse) {} |
| } |
This is the endpoint resolvers will call to get the cluster’s servers.
Now, add this snippet to the end of the file to define the endpoint’s request and response:
| message GetServersRequest {} |
| |
| message GetServersResponse { |
|   repeated Server servers = 1; |
| } |
| |
| message Server { |
|   string id = 1; |
|   string rpc_addr = 2; |
|   bool is_leader = 3; |
| } |
The endpoint’s response includes the addresses of the servers clients should connect to and which server is the leader. The picker will use this information to decide which server to send produce calls to and which servers to send consume calls to.
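To see how a client will use this endpoint, here’s a minimal sketch that calls GetServers through the generated gRPC client and splits the results by leadership. It assumes the generated api package, a context named ctx, and an existing client connection named conn; the variable names are illustrative rather than part of our project’s code:
| client := api.NewLogClient(conn) |
| res, err := client.GetServers(ctx, &api.GetServersRequest{}) |
| if err != nil { |
|     return err |
| } |
| var leader *api.Server |
| var followers []*api.Server |
| for _, server := range res.Servers { |
|     if server.IsLeader { |
|         // produce calls should go to the leader |
|         leader = server |
|     } else { |
|         // consume calls can go to the followers |
|         followers = append(followers, server) |
|     } |
| } |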
We’ll implement the endpoint on our server, but before we do, we need an API on our DistributedLog that exposes Raft’s server data. Open internal/log/distributed.go and put this GetServers method below DistributedLog.Close:
| func (l *DistributedLog) GetServers() ([]*api.Server, error) { |
|     future := l.raft.GetConfiguration() |
|     if err := future.Error(); err != nil { |
|         return nil, err |
|     } |
|     var servers []*api.Server |
|     for _, server := range future.Configuration().Servers { |
|         servers = append(servers, &api.Server{ |
|             Id:       string(server.ID), |
|             RpcAddr:  string(server.Address), |
|             IsLeader: l.raft.Leader() == server.Address, |
|         }) |
|     } |
|     return servers, nil |
| } |
Raft’s configuration comprises the servers in the cluster and includes each server’s ID, address, and suffrage—whether the server votes in Raft elections (we don’t need the suffrage in our project). Raft can tell us the address of the cluster’s leader, too. GetServers converts the data from Raft’s raft.Server type into our *api.Server type for our API to respond with.
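For reference, the Raft types we’re reading from look roughly like this (paraphrased from the github.com/hashicorp/raft package; see its documentation for the authoritative definitions):
| // Paraphrased from github.com/hashicorp/raft for reference only. |
| type Configuration struct { |
|     Servers []Server |
| } |
| |
| type Server struct { |
|     Suffrage ServerSuffrage // whether the server gets a vote |
|     ID       ServerID       // unique string identifying the server |
|     Address  ServerAddress  // network address the server listens on |
| } |
GetConfiguration returns a future whose Configuration method yields this struct, and Leader returns the ServerAddress of the current leader (or an empty address if there is no known leader).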
Let’s update the DistributedLog tests to check that GetServers returns the servers in the cluster as we expect. Open internal/log/distributed_test.go and add the new code in this snippet that surrounds the old lines 8 and 9:
1: | servers, err := logs[0].GetServers() |
- | require.NoError(t, err) |
- | require.Equal(t, 3, len(servers)) |
- | require.True(t, servers[0].IsLeader) |
5: | require.False(t, servers[1].IsLeader) |
- | require.False(t, servers[2].IsLeader) |
- | |
- | err = logs[0].Leave("1") |
- | require.NoError(t, err) |
10: | |
- | time.Sleep(50 * time.Millisecond) |
- | |
- | servers, err = logs[0].GetServers() |
- | require.NoError(t, err) |
15: | require.Equal(t, 2, len(servers)) |
- | require.True(t, servers[0].IsLeader) |
- | require.False(t, servers[1].IsLeader) |
The assertions before line 8 test that GetServers returns all three servers in the cluster and sets the leader server as the leader. After line 9, we expect the cluster to have two servers because these assertions run after we’ve made one server leave the cluster.
That’s it for the DistributedLog changes and tests. Next we’ll implement the endpoint on the server that calls DistributedLog.GetServers.
Open internal/server/server.go and update the Config to:
| type Config struct { |
|     CommitLog   CommitLog |
|     Authorizer  Authorizer |
|     GetServerer GetServerer |
| } |
And put this snippet below the ConsumeStream method:
| func (s *grpcServer) GetServers( |
|     ctx context.Context, req *api.GetServersRequest, |
| ) ( |
|     *api.GetServersResponse, error) { |
|     servers, err := s.GetServerer.GetServers() |
|     if err != nil { |
|         return nil, err |
|     } |
|     return &api.GetServersResponse{Servers: servers}, nil |
| } |
| |
| type GetServerer interface { |
|     GetServers() ([]*api.Server, error) |
| } |
These two snippets enable us to inject different structs that can get servers. We don’t want to add the GetServers method to our CommitLog interface because a non-distributed log like our Log type doesn’t know about servers. So we made a new interface whose sole method, GetServers, matches DistributedLog.GetServers; our new server endpoint simply calls it and returns the servers in a GetServersResponse. When we update the end-to-end tests in the agent package, we’ll set our DistributedLog on the config as both the CommitLog and the GetServerer.
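Because GetServerer is a small interface of its own, it’s also easy to stub in tests that don’t need a real Raft cluster. Here’s a hypothetical test double (not part of our project’s code) that satisfies the interface by returning a fixed set of servers:
| // getServersStub is a hypothetical stand-in that satisfies GetServerer |
| // by returning whatever servers it was constructed with. |
| type getServersStub struct { |
|     servers []*api.Server |
| } |
| |
| func (s *getServersStub) GetServers() ([]*api.Server, error) { |
|     return s.servers, nil |
| } |
You could set an instance of this stub as the Config’s GetServerer to exercise the endpoint without running a cluster.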
In agent.go, update your setupServer method to configure the server to get the cluster’s servers from the DistributedLog:
| serverConfig := &server.Config{ |
|     CommitLog:   a.log, |
|     Authorizer:  authorizer, |
» |     GetServerer: a.log, |
| } |
We now have a server endpoint that clients can call to get the cluster’s servers, and we’re ready to build our resolver.