Resolve the Servers

The gRPC resolver we’ll write in this section will call the GetServers endpoint we made and pass its information to gRPC so that the picker knows what servers it can route requests to.

To start, create a new package for our resolver and picker code by running $ mkdir internal/loadbalance.

gRPC uses the builder pattern for resolvers and pickers, so each has a builder interface and an implementation interface. Because the builder interfaces have one simple method—Build—we’ll implement both interfaces with one type. Create a file named resolver.go in internal/loadbalance that begins with this code:

ClientSideServiceDiscovery/internal/loadbalance/resolver.go
package loadbalance

import (
	"context"
	"fmt"
	"sync"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/attributes"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"

	api "github.com/travisjeffery/proglog/api/v1"
)

type Resolver struct {
	mu            sync.Mutex
	clientConn    resolver.ClientConn
	resolverConn  *grpc.ClientConn
	serviceConfig *serviceconfig.ParseResult
	logger        *zap.Logger
}

Resolver is the type that will implement gRPC’s resolver.Builder and resolver.Resolver interfaces. The clientConn is the user’s client connection; gRPC passes it to the resolver so the resolver can update it with the servers it discovers. The resolverConn is the resolver’s own client connection to the server, which it uses to call GetServers.

Add this snippet below the Resolver type to implement gRPC’s resolver.Builder interface:

ClientSideServiceDiscovery/internal/loadbalance/resolver.go
var _ resolver.Builder = (*Resolver)(nil)

func (r *Resolver) Build(
	target resolver.Target,
	cc resolver.ClientConn,
	opts resolver.BuildOptions,
) (resolver.Resolver, error) {
	r.logger = zap.L().Named("resolver")
	r.clientConn = cc
	var dialOpts []grpc.DialOption
	if opts.DialCreds != nil {
		dialOpts = append(
			dialOpts,
			grpc.WithTransportCredentials(opts.DialCreds),
		)
	}
	r.serviceConfig = r.clientConn.ParseServiceConfig(
		fmt.Sprintf(`{"loadBalancingConfig":[{"%s":{}}]}`, Name),
	)
	var err error
	r.resolverConn, err = grpc.Dial(target.Endpoint, dialOpts...)
	if err != nil {
		return nil, err
	}
	r.ResolveNow(resolver.ResolveNowOptions{})
	return r, nil
}

const Name = "proglog"

func (r *Resolver) Scheme() string {
	return Name
}

func init() {
	resolver.Register(&Resolver{})
}

resolver.Builder comprises two methods: Build and Scheme. Build receives the data needed to build a resolver that can discover the servers (like the target address) and the client connection the resolver will update with the servers it discovers. Here we set up a client connection to our server so the resolver can call the GetServers API. Scheme returns the resolver's scheme identifier. When you call grpc.Dial, gRPC parses out the scheme from the target address you gave it and tries to find a registered resolver that matches, defaulting to its DNS resolver. For our resolver, you'll format the target address like this: proglog://your-service-address.

We register this resolver with gRPC in init so gRPC knows about this resolver when it’s looking for resolvers that match the target’s scheme.

Put this snippet below init to implement gRPC’s resolver.Resolver interface:

ClientSideServiceDiscovery/internal/loadbalance/resolver.go
var _ resolver.Resolver = (*Resolver)(nil)

func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {
	r.mu.Lock()
	defer r.mu.Unlock()
	client := api.NewLogClient(r.resolverConn)
	// get cluster and then set on cc attributes
	ctx := context.Background()
	res, err := client.GetServers(ctx, &api.GetServersRequest{})
	if err != nil {
		r.logger.Error(
			"failed to resolve server",
			zap.Error(err),
		)
		return
	}
	var addrs []resolver.Address
	for _, server := range res.Servers {
		addrs = append(addrs, resolver.Address{
			Addr: server.RpcAddr,
			Attributes: attributes.New(
				"is_leader",
				server.IsLeader,
			),
		})
	}
	r.clientConn.UpdateState(resolver.State{
		Addresses:     addrs,
		ServiceConfig: r.serviceConfig,
	})
}

func (r *Resolver) Close() {
	if err := r.resolverConn.Close(); err != nil {
		r.logger.Error(
			"failed to close conn",
			zap.Error(err),
		)
	}
}

resolver.Resolver comprises two methods: ResolveNow and Close. gRPC calls ResolveNow to resolve the target, discover the servers, and update the client connection with the servers. How a resolver discovers servers depends on the service it's built for. For example, a resolver built for Kubernetes could call Kubernetes' API to get the list of endpoints. Ours creates a gRPC client for our service and calls the GetServers API to get the cluster's servers.

Services can specify how clients should balance their calls to the service by updating the state with a service config. We update the state with a service config that specifies to use the “proglog” load balancer we’ll write in Route and Balance Requests with Pickers.

You update the state with a slice of resolver.Address to inform the load balancer what servers it can choose from. A resolver.Address has three fields: Addr (required) is the address of the server to connect to; Attributes (optional, but useful) carries any data that's useful for the load balancer, and we use it to tell the picker which server is the leader; and ServerName (optional, and you likely don't need to set it) is the name used as the transport certificate authority for the address, instead of the hostname taken from the dial target.

After we’ve discovered the servers, we update the client connection by calling UpdateState with the resolver.Address slice, building each address from the data in the corresponding api.Server. gRPC may call ResolveNow concurrently, so we use a mutex to protect access across goroutines.

Close closes the resolver. In our resolver, we close the connection to our server created in Build.

That’s it for our resolver’s code. Let’s test it.

Create a test file named resolver_test.go in internal/loadbalance that begins with this snippet:

ClientSideServiceDiscovery/internal/loadbalance/resolver_test.go
 1: package loadbalance_test
 2:
 3: import (
 4:     "net"
 5:     "testing"
 6:
 7:     "github.com/stretchr/testify/require"
 8:     "google.golang.org/grpc"
 9:     "google.golang.org/grpc/attributes"
10:     "google.golang.org/grpc/credentials"
11:     "google.golang.org/grpc/resolver"
12:     "google.golang.org/grpc/serviceconfig"
13:
14:     api "github.com/travisjeffery/proglog/api/v1"
15:     "github.com/travisjeffery/proglog/internal/loadbalance"
16:     "github.com/travisjeffery/proglog/internal/config"
17:     "github.com/travisjeffery/proglog/internal/server"
18: )
19:
20: func TestResolver(t *testing.T) {
21:     l, err := net.Listen("tcp", "127.0.0.1:0")
22:     require.NoError(t, err)
23:
24:     tlsConfig, err := config.SetupTLSConfig(config.TLSConfig{
25:         CertFile:      config.ServerCertFile,
26:         KeyFile:       config.ServerKeyFile,
27:         CAFile:        config.CAFile,
28:         Server:        true,
29:         ServerAddress: "127.0.0.1",
30:     })
31:     require.NoError(t, err)
32:     serverCreds := credentials.NewTLS(tlsConfig)
33:
34:     srv, err := server.NewGRPCServer(&server.Config{
35:         GetServerer: &getServers{},
36:     }, grpc.Creds(serverCreds))
37:     require.NoError(t, err)
38:     go srv.Serve(l)

This code sets up a server for our test resolver to discover servers from. On line 35, we pass in a mock GetServerer so we can control what servers the resolver finds.

Put this snippet below the previous snippet to continue writing the test:

ClientSideServiceDiscovery/internal/loadbalance/resolver_test.go
conn := &clientConn{}
tlsConfig, err = config.SetupTLSConfig(config.TLSConfig{
	CertFile:      config.RootClientCertFile,
	KeyFile:       config.RootClientKeyFile,
	CAFile:        config.CAFile,
	Server:        false,
	ServerAddress: "127.0.0.1",
})
require.NoError(t, err)
clientCreds := credentials.NewTLS(tlsConfig)
opts := resolver.BuildOptions{
	DialCreds: clientCreds,
}
r := &loadbalance.Resolver{}
_, err = r.Build(
	resolver.Target{
		Endpoint: l.Addr().String(),
	},
	conn,
	opts,
)
require.NoError(t, err)

This code creates and builds the test resolver and configures its target endpoint to point to the server we set up in the previous snippet. The resolver will call GetServers to resolve the servers and update the client connection with the servers’ addresses.

Add this snippet below the previous snippet to finish writing the test:

ClientSideServiceDiscovery/internal/loadbalance/resolver_test.go
wantState := resolver.State{
	Addresses: []resolver.Address{{
		Addr:       "localhost:9001",
		Attributes: attributes.New("is_leader", true),
	}, {
		Addr:       "localhost:9002",
		Attributes: attributes.New("is_leader", false),
	}},
}
require.Equal(t, wantState, conn.state)

conn.state.Addresses = nil
r.ResolveNow(resolver.ResolveNowOptions{})
require.Equal(t, wantState, conn.state)
}

We check that the resolver updated the client connection with the servers and data we expected. We wanted the resolver to find two servers and mark the 9001 server as the leader.

Our test depended on some mock types. Add this code at the bottom of the file:

ClientSideServiceDiscovery/internal/loadbalance/resolver_test.go
type getServers struct{}

func (s *getServers) GetServers() ([]*api.Server, error) {
	return []*api.Server{{
		Id:       "leader",
		RpcAddr:  "localhost:9001",
		IsLeader: true,
	}, {
		Id:      "follower",
		RpcAddr: "localhost:9002",
	}}, nil
}

type clientConn struct {
	resolver.ClientConn
	state resolver.State
}

func (c *clientConn) UpdateState(state resolver.State) {
	c.state = state
}

func (c *clientConn) ReportError(err error) {}

func (c *clientConn) NewAddress(addrs []resolver.Address) {}

func (c *clientConn) NewServiceConfig(config string) {}

func (c *clientConn) ParseServiceConfig(
	config string,
) *serviceconfig.ParseResult {
	return nil
}

getServers implements the GetServerer interface; its job is to return a known server set for the resolver to find. clientConn implements resolver.ClientConn, and its job is to keep a reference to the state the resolver updates it with, so we can verify that the resolver sends the client connection the correct data.

Run the resolver tests with $ go test ./internal/loadbalance to verify that they pass. And now, we’re on to the picker.