Because the compiler generated a server stub, the job left for us is to write it. To implement a server, you need to build a struct whose methods match the service definition in your protobuf.
Create an internal/server directory tree in the root of your project by running mkdir -p internal/server. Internal packages are magical packages in Go that can only be imported by nearby code. For example, you can import code in /a/b/c/internal/d/e/f by code rooted by /a/b/c, but not code rooted by /a/b/g. In this directory, we’ll implement our server in a file called server.go and a package named server. The first order of business is to define our server type and a factory function to create an instance of the server.
Here’s the code we need to add to our server.go file:
| package server |
| |
| import ( |
| "context" |
| |
| api "github.com/travisjeffery/proglog/api/v1" |
| "google.golang.org/grpc" |
| ) |
| |
| type Config struct { |
| CommitLog CommitLog |
| } |
| |
| var _ api.LogServer = (*grpcServer)(nil) |
| |
| type grpcServer struct { |
| api.UnimplementedLogServer |
| *Config |
| } |
| |
| func newgrpcServer(config *Config) (srv *grpcServer, err error) { |
| srv = &grpcServer{ |
| Config: config, |
| } |
| return srv, nil |
| } |
To implement the API you saw in log_grpc.pb.go, we need to implement the Consume and Produce handlers. Our gRPC layer is thin because it defers to our log library, so to implement these methods, you call down to the library and handle any errors. Add the following code below your newgrpcServer function:
| func (s *grpcServer) Produce(ctx context.Context, req *api.ProduceRequest) ( |
| *api.ProduceResponse, error) { |
| offset, err := s.CommitLog.Append(req.Record) |
| if err != nil { |
| return nil, err |
| } |
| return &api.ProduceResponse{Offset: offset}, nil |
| } |
| |
| func (s *grpcServer) Consume(ctx context.Context, req *api.ConsumeRequest) ( |
| *api.ConsumeResponse, error) { |
| record, err := s.CommitLog.Read(req.Offset) |
| if err != nil { |
| return nil, err |
| } |
| return &api.ConsumeResponse{Record: record}, nil |
| } |
With this snippet, we’ve implemented the Produce(context.Context, *api.ProduceRequest) and Consume(context.Context, *api.ConsumeRequest) methods on our server. These methods handle the requests made by clients to produce and consume to the server’s log. Now let’s add the streaming APIs. Put the following code below the previous snippet:
| func (s *grpcServer) ProduceStream( |
| stream api.Log_ProduceStreamServer, |
| ) error { |
| for { |
| req, err := stream.Recv() |
| if err != nil { |
| return err |
| } |
| res, err := s.Produce(stream.Context(), req) |
| if err != nil { |
| return err |
| } |
| if err = stream.Send(res); err != nil { |
| return err |
| } |
| } |
| } |
| |
| |
| func (s *grpcServer) ConsumeStream( |
| req *api.ConsumeRequest, |
| stream api.Log_ConsumeStreamServer, |
| ) error { |
| for { |
| select { |
| case <-stream.Context().Done(): |
| return nil |
| default: |
| res, err := s.Consume(stream.Context(), req) |
| switch err.(type) { |
| case nil: |
| case api.ErrOffsetOutOfRange: |
| continue |
| default: |
| return err |
| } |
| if err = stream.Send(res); err != nil { |
| return err |
| } |
| req.Offset++ |
| } |
| } |
| } |
ProduceStream(api.Log_ProduceStreamServer) implements a bidirectional streaming RPC so the client can stream data into the server’s log and the server can tell the client whether each request succeeded. ConsumeStream(*api.ConsumeRequest, api.Log_ConsumeStreamServer) implements a server-side streaming RPC so the client can tell the server where in the log to read records, and then the server will stream every record that follows—even records that aren’t in the log yet! When the server reaches the end of the log, the server will wait until someone appends a record to the log and then continue streaming records to the client.
The code that makes up our gRPC service is short and simple, which is a sign that we have a clean separation between our networking code and log code. However, one reason our service’s code is so short is because we have the most basic error handling ever: we just send the client whatever error our library returned.
If a client tried to consume a message but the request failed, the developer would want to know why. Could the server not find the message? Did the server fail unexpectedly? The server communicates this info with a status code. Also, end users need to know when the application fails, so the server should send back a human-readable version of the error for the client to show to the user.
Let’s explore how to improve our service’s error handling, shall we?
Yet another nice feature of gRPC is how it handles errors. In the previous code, we return errors just like you’d see in code from the Go standard library. Even though this code is handling calls between people on different computers, you wouldn’t know it—thanks to gRPC, which abstracts away the networking details. By default your errors will only have a string description, but you may want to include more information such as a status code or some other arbitrary data.
Go’s gRPC implementation has an awesome status package[18] that you can use to build errors with status codes or whatever other data you want to include in your errors. To create an error with a status code, you create the error with the Error function from the status package and pass the relevant code from the codes package[19] that matches the type of error you have. Any status code you attach on the error here must be a code defined in the codes package—they’re meant to be consistent across the languages gRPC supports. For example, if you couldn’t find a record for some ID, then you’d use the NotFound code like this:
| err := status.Error(codes.NotFound, "id was not found") |
| return nil, err |
On the client side, you’d parse out the code from the error with the FromError function from the status package. Your goal is to have as few non-status errors as possible so you know why the errors happen and can handle them gracefully. The non-status errors that are OK are unforeseen, internal server errors. Here’s how to use the FromError function to parse out a status from a gRPC error:
| st, ok := status.FromError(err) |
| if !ok { |
| // Error was not a status error |
| } |
| // Use st.Message() and st.Code() |
When you want more than a status code (say you’re trying to debug an error and want more details like logs or traces), then you can use the status package’s WithDetails function, which allows you to attach any protobuf message you want to the error.
The errdetailspackage[20] provides some protobufs you’ll likely find useful when building your service, including messages to use to handle bad requests, debug info, and localized messages.
Let’s use the LocalizedMessage from the errdetails package to change the previous example to respond with an error message that’s safe to return to the user. In the following code, we first create a new not-found status, then we create the localized message specifying the message and locale used. Next we attach the details to the status, and then finally convert and return the status as a Go error:
| st := status.New(codes.NotFound, "id was not found") |
| d := &errdetails.LocalizedMessage{ |
| Locale: "en-US", |
| Message: fmt.Sprintf( |
| "We couldn't find a user with the email address: %s", |
| id, |
| ), |
| } |
| var err error |
| st, err = st.WithDetails(d) |
| if err != nil { |
| // If this errored, it will always error |
| // here, so better panic so we can figure |
| // out why than have this silently passing. |
| panic(fmt.Sprintf("Unexpected error attaching metadata: %v", err)) |
| } |
| return st.Err() |
To extract these details on the client side, you need to convert the error back into a status, pull out the details via its Details method, and then convert the type of the details to match the type of the protobuf you set on the server, which in our case is *errdetails.LocalizedMessage.
The code to do that looks like this:
| st := status.Convert(err) |
| for _, detail := range st.Details() { |
| switch t := detail.(type) { |
| case *errdetails.LocalizedMessage: |
| // send t.Message back to the user |
| } |
| } |
Focusing back on our service, let’s add a custom error named ErrOffsetOutOfRange that the server will send back to the client when the client tries to consume an offset that’s outside of the log. Create an error.go file inside the api/v1 directory with the following code:
| package log_v1 |
| |
| import ( |
| "fmt" |
| |
| "google.golang.org/genproto/googleapis/rpc/errdetails" |
| "google.golang.org/grpc/status" |
| ) |
| |
| type ErrOffsetOutOfRange struct { |
| Offset uint64 |
| } |
| |
| func (e ErrOffsetOutOfRange) GRPCStatus() *status.Status { |
| st := status.New( |
| 404, |
| fmt.Sprintf("offset out of range: %d", e.Offset), |
| ) |
| msg := fmt.Sprintf( |
| "The requested offset is outside the log's range: %d", |
| e.Offset, |
| ) |
| d := &errdetails.LocalizedMessage{ |
| Locale: "en-US", |
| Message: msg, |
| } |
| std, err := st.WithDetails(d) |
| if err != nil { |
| return st |
| } |
| return std |
| } |
| |
| func (e ErrOffsetOutOfRange) Error() string { |
| return e.GRPCStatus().Err().Error() |
| } |
Next, let’s update your log to use this error. Find this section of the Read(offset uint64) method of your log in internal/log/log.go:
| if s == nil || s.nextOffset <= off { |
| return nil, fmt.Errorf("offset out of range: %d", off) |
| } |
And then change that section to this:
| if s == nil || s.nextOffset <= off { |
| return nil, api.ErrOffsetOutOfRange{Offset: off} |
| } |
Finally, we need to update the associated testOutOfRange(*testing.T, *log.Log) test in internal/log/log_test.go to the following code:
| func testOutOfRangeErr(t *testing.T, log *Log) { |
| read, err := log.Read(1) |
| require.Nil(t, read) |
| apiErr := err.(api.ErrOffsetOutOfRange) |
| require.Equal(t, uint64(1), apiErr.Offset) |
| } |
With our custom error, when the client tries to consume an offset that’s outside of the log, the log returns an error with plenty of useful information: a localized message, a status code, and an error message. Because our error is a struct type, we can type-switch the error returned by the Read(offset uint64) method to know what happened. We already use this feature in our ConsumeStream(*api.ConsumeRequest, api.Log_ConsumeStreamServer) method to know whether the server has read to the end of the log and just needs to wait until someone produces another record to the client:
| func (s *grpcServer) ConsumeStream( |
| req *api.ConsumeRequest, |
| stream api.Log_ConsumeStreamServer, |
| ) error { |
| for { |
| select { |
| case <-stream.Context().Done(): |
| return nil |
| default: |
| res, err := s.Consume(stream.Context(), req) |
| switch err.(type) { |
| case nil: |
| case api.ErrOffsetOutOfRange: |
| continue |
| default: |
| return err |
| } |
| if err = stream.Send(res); err != nil { |
| return err |
| } |
| req.Offset++ |
| } |
| } |
| } |
We’ve improved our service’s error handling to include status codes and a human-readable, localized error message to help our users know why a failure occurred. Next, let’s define the log field that’s on our service such that we can pass in different log implementations and make the service easier to write tests against.
Our server depends on a log abstraction. For example, when running in a production environment—where we need our service to persist our user’s data—the service will depend on our library. But when running in a test environment, where we don’t need to persist our test data, we could use a naive, in-memory log. An in-memory log would also be good for testing because it would make the tests run faster.
As you can see from these examples, it would be best if our service weren’t tied to a specific log implementation. Instead, we want to pass in a log implementation based on our needs at the time. We can make this possible by having our service depend on a log interface rather than on a concrete type. That way, the service can use any log implementation that satisfies the log interface.
Add this code below your grpcServer methods in server.go:
| type CommitLog interface { |
| Append(*api.Record) (uint64, error) |
| Read(uint64) (*api.Record, error) |
| } |
That’s all we need to do to allow our service to use any given log implementation that satisfies our CommitLog interface. Easy, huh?
Now, let’s write an exported API that enables our users to instantiate a new service.