The runJobToCompletion implementation for the master job coordinator is nearly identical to the one used by the worker:
```go
// runJobToCompletion executes the graph algorithm and then steps the workers
// through the EXECUTED_GRAPH, PESISTED_RESULTS and COMPLETED_JOB barriers.
func (c *masterJobCoordinator) runJobToCompletion(executor *bspgraph.Executor) error {
	if err := executor.RunToCompletion(c.jobCtx); err != nil {
		return err
	} else if _, err := c.barrier.WaitForWorkers(proto.Step_EXECUTED_GRAPH); err != nil {
		return err
	} else if err := c.barrier.NotifyWorkers(&proto.Step{Type: proto.Step_EXECUTED_GRAPH}); err != nil {
		return err
	} else if err := c.cfg.jobRunner.CompleteJob(c.cfg.jobDetails); err != nil {
		return err
	} else if _, err := c.barrier.WaitForWorkers(proto.Step_PESISTED_RESULTS); err != nil {
		return err
	} else if err := c.barrier.NotifyWorkers(&proto.Step{Type: proto.Step_PESISTED_RESULTS}); err != nil {
		return err
	} else if _, err := c.barrier.WaitForWorkers(proto.Step_COMPLETED_JOB); err != nil {
		return err
	}
	// Intentionally no NotifyWorkers call for COMPLETED_JOB.
	return nil
}
```
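Again, the user-defined algorithm is executed until its termination condition is met. Assuming that no error occurred, the master then steps through the remaining states of the graph execution state machine (EXECUTED_GRAPH, PERSISTED_RESULTS, and COMPLETED_JOB). For each state, it waits until all workers have reached it; for EXECUTED_GRAPH and PERSISTED_RESULTS, it then notifies the workers so that they can move on. Between these two barriers, the master also invokes the CompleteJob method of the configured job runner.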
Note that, in the preceding implementation, the master does not invoke NotifyWorkers on the barrier for the COMPLETED_JOB step. This is intentional: once all workers reach this stage, there are no further operations to perform, so the master can simply go ahead and close each worker's job stream.