To create a new worker, the user of the package invokes the NewWorker constructor, which returns a new Worker instance. The definition of the Worker type looks as follows:
The Worker type stores the following:
- The client gRPC connection to the master
- An instance of the JobQueueClient that the protoc compiler has automatically generated for us from the RPC definition for the job queue
- The required components for interfacing with the user's bspgraph-based algorithm implementation (that is, a job Runner and Serializer for graph messages and aggregator values)
After obtaining a new Worker instance, the user has to connect to the master by invoking the worker's Dial method:
    func (w *Worker) Dial(masterEndpoint string, dialTimeout time.Duration) error {
        // Start from a non-nil context so that a zero dialTimeout never
        // hands a nil context to grpc.DialContext.
        dialCtx := context.Background()
        if dialTimeout != 0 {
            var cancelFn func()
            dialCtx, cancelFn = context.WithTimeout(dialCtx, dialTimeout)
            defer cancelFn()
        }

        conn, err := grpc.DialContext(dialCtx, masterEndpoint, grpc.WithInsecure(), grpc.WithBlock())
        if err != nil {
            return xerrors.Errorf("unable to dial master: %w", err)
        }

        w.masterConn = conn
        w.masterCli = proto.NewJobQueueClient(conn)
        return nil
    }
Once a connection to the master has been successfully established, the user can ask the worker to fetch and execute the next job from the master by invoking the worker's RunJob method. Let's see what happens within that method:
    stream, err := w.masterCli.JobStream(ctx)
    if err != nil {
        return err
    }

    w.cfg.Logger.Info("waiting for next job")
    jobDetails, err := w.waitForJob(stream)
    if err != nil {
        return err
    }
First of all, the worker makes an RPC call to the job queue and obtains a gRPC stream. Then, the worker invokes the waitForJob helper, which performs a blocking Recv operation on the stream and waits for the master to publish a job details payload. After the payload is obtained, its contents are validated and unpacked into a job.Details instance, which is returned to the RunJob method:
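The text does not list the waitForJob helper itself, so the following is only a rough sketch of the receive-then-validate flow it describes. Everything here is an assumption made for illustration: the jobPayload and jobStream types are hypothetical stand-ins for the protoc-generated message and stream, the validation rule (a non-empty job ID) is invented, and the sketch returns the payload directly rather than unpacking it into a job.Details value. A channel-backed stream replaces the live gRPC connection:

```go
package main

import (
	"errors"
	"fmt"
)

// jobPayload is a hypothetical stand-in for the job details message that
// the master publishes; the real message type is generated by protoc.
type jobPayload struct {
	JobID string
}

// jobStream is a hypothetical, receive-only view of the gRPC stream.
type jobStream interface {
	Recv() (*jobPayload, error)
}

// waitForJob blocks on Recv until the master publishes a payload and then
// validates it before handing it back to the caller.
func waitForJob(stream jobStream) (*jobPayload, error) {
	payload, err := stream.Recv()
	if err != nil {
		return nil, fmt.Errorf("unable to read job details: %w", err)
	}
	// Assumed validation rule: a usable payload must carry a job ID.
	if payload == nil || payload.JobID == "" {
		return nil, errors.New("job details payload is missing a job ID")
	}
	return payload, nil
}

// chanStream is an in-memory jobStream backed by a channel. It is used
// here in place of a live connection to the master.
type chanStream chan *jobPayload

// Recv blocks until a payload is available or the channel is closed.
func (s chanStream) Recv() (*jobPayload, error) {
	payload, ok := <-s
	if !ok {
		return nil, errors.New("stream closed")
	}
	return payload, nil
}

func main() {
	stream := make(chanStream, 1)
	stream <- &jobPayload{JobID: "job-1"}

	details, err := waitForJob(stream)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("received job", details.JobID)
}
```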
    masterStream := newRemoteMasterStream(stream)
    jobLogger := w.cfg.Logger.WithField("job_id", jobDetails.JobID)
    coordinator := newWorkerJobCoordinator(ctx, workerJobCoordinatorConfig{
        jobDetails:   jobDetails,
        masterStream: masterStream,
        jobRunner:    w.cfg.JobRunner,
        serializer:   w.cfg.Serializer,
        logger:       jobLogger,
    })
Next, the worker initializes the required components for executing the job. As you can see in the previous code, we create a wrapper for the stream and pass it as an argument to the job coordinator constructor.
We are now ready to delegate the job execution to the coordinator! Before we do that, however, there is one last thing to take care of: we need to fire up a dedicated goroutine for handling the send and receive ends of the wrapped stream. If that goroutine reports an error, it cancels the job context so that the coordinator can abort the job:
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := masterStream.HandleSendRecv(); err != nil {
            coordinator.cancelJobCtx()
        }
    }()
Finally, we invoke the coordinator's RunJob method and emit a log line depending on whether the job succeeded or failed:
    if err = coordinator.RunJob(); err != nil {
        jobLogger.WithField("err", err).Error("job execution failed")
    } else {
        jobLogger.Info("job completed successfully")
    }

    masterStream.Close()
    wg.Wait()
    return err
Just as we have done so far with all the other blocks of code that spin up goroutines, before returning from the RunJob method, we terminate the RPC stream (but leave the client connection intact for the next RPC call) and wait until the stream-handling goroutine exits cleanly.
Let's move on to defining the necessary APIs for creating new master instances.