The gRPC resolver we’ll write in this section will call the GetServers endpoint we made and pass its information to gRPC so that the picker knows what servers it can route requests to.
To start, create a new package for our resolver and picker code by running $ mkdir internal/loadbalance.
gRPC uses the builder pattern for resolvers and pickers, so each has a builder interface and an implementation interface. Because the builder interfaces have one simple method—Build—we’ll implement both interfaces with one type. Create a file named resolver.go in internal/loadbalance that begins with this code:
| package loadbalance |
| |
| import ( |
| "context" |
| "fmt" |
| "sync" |
| |
| "go.uber.org/zap" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/attributes" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/serviceconfig" |
| |
| api "github.com/travisjeffery/proglog/api/v1" |
| ) |
| |
| type Resolver struct { |
| mu sync.Mutex |
| clientConn resolver.ClientConn |
| resolverConn *grpc.ClientConn |
| serviceConfig *serviceconfig.ParseResult |
| logger *zap.Logger |
| } |
Resolver is the type that implements gRPC’s resolver.Builder and resolver.Resolver interfaces. The clientConn field is the user’s client connection; gRPC passes it to the resolver so the resolver can update it with the servers it discovers. The resolverConn field is the resolver’s own client connection to the server, which it uses to call GetServers and fetch the server list.
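To make the idea of one type playing both the builder role and the built role concrete, here is a minimal, dependency-free sketch. The Builder and Instance interfaces and the simpleResolver type are hypothetical stand-ins invented for illustration; gRPC's real interfaces take more parameters than shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// Builder and Instance are hypothetical stand-ins for gRPC's
// resolver.Builder and resolver.Resolver interfaces.
type Builder interface {
	Build(target string) (Instance, error)
}

type Instance interface {
	ResolveNow()
	Close()
}

// simpleResolver plays both roles: Build configures the receiver and
// then returns that same value as the Instance.
type simpleResolver struct {
	target   string
	resolved int
	closed   bool
}

// Compile-time checks that one type satisfies both interfaces.
var (
	_ Builder  = (*simpleResolver)(nil)
	_ Instance = (*simpleResolver)(nil)
)

func (r *simpleResolver) Build(target string) (Instance, error) {
	if target == "" {
		return nil, errors.New("empty target")
	}
	r.target = target
	r.ResolveNow() // resolve once up front so callers start with data
	return r, nil
}

func (r *simpleResolver) ResolveNow() { r.resolved++ }

func (r *simpleResolver) Close() { r.closed = true }

func main() {
	var b Builder = &simpleResolver{}
	inst, err := b.Build("127.0.0.1:8400")
	if err != nil {
		panic(err)
	}
	defer inst.Close()
	fmt.Println("resolutions so far:", inst.(*simpleResolver).resolved)
}
```

The trade-off in this sketch is that the builder carries per-connection state, so calling Build twice on the same value overwrites the fields set by the first call.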
Add this snippet below the Resolver type to implement gRPC’s resolver.Builder interface:
| var _ resolver.Builder = (*Resolver)(nil) |
| |
| func (r *Resolver) Build( |
| target resolver.Target, |
| cc resolver.ClientConn, |
| opts resolver.BuildOptions, |
| ) (resolver.Resolver, error) { |
| r.logger = zap.L().Named("resolver") |
| r.clientConn = cc |
| var dialOpts []grpc.DialOption |
| if opts.DialCreds != nil { |
| dialOpts = append( |
| dialOpts, |
| grpc.WithTransportCredentials(opts.DialCreds), |
| ) |
| } |
| r.serviceConfig = r.clientConn.ParseServiceConfig( |
| fmt.Sprintf(`{"loadBalancingConfig":[{"%s":{}}]}`, Name), |
| ) |
| var err error |
| r.resolverConn, err = grpc.Dial(target.Endpoint, dialOpts...) |
| if err != nil { |
| return nil, err |
| } |
| r.ResolveNow(resolver.ResolveNowOptions{}) |
| return r, nil |
| } |
| |
| const Name = "proglog" |
| |
| func (r *Resolver) Scheme() string { |
| return Name |
| } |
| |
| func init() { |
| resolver.Register(&Resolver{}) |
| } |
resolver.Builder comprises two methods—Build and Scheme:
Build receives the data needed to build a resolver that can discover the servers (like the target address) and the client connection the resolver will update with the servers it discovers. Build sets up a client connection to our server so the resolver can call the GetServers API.
Scheme returns the resolver’s scheme identifier. When you call grpc.Dial, gRPC parses out the scheme from the target address you gave it and tries to find a resolver that matches, defaulting to its DNS resolver. gRPC targets take the form scheme://authority/endpoint, and our Build reads target.Endpoint, so leave the authority empty and format the target address with three slashes, like this: proglog:///your-service-address.
We register this resolver with gRPC in init so gRPC knows about this resolver when it’s looking for resolvers that match the target’s scheme.
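The scheme lookup described above can be approximated with the standard library alone. This is only a sketch: the registry map and the pickResolver function are hypothetical, gRPC's own target parser handles more cases, and the DNS fallback simply follows the text (the resolver gRPC actually falls back to can differ between library versions). Note where the address lands depending on how many slashes follow the scheme.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// registry is a hypothetical stand-in for gRPC's table of registered
// resolvers, keyed by scheme.
var registry = map[string]string{
	"dns":     "DNS resolver",
	"proglog": "GetServers resolver",
}

// pickResolver splits a target of the form scheme://authority/endpoint
// and reports which registered scheme would handle it. Targets with a
// missing or unregistered scheme fall back to "dns", and the whole
// string is treated as the endpoint.
func pickResolver(target string) (scheme, authority, endpoint string) {
	u, err := url.Parse(target)
	if err == nil {
		if _, ok := registry[u.Scheme]; ok {
			return u.Scheme, u.Host, strings.TrimPrefix(u.Path, "/")
		}
	}
	return "dns", "", target
}

func main() {
	targets := []string{
		"proglog:///127.0.0.1:8400", // empty authority, address in endpoint
		"proglog://127.0.0.1:8400",  // address parsed as the authority
		"127.0.0.1:8400",            // no scheme at all
	}
	for _, t := range targets {
		scheme, authority, endpoint := pickResolver(t)
		fmt.Printf("%-28s scheme=%q authority=%q endpoint=%q\n",
			t, scheme, authority, endpoint)
	}
}
```

With only two slashes the address is parsed as the authority and the endpoint comes back empty, which matters for any resolver that dials the endpoint.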
Put this snippet below init to implement gRPC’s resolver.Resolver interface:
| var _ resolver.Resolver = (*Resolver)(nil) |
| |
| func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) { |
| r.mu.Lock() |
| defer r.mu.Unlock() |
| client := api.NewLogClient(r.resolverConn) |
| // get cluster and then set on cc attributes |
| ctx := context.Background() |
| res, err := client.GetServers(ctx, &api.GetServersRequest{}) |
| if err != nil { |
| r.logger.Error( |
| "failed to resolve server", |
| zap.Error(err), |
| ) |
| return |
| } |
| var addrs []resolver.Address |
| for _, server := range res.Servers { |
| addrs = append(addrs, resolver.Address{ |
| Addr: server.RpcAddr, |
| Attributes: attributes.New( |
| "is_leader", |
| server.IsLeader, |
| ), |
| }) |
| } |
| r.clientConn.UpdateState(resolver.State{ |
| Addresses: addrs, |
| ServiceConfig: r.serviceConfig, |
| }) |
| } |
| |
| func (r *Resolver) Close() { |
| if err := r.resolverConn.Close(); err != nil { |
| r.logger.Error( |
| "failed to close conn", |
| zap.Error(err), |
| ) |
| } |
| } |
resolver.Resolver comprises two methods: ResolveNow and Close. gRPC calls ResolveNow to resolve the target, discover the servers, and update the client connection with the servers. How a resolver discovers servers depends on the service it’s built for. For example, a resolver built for Kubernetes could call Kubernetes’ API to get the list of endpoints. Ours creates a gRPC client for our service and calls the GetServers API to get the cluster’s servers.
Services can specify how clients should balance their calls to the service by updating the state with a service config. Our service config tells gRPC to use the “proglog” load balancer we’ll write in Route and Balance Requests with Pickers.
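To see what the service config string built in Build actually contains, the following sketch formats the same JSON and decodes it with encoding/json. The serviceConfig struct and the policyNames helper are hypothetical and model only the loadBalancingConfig field; gRPC's real parser understands far more of the schema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Name mirrors the constant the resolver uses for its scheme and
// load-balancing policy.
const Name = "proglog"

// serviceConfig models only the loadBalancingConfig field. Each list
// entry is an object with a single key, the policy name, whose value
// is that policy's own configuration.
type serviceConfig struct {
	LoadBalancingConfig []map[string]json.RawMessage `json:"loadBalancingConfig"`
}

// policyNames returns the policy names in the order they are listed.
func policyNames(raw string) ([]string, error) {
	var sc serviceConfig
	if err := json.Unmarshal([]byte(raw), &sc); err != nil {
		return nil, err
	}
	var names []string
	for _, entry := range sc.LoadBalancingConfig {
		for name := range entry {
			names = append(names, name)
		}
	}
	return names, nil
}

func main() {
	raw := fmt.Sprintf(`{"loadBalancingConfig":[{"%s":{}}]}`, Name)
	names, err := policyNames(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(raw)
	fmt.Println(names)
}
```

The empty object after the policy name is where policy-specific settings would go; our balancer needs none.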
You update the state with a slice of resolver.Address to inform the load balancer what servers it can choose from. A resolver.Address has three fields you’re likely to use:
Addr (required)—the address of the server to connect to.
Attributes (optional but useful)—a map containing any data that’s useful for the load balancer. We’ll tell the picker what server is the leader and what servers are followers with this field.
ServerName (optional and you likely don’t need to set)—the name used as the transport certificate authority for the address, instead of the hostname taken from the Dial target string.
After we’ve discovered the servers, we update the client connection by calling UpdateState with a slice of resolver.Address values, which we build from the data in each api.Server. gRPC may call ResolveNow concurrently, so we use a mutex to protect access across goroutines.
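The flow just described (convert servers to addresses, attach a leader flag, update under a mutex) can be sketched without any gRPC dependency. The server, address, and conn types below are simplified, hypothetical stand-ins for api.Server, resolver.Address, and the client connection; in particular, gRPC's real Attributes type is not a plain map.

```go
package main

import (
	"fmt"
	"sync"
)

// server and address are hypothetical, simplified stand-ins for
// api.Server and resolver.Address.
type server struct {
	RPCAddr  string
	IsLeader bool
}

type address struct {
	Addr       string
	Attributes map[string]interface{}
}

// conn stands in for the client connection that the resolver updates.
type conn struct {
	mu      sync.Mutex
	addrs   []address
	updates int
}

// toAddresses copies each server's address and leader flag into the
// shape a load balancer would consume.
func toAddresses(servers []server) []address {
	addrs := make([]address, 0, len(servers))
	for _, s := range servers {
		addrs = append(addrs, address{
			Addr:       s.RPCAddr,
			Attributes: map[string]interface{}{"is_leader": s.IsLeader},
		})
	}
	return addrs
}

// update replaces the address list while holding the mutex, so
// concurrent callers never interleave their writes.
func (c *conn) update(servers []server) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.addrs = toAddresses(servers)
	c.updates++
}

// leader returns the first address flagged as the leader, if any.
func leader(addrs []address) (string, bool) {
	for _, a := range addrs {
		if isLeader, ok := a.Attributes["is_leader"].(bool); ok && isLeader {
			return a.Addr, true
		}
	}
	return "", false
}

func main() {
	servers := []server{
		{RPCAddr: "localhost:9001", IsLeader: true},
		{RPCAddr: "localhost:9002"},
	}
	c := &conn{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.update(servers)
		}()
	}
	wg.Wait() // all writers are done, so reading c.addrs is safe here
	addr, ok := leader(c.addrs)
	fmt.Println(addr, ok, c.updates)
}
```

Running a sketch like this with Go's race detector enabled (go run -race) is a quick way to confirm that the lock covers every write.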
Close closes the resolver. In our resolver, we close the connection to our server created in Build.
That’s it for our resolver’s code. Let’s test it.
Create a test file named resolver_test.go in internal/loadbalance that begins with this snippet:
1: | package loadbalance_test |
- | |
- | import ( |
- | "net" |
5: | "testing" |
- | |
- | "github.com/stretchr/testify/require" |
- | "google.golang.org/grpc" |
- | "google.golang.org/grpc/attributes" |
10: | "google.golang.org/grpc/credentials" |
- | "google.golang.org/grpc/resolver" |
- | "google.golang.org/grpc/serviceconfig" |
- | |
- | api "github.com/travisjeffery/proglog/api/v1" |
15: | "github.com/travisjeffery/proglog/internal/loadbalance" |
- | "github.com/travisjeffery/proglog/internal/config" |
- | "github.com/travisjeffery/proglog/internal/server" |
- | ) |
- | |
20: | func TestResolver(t *testing.T) { |
- | l, err := net.Listen("tcp", "127.0.0.1:0") |
- | require.NoError(t, err) |
- | |
- | tlsConfig, err := config.SetupTLSConfig(config.TLSConfig{ |
25: | CertFile: config.ServerCertFile, |
- | KeyFile: config.ServerKeyFile, |
- | CAFile: config.CAFile, |
- | Server: true, |
- | ServerAddress: "127.0.0.1", |
30: | }) |
- | require.NoError(t, err) |
- | serverCreds := credentials.NewTLS(tlsConfig) |
- | |
- | srv, err := server.NewGRPCServer(&server.Config{ |
35: | GetServerer: &getServers{}, |
- | }, grpc.Creds(serverCreds)) |
- | require.NoError(t, err) |
- | |
- | go srv.Serve(l) |
This code sets up a server that our test resolver will query to discover servers. We pass in a mock GetServerer on line 35 so we can control which servers the resolver finds.
Put this snippet below the previous snippet to continue writing the test:
| conn := &clientConn{} |
| tlsConfig, err = config.SetupTLSConfig(config.TLSConfig{ |
| CertFile: config.RootClientCertFile, |
| KeyFile: config.RootClientKeyFile, |
| CAFile: config.CAFile, |
| Server: false, |
| ServerAddress: "127.0.0.1", |
| }) |
| require.NoError(t, err) |
| clientCreds := credentials.NewTLS(tlsConfig) |
| opts := resolver.BuildOptions{ |
| DialCreds: clientCreds, |
| } |
| r := &loadbalance.Resolver{} |
| _, err = r.Build( |
| resolver.Target{ |
| Endpoint: l.Addr().String(), |
| }, |
| conn, |
| opts, |
| ) |
| require.NoError(t, err) |
This code creates and builds the test resolver and configures its target endpoint to point to the server we set up in the previous snippet. The resolver will call GetServers to resolve the servers and update the client connection with the servers’ addresses.
Add this snippet below the previous snippet to finish writing the test:
| wantState := resolver.State{ |
| Addresses: []resolver.Address{{ |
| Addr: "localhost:9001", |
| Attributes: attributes.New("is_leader", true), |
| }, { |
| Addr: "localhost:9002", |
| Attributes: attributes.New("is_leader", false), |
| }}, |
| } |
| require.Equal(t, wantState, conn.state) |
| |
| conn.state.Addresses = nil |
| r.ResolveNow(resolver.ResolveNowOptions{}) |
| require.Equal(t, wantState, conn.state) |
| } |
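We check that the resolver updated the client connection with the servers and data we expected. We expect the resolver to find two servers and mark the server at localhost:9001 as the leader. We then clear the addresses and call ResolveNow directly to confirm that the resolver repopulates them.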
Our test depended on some mock types. Add this code at the bottom of the file:
| type getServers struct{} |
| |
| func (s *getServers) GetServers() ([]*api.Server, error) { |
| return []*api.Server{{ |
| Id: "leader", |
| RpcAddr: "localhost:9001", |
| IsLeader: true, |
| }, { |
| Id: "follower", |
| RpcAddr: "localhost:9002", |
| }}, nil |
| } |
| |
| type clientConn struct { |
| resolver.ClientConn |
| state resolver.State |
| } |
| |
| func (c *clientConn) UpdateState(state resolver.State) { |
| c.state = state |
| } |
| |
| func (c *clientConn) ReportError(err error) {} |
| |
| func (c *clientConn) NewAddress(addrs []resolver.Address) {} |
| |
| func (c *clientConn) NewServiceConfig(config string) {} |
| |
| func (c *clientConn) ParseServiceConfig( |
| config string, |
| ) *serviceconfig.ParseResult { |
| return nil |
| } |
getServers implements GetServerer, and its job is to return a known server set for the resolver to find. clientConn implements resolver.ClientConn, and its job is to keep a reference to the state the resolver updated it with so that we can verify that the resolver updates the client connection with the correct data.
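The clientConn mock relies on a common Go testing idiom: embed the interface in a struct so the struct satisfies the whole interface, then override only the methods the code under test calls. Here is a minimal sketch of that idiom with the standard library only. The Conn interface, the fakeConn type, and the helper functions are hypothetical and much smaller than resolver.ClientConn.

```go
package main

import "fmt"

// Conn is a hypothetical stand-in for a wide interface such as
// resolver.ClientConn.
type Conn interface {
	UpdateState(addrs []string)
	ReportError(err error)
}

// fakeConn embeds Conn, so it satisfies the full interface, but it
// overrides only UpdateState. The embedded value is left nil, which
// means calling any method that is not overridden panics.
type fakeConn struct {
	Conn
	state []string
}

func (c *fakeConn) UpdateState(addrs []string) { c.state = addrs }

// publish is the code under test; it needs only UpdateState.
func publish(c Conn, addrs []string) { c.UpdateState(addrs) }

// panics reports whether calling f panics.
func panics(f func()) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	f()
	return false
}

func main() {
	c := &fakeConn{}
	publish(c, []string{"localhost:9001", "localhost:9002"})
	fmt.Println("recorded:", c.state)
	fmt.Println("unimplemented method panics:",
		panics(func() { c.ReportError(nil) }))
}
```

Because an unoverridden method panics, a mock built this way needs an explicit method for everything the code under test calls, which is why clientConn defines ParseServiceConfig even though it only returns nil.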
Run the resolver tests to verify that they pass. And now, we’re on to the picker.