In the gRPC architecture, pickers handle the RPC balancing logic. They’re called pickers because they pick a server from the servers discovered by the resolver to handle each RPC. Pickers can route RPCs based on information about the RPC, client, and server, so their utility goes beyond balancing to any kind of request-routing logic.
To implement the picker builder, create a file named picker.go in internal/loadbalance that begins with this code:
| package loadbalance |
| |
| import ( |
| "strings" |
| "sync" |
| "sync/atomic" |
| |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/balancer/base" |
| ) |
| |
| var _ base.PickerBuilder = (*Picker)(nil) |
| |
| type Picker struct { |
| mu sync.RWMutex |
| leader balancer.SubConn |
| followers []balancer.SubConn |
| current uint64 |
| } |
| |
| func (p *Picker) Build(buildInfo base.PickerBuildInfo) balancer.Picker { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| var followers []balancer.SubConn |
| for sc, scInfo := range buildInfo.ReadySCs { |
| isLeader := scInfo. |
| Address. |
| Attributes. |
| Value("is_leader").(bool) |
| if isLeader { |
| p.leader = sc |
| continue |
| } |
| followers = append(followers, sc) |
| } |
| p.followers = followers |
| return p |
| } |
Pickers use the builder pattern just like resolvers. gRPC passes Build a map of ready subconnections, along with information about each, to set up the picker; behind the scenes, gRPC has connected to the addresses that our resolver discovered. Our picker will route consume RPCs to follower servers and produce RPCs to the leader server. The address attributes help us differentiate the servers.
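The attribute lookup in Build comes down to a type assertion on an `any` value. This standalone sketch uses a plain map as a stand-in for gRPC's address attributes (an assumption for illustration, not the real API) and also shows the comma-ok form, which avoids a panic when an attribute is missing:

```go
package main

import "fmt"

// attrs stands in for the resolver-set address attributes;
// the values are stored as any, so readers must type-assert.
var attrs = map[string]any{"is_leader": true}

func main() {
	// Direct assertion, as in Build; this panics if the key is
	// missing or holds a non-bool.
	isLeader := attrs["is_leader"].(bool)
	fmt.Println(isLeader)

	// Comma-ok form: safe when the attribute may be absent;
	// yields the zero value and ok=false instead of panicking.
	v, ok := attrs["missing"].(bool)
	fmt.Println(v, ok)
}
```

The direct assertion in Build is fine here because our resolver always sets the attribute; the comma-ok form is the safer choice when that invariant doesn't hold.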
To implement the picker, add this snippet below Build:
| var _ balancer.Picker = (*Picker)(nil) |
| |
| func (p *Picker) Pick(info balancer.PickInfo) ( |
| balancer.PickResult, error) { |
| p.mu.RLock() |
| defer p.mu.RUnlock() |
| var result balancer.PickResult |
| if strings.Contains(info.FullMethodName, "Produce") || |
| len(p.followers) == 0 { |
| result.SubConn = p.leader |
| } else if strings.Contains(info.FullMethodName, "Consume") { |
| result.SubConn = p.nextFollower() |
| } |
| if result.SubConn == nil { |
| return result, balancer.ErrNoSubConnAvailable |
| } |
| return result, nil |
| } |
| |
| func (p *Picker) nextFollower() balancer.SubConn { |
| cur := atomic.AddUint64(&p.current, uint64(1)) |
| n := uint64(len(p.followers)) |
| idx := int(cur % n) |
| return p.followers[idx] |
| } |
Pickers have one method: Pick(balancer.PickInfo). gRPC gives Pick a balancer.PickInfo containing the RPC’s name and context to help the picker decide what subconnection to pick. If you have header metadata, you can read it from the context. Pick returns a balancer.PickResult with the subconnection to handle the call. Optionally, you can set a Done callback on the result that gRPC calls when the RPC completes. The callback tells you the RPC’s error and trailer metadata, and whether bytes were sent to and received from the server.
We look at the RPC’s method name to know whether the call is a produce or consume call, and whether we should pick the leader subconnection or a follower subconnection. We balance the consume calls across the followers with a round-robin algorithm. Put this snippet at the end of the file to register the picker with gRPC and finish the picker’s code:
| func init() { |
| balancer.Register( |
| base.NewBalancerBuilder(Name, &Picker{}, base.Config{}), |
| ) |
| } |
Though pickers route the calls, which is what we’d traditionally consider the balancing, gRPC also has a balancer type that takes input from gRPC, manages subconnections, and collects and aggregates connectivity states. gRPC provides a base balancer, so you probably don’t need to write your own.
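Once registered, a client opts into this balancer by name through gRPC’s service config. Assuming Name holds the same string our resolver registered (shown here as the hypothetical value "proglog"), the service config would look like this sketch:

```json
{
  "loadBalancingConfig": [
    { "proglog": {} }
  ]
}
```

In practice the resolver can build this JSON from the Name constant (for example with fmt.Sprintf) so the balancer’s name lives in one place.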
Time to test our picker. Create a test file named picker_test.go in internal/loadbalance that begins with this snippet:
| package loadbalance_test |
| |
| import ( |
| "testing" |
| |
| "google.golang.org/grpc/attributes" |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/balancer/base" |
| "google.golang.org/grpc/resolver" |
| |
| "github.com/stretchr/testify/require" |
| |
| "github.com/travisjeffery/proglog/internal/loadbalance" |
| ) |
| |
| func TestPickerNoSubConnAvailable(t *testing.T) { |
| picker := &loadbalance.Picker{} |
| for _, method := range []string{ |
| "/log.vX.Log/Produce", |
| "/log.vX.Log/Consume", |
| } { |
| info := balancer.PickInfo{ |
| FullMethodName: method, |
| } |
| result, err := picker.Pick(info) |
| require.Equal(t, balancer.ErrNoSubConnAvailable, err) |
| require.Nil(t, result.SubConn) |
| } |
| } |
TestPickerNoSubConnAvailable tests that a picker initially returns balancer.ErrNoSubConnAvailable before the resolver has discovered servers and updated the picker’s state with available subconnections. balancer.ErrNoSubConnAvailable instructs gRPC to block the client’s RPCs until the picker has an available subconnection to handle them.
Next add this snippet below TestPickerNoSubConnAvailable to test pickers with subconnections to pick from:
| func TestPickerProducesToLeader(t *testing.T) { |
| picker, subConns := setupTest() |
| info := balancer.PickInfo{ |
| FullMethodName: "/log.vX.Log/Produce", |
| } |
| for i := 0; i < 5; i++ { |
| gotPick, err := picker.Pick(info) |
| require.NoError(t, err) |
| require.Equal(t, subConns[0], gotPick.SubConn) |
| } |
| } |
| |
| func TestPickerConsumesFromFollowers(t *testing.T) { |
| picker, subConns := setupTest() |
| info := balancer.PickInfo{ |
| FullMethodName: "/log.vX.Log/Consume", |
| } |
| for i := 0; i < 5; i++ { |
| pick, err := picker.Pick(info) |
| require.NoError(t, err) |
| require.Equal(t, subConns[i%2+1], pick.SubConn) |
| } |
| } |
TestPickerProducesToLeader tests that the picker picks the leader subconnection for produce calls. TestPickerConsumesFromFollowers tests that the picker picks the follower subconnections round-robin for consume calls.
Put this final snippet at the end of the file to define the tests’ helpers:
| func setupTest() (*loadbalance.Picker, []*subConn) { |
| var subConns []*subConn |
| buildInfo := base.PickerBuildInfo{ |
| ReadySCs: make(map[balancer.SubConn]base.SubConnInfo), |
| } |
| for i := 0; i < 3; i++ { |
| sc := &subConn{} |
| addr := resolver.Address{ |
| Attributes: attributes.New("is_leader", i == 0), |
| } |
| // 0th sub conn is the leader |
| sc.UpdateAddresses([]resolver.Address{addr}) |
| buildInfo.ReadySCs[sc] = base.SubConnInfo{Address: addr} |
| subConns = append(subConns, sc) |
| } |
| picker := &loadbalance.Picker{} |
| picker.Build(buildInfo) |
| return picker, subConns |
| } |
| |
| // subConn implements balancer.SubConn. |
| type subConn struct { |
| addrs []resolver.Address |
| } |
| |
| func (s *subConn) UpdateAddresses(addrs []resolver.Address) { |
| s.addrs = addrs |
| } |
| |
| func (s *subConn) Connect() {} |
setupTest builds the test picker with some mock subconnections. We create the picker with build information whose addresses carry the same attributes our resolver sets.
Run the picker’s tests to verify they pass. Now we’re ready to put everything together.