Once the master node creates a new masterJobCoordinator instance, it invokes its RunJob method to kick off the execution of the job. Since the method is a bit lengthy, we will break it down into a set of smaller blocks:
execFactory := newMasterExecutorFactory(c.cfg.serializer, c.barrier)
executor, err := c.cfg.jobRunner.StartJob(c.cfg.jobDetails, execFactory)
if err != nil {
    c.cancelJobCtx()
    return xerrors.Errorf("unable to start job on master: %w", err)
}

for assignedPartition, w := range c.cfg.workers {
    w.SetDisconnectCallback(c.handleWorkerDisconnect)
    if err := c.publishJobDetails(w, assignedPartition); err != nil {
        c.cfg.jobRunner.AbortJob(c.cfg.jobDetails)
        c.cancelJobCtx()
        return err
    }
}
The first two lines in the previous block should look familiar. We follow exactly the same initialization pattern as in the worker coordinator's implementation: we first create our custom executor factory and then invoke the user-provided StartJob method to obtain an executor for the graph algorithm. If StartJob fails, we cancel the job context and return the error. Next, we iterate the list of worker streams, register a disconnect callback on each one, and invoke the publishJobDetails helper to construct and send a JobDetails payload to each connected worker. If a payload cannot be published, we abort the job, cancel the job context, and return the error.
But how does the publishJobDetails method actually figure out which UUID range to include in each outgoing JobDetails message? If you recall from Chapter 10, Building, Packaging, and Deploying Software, the Range type provides the PartitionExtents convenience method. Given a partition number in the [0, numPartitions) range, it returns the UUID values that correspond to the beginning and end of the requested partition. So, all we need to do here is treat the worker's index in the worker list as the partition number assigned to that worker!
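To make the index-to-range mapping concrete, here is a minimal sketch of how a partition number could be turned into a pair of UUID extents. It is not the Range type from Chapter 10: the partitionExtents function below is a hypothetical stand-in that evenly splits the 128-bit UUID space, and it assumes an inclusive end value, which may differ from what the real PartitionExtents method returns.

```go
package main

import (
	"fmt"
	"math/big"
)

// partitionExtents is a hypothetical stand-in for Range.PartitionExtents.
// It splits the full 128-bit UUID space into numPartitions equal slices and
// returns the first and last (inclusive, by assumption) UUID of the slice
// identified by partition.
func partitionExtents(partition, numPartitions int) (from, to [16]byte, err error) {
	if numPartitions <= 0 || partition < 0 || partition >= numPartitions {
		return from, to, fmt.Errorf("partition %d not in [0, %d)", partition, numPartitions)
	}

	// The UUID space holds 2^128 distinct values.
	space := new(big.Int).Lsh(big.NewInt(1), 128)
	size := new(big.Int).Div(space, big.NewInt(int64(numPartitions)))

	start := new(big.Int).Mul(size, big.NewInt(int64(partition)))
	end := new(big.Int).Add(start, size)
	if partition == numPartitions-1 {
		// The last partition absorbs any remainder left by the division.
		end.Set(space)
	}
	end.Sub(end, big.NewInt(1)) // convert the exclusive end to an inclusive one

	// Write both values as big-endian, zero-padded 16-byte UUIDs.
	start.FillBytes(from[:])
	end.FillBytes(to[:])
	return from, to, nil
}

func main() {
	// Treat each worker's index as its partition number, as the master does.
	const numWorkers = 4
	for workerIndex := 0; workerIndex < numWorkers; workerIndex++ {
		from, to, err := partitionExtents(workerIndex, numWorkers)
		if err != nil {
			panic(err)
		}
		fmt.Printf("worker %d: %x - %x\n", workerIndex, from, to)
	}
}
```

Because the extents depend only on the partition number and the total number of partitions, the master does not need to keep any additional bookkeeping: the position of a worker in the worker list fully determines the slice of the graph that the worker is responsible for.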
Once the JobDetails payloads are broadcast by the master and received by the workers, each worker will create its own local job coordinator and begin executing the job just as we saw in the previous section.
As the master is dealing with multiple worker streams, we need to spin up a goroutine for handling incoming payloads from each worker. To ensure that all goroutines properly exit before RunJob returns, we will make use of sync.WaitGroup:
var wg sync.WaitGroup
wg.Add(len(c.cfg.workers))
graph := executor.Graph()
for workerIndex, worker := range c.cfg.workers {
    go func(workerIndex int, worker *remoteWorkerStream) {
        defer wg.Done()
        c.handleWorkerPayloads(workerIndex, worker, graph)
    }(workerIndex, worker)
}
While our goroutines are busy handling incoming payloads, the master executes the various stages of the graph's state machine:
    if err = c.runJobToCompletion(executor); err != nil {
        c.cfg.jobRunner.AbortJob(c.cfg.jobDetails)
        if xerrors.Is(err, context.Canceled) {
            err = errJobAborted
        }
    }

    c.cancelJobCtx()
    wg.Wait() // wait for any spawned goroutines to exit before returning.
    return err
}
Once the job execution completes (with or without an error), the job context is canceled to send a stop signal to any still-running payload processing goroutines. The RunJob method then blocks until all goroutines exit and then returns.