Let’s verify that our service discovery and replication work with an end-to-end test. We’ll set up a cluster with three nodes, produce a record to one server, and verify that we can consume the message from another server that has (hopefully) replicated it for us.
In internal/agent, create an agent_test.go file, beginning with this snippet:
| package agent_test |
| |
| import ( |
| "context" |
| "crypto/tls" |
| "fmt" |
| "io/ioutil" |
| "os" |
| "testing" |
| "time" |
| |
| "github.com/stretchr/testify/require" |
| "github.com/travisjeffery/go-dynaport" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/credentials" |
| |
| api "github.com/travisjeffery/proglog/api/v1" |
| "github.com/travisjeffery/proglog/internal/agent" |
| "github.com/travisjeffery/proglog/internal/config" |
| ) |
What can I say? Our end-to-end test has a lot going on and requires a lot of imports to make it happen.
Now we can write the test beginning with this code:
| func TestAgent(t *testing.T) { |
| serverTLSConfig, err := config.SetupTLSConfig(config.TLSConfig{ |
| CertFile: config.ServerCertFile, |
| KeyFile: config.ServerKeyFile, |
| CAFile: config.CAFile, |
| Server: true, |
| ServerAddress: "127.0.0.1", |
| }) |
| require.NoError(t, err) |
| |
| peerTLSConfig, err := config.SetupTLSConfig(config.TLSConfig{ |
| CertFile: config.RootClientCertFile, |
| KeyFile: config.RootClientKeyFile, |
| CAFile: config.CAFile, |
| Server: false, |
| ServerAddress: "127.0.0.1", |
| }) |
| require.NoError(t, err) |
This snippet defines the TLS configurations our test uses to exercise the service’s security. The serverTLSConfig defines the configuration of the certificate that’s served to clients. And the peerTLSConfig defines the configuration of the certificate that’s served between servers so they can connect with and replicate each other.
Now set up the cluster by placing this code after the previous snippet:
| var agents []*agent.Agent |
| for i := 0; i < 3; i++ { |
| ports := dynaport.Get(2) |
| bindAddr := fmt.Sprintf("%s:%d", "127.0.0.1", ports[0]) |
| rpcPort := ports[1] |
| |
| dataDir, err := ioutil.TempDir("", "agent-test-log") |
| require.NoError(t, err) |
| |
| var startJoinAddrs []string |
| if i != 0 { |
| startJoinAddrs = append( |
| startJoinAddrs, |
| agents[0].Config.BindAddr, |
| ) |
| } |
| |
| agent, err := agent.New(agent.Config{ |
| NodeName: fmt.Sprintf("%d", i), |
| StartJoinAddrs: startJoinAddrs, |
| BindAddr: bindAddr, |
| RPCPort: rpcPort, |
| DataDir: dataDir, |
| ACLModelFile: config.ACLModelFile, |
| ACLPolicyFile: config.ACLPolicyFile, |
| ServerTLSConfig: serverTLSConfig, |
| PeerTLSConfig: peerTLSConfig, |
| }) |
| require.NoError(t, err) |
| |
| agents = append(agents, agent) |
| } |
| defer func() { |
| for _, agent := range agents { |
| err := agent.Shutdown() |
| require.NoError(t, err) |
| require.NoError(t, |
| os.RemoveAll(agent.Config.DataDir), |
| ) |
| } |
| }() |
| time.Sleep(3 * time.Second) |
This code sets up a three-node cluster. The second and third nodes join the first node’s cluster.
Because we now have two addresses to configure in our service (the RPC address and the Serf address), and because we run our tests on a single host, we need two ports. We used the 0 port trick in Test a gRPC Server and Client to get a port automatically assigned to a listener by net.Listen,[45] but now we just want the port, with no listener, so we use the dynaport library to allocate the two ports we need: one for our gRPC log connections and one for our Serf service discovery connections.
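To make the idea of "a port with no listener" concrete, here is a minimal standard-library sketch of one common way to find free ports: listen on port 0, record the port the operating system assigned, and close the listener. The freePorts helper is a hypothetical name used only for illustration, and this is not necessarily how dynaport works internally. Note that another process could grab a port between the moment we close the listener and the moment we use it, so treat this as a test convenience only.

```go
package main

import (
	"fmt"
	"net"
)

// freePorts (hypothetical helper) asks the OS for n unused TCP ports on the
// loopback interface. All listeners stay open until the function returns,
// so the ports it hands back are distinct from each other.
func freePorts(n int) ([]int, error) {
	listeners := make([]net.Listener, 0, n)
	// Close every listener on return so the caller can bind the ports.
	defer func() {
		for _, l := range listeners {
			l.Close()
		}
	}()

	ports := make([]int, 0, n)
	for i := 0; i < n; i++ {
		// Port 0 tells the OS to pick any free port.
		l, err := net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			return nil, err
		}
		listeners = append(listeners, l)
		ports = append(ports, l.Addr().(*net.TCPAddr).Port)
	}
	return ports, nil
}

func main() {
	ports, err := freePorts(2)
	if err != nil {
		panic(err)
	}
	// Mirrors the test: the first port is for Serf, the second for gRPC.
	bindAddr := fmt.Sprintf("%s:%d", "127.0.0.1", ports[0])
	fmt.Println("bind address:", bindAddr, "RPC port:", ports[1])
}
```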
We defer a function call that runs after the test to verify that the agents successfully shut down and to delete the test data. We make the test sleep for a few seconds to give the nodes time to discover each other.
Now that we have a cluster, we can test that it works. Put this code after the previous snippet:
| leaderClient := client(t, agents[0], peerTLSConfig) |
| produceResponse, err := leaderClient.Produce( |
| context.Background(), |
| &api.ProduceRequest{ |
| Record: &api.Record{ |
| Value: []byte("foo"), |
| }, |
| }, |
| ) |
| require.NoError(t, err) |
| consumeResponse, err := leaderClient.Consume( |
| context.Background(), |
| &api.ConsumeRequest{ |
| Offset: produceResponse.Offset, |
| }, |
| ) |
| require.NoError(t, err) |
| require.Equal(t, []byte("foo"), consumeResponse.Record.Value) |
This code is the same as our testProduceConsume test case in Test a gRPC Server and Client: it checks that we can produce to and consume from a single node. Now we need to check that another node replicated the record. We do that by adding this code to the test, below the previous snippet:
| // wait until replication has finished |
| time.Sleep(3 * time.Second) |
| |
| followerClient := client(t, agents[1], peerTLSConfig) |
| consumeResponse, err = followerClient.Consume( |
| context.Background(), |
| &api.ConsumeRequest{ |
| Offset: produceResponse.Offset, |
| }, |
| ) |
| require.NoError(t, err) |
| require.Equal(t, []byte("foo"), consumeResponse.Record.Value) |
| } |
Because our replication works asynchronously across servers, the records produced to one server won’t be immediately available on the replica servers. There’s latency between when the message is produced to the first server and when it’s replicated to the second. The stupid, simple[46] way to handle this (especially since we’re black-box testing[47]) is to add a delay in the test that’s big enough for the replicator to have replicated the message, but as small as possible to keep our tests fast. Then we check that we can consume the replicated message.
Too Much Sleep Will Make Your Tests Too Slow
If we had enough test cases that needed a delay like this, eventually our tests would be slow and annoying to run, in which case we’d want to use a different technique. For example, you could retry your test’s assertion in a loop with a small delay between iterations and time out after a few seconds. Or you could have your server expose an event channel that included when the server produced a message. Then you’d wait to receive an event on that channel in your test so your test blocked and then continued the instant the second server replicated the message.
Lastly, we need to add our client helper that sets up a client for the service:
| func client( |
| t *testing.T, |
| agent *agent.Agent, |
| tlsConfig *tls.Config, |
| ) api.LogClient { |
| tlsCreds := credentials.NewTLS(tlsConfig) |
| opts := []grpc.DialOption{grpc.WithTransportCredentials(tlsCreds)} |
| rpcAddr, err := agent.Config.RPCAddr() |
| require.NoError(t, err) |
| conn, err := grpc.Dial(rpcAddr, opts...) |
| require.NoError(t, err) |
| client := api.NewLogClient(conn) |
| return client |
| } |
Now, run your tests with $ make test. If all is well, your tests pass and you’ve officially made a distributed service that can replicate data. Congrats!