Remote master stream

Next, we will move to the worker side and examine the equivalent stream helper for handling a connection to the master node. The definition of the remoteMasterStream type is pretty much the same as remoteWorkerStream, given as follows:

type remoteMasterStream struct {
stream proto.JobQueue_JobStreamClient
recvMsgCh chan *proto.MasterPayload
sendMsgCh chan *proto.WorkerPayload

ctx context.Context
cancelFn func()

mu sync.Mutex
onDisconnectFn func()
disconnected bool
}

Once the worker connects to the master node and receives a job assignment, it will invoke the newRemoteMasterStream function to wrap the obtained stream connection with a remoteMasterStream instance:

func newRemoteMasterStream(stream proto.JobQueue_JobStreamClient) *remoteMasterStream {
ctx, cancelFn := context.WithCancel(context.Background())

return &remoteMasterStream{
ctx: ctx,
cancelFn: cancelFn,
stream: stream,
recvMsgCh: make(chan *proto.MasterPayload, 1),
sendMsgCh: make(chan *proto.WorkerPayload, 1),
}
}

As you can see in the previous code snippet, the constructor creates a cancelable context and allocates a pair of channels to be used for interfacing with the stream.

Just as we did for the remoteWorkerStream implementation, we will define a pair of convenience methods for accessing these channels, as follows:

func (s *remoteMasterStream) RecvFromMasterChan() <-chan *proto.MasterPayload {
return s.recvMsgCh
}

func (s *remoteMasterStream) SendToMasterChan() chan<- *proto.WorkerPayload {
return s.sendMsgCh
}

The HandleSendRecv method is responsible for receiving incoming messages from the master and for transmitting outgoing messages from the worker.

As you can see in the following block of code, the implementation is more or less the same as the remoteWorkerStream implementation with two small differences. Can you spot them? Take a look:

func (s *remoteMasterStream) HandleSendRecv() error {
defer func() {
s.cancelFn()
_ = s.stream.CloseSend()
}()
go s.handleRecv()
for {
select {
case wPayload := <-s.sendMsgCh:
if err := s.stream.Send(wPayload); err != nil && !xerrors.Is(err, io.EOF) {
return err
}
case <-s.ctx.Done():
return nil
}
}
}

The first difference has to do with the way we handle errors returned by the stream's Send method. If the worker closes the send stream while the preceding block of code is attempting to send a payload to the master, Send will return an io.EOF error to let us know that we cannot send any more messages through the stream. Since the worker is the one that controls the send stream, we treat io.EOF errors as expected and ignore them.

Secondly, as the worker is the initiator of the RPC, it is not allowed to terminate the send stream with a specific error code as we did in the case of the master stream implementation. Consequently, for this implementation, there is no need to maintain (and poll) a dedicated error channel.

On the other hand, the following receive side code is implemented in exactly the same way as remoteMasterStream:

func (s *remoteMasterStream) handleRecv() {
for {
mPayload, err := s.stream.Recv()
if err != nil {
s.handleDisconnect()
s.cancelFn()
return
}
select {
case s.recvMsgCh <- mPayload:
case <-s.ctx.Done():
return
}
}
}

To actually shut down the stream and cause the HandleSendRecv method to exit, the worker can invoke the Close method of remoteMasterStream:

func (s *remoteMasterStream) Close() {
s.cancelFn()
}

The Close method first cancels the context monitored by the select blocks in both the receive and send code. As we discussed a few lines preceding, the latter action will cause any pending Send calls to fail with an io.EOF error and allow the HandleSendRecv method to return. Furthermore, the cancelation of the context enables the handleRecv goroutine to also return, hence ensuring that our implementation is not leaking any goroutines.