Running a new job

Once the worker receives a new job assignment from the master, it calls the coordinator's constructor and then invokes its RunJob method, which blocks until the job either completes or an error occurs:

func (c *workerJobCoordinator) RunJob() error {
	// ...
}

Let's break down the RunJob implementation into smaller chunks and go through each one:

execFactory := newWorkerExecutorFactory(c.cfg.serializer, c.barrier)
executor, err := c.cfg.jobRunner.StartJob(c.cfg.jobDetails, execFactory)
if err != nil {
	c.cancelJobCtx()
	return xerrors.Errorf("unable to start job on worker: %w", err)
}

graph := executor.Graph()
graph.RegisterRelayer(bspgraph.RelayerFunc(c.relayNonLocalMessage))

The first thing RunJob does is create a workerExecutor factory using the configured serializer and the barrier instance that the constructor already set up. Then, it invokes the StartJob method of the user-provided job.Runner to initialize the graph and obtain an Executor value that we can use. If StartJob fails, we cancel the job context and return the wrapped error. Note that, up to this point, our code is totally oblivious to how the user-defined algorithm works!

The next step extracts the bspgraph.Graph instance from the returned Executor and registers a bspgraph.Relayer helper. The graph automatically invokes this helper whenever a vertex attempts to send a message to a destination ID that the local graph instance does not recognize. We will take a closer look at the relayNonLocalMessage method implementation in one of the following sections, where we discuss the concept of message relaying in more detail.

This completes all of the required initialization steps. We are now ready to commence the execution of the graph compute job!

To monitor the health of the connection to the master and to asynchronously process any incoming payloads, we spin up a goroutine that runs the handleMasterPayloads method and is tracked by a wait group.

While our goroutine is busy processing incoming payloads, RunJob invokes the runJobToCompletion helper method that advances through the various stages of the graph execution's state machine. If an error occurs, we invoke the user's AbortJob method and then proceed to check the cause of the error.

If the job execution failed due to a context cancelation, we replace the error with the more meaningful, typed errJobAborted error. On the other hand, if the handleMasterPayloads method reported a more interesting error, we overwrite the returned error value with the reported error.

Before returning, we cancel the job context, which triggers a teardown of both the barrier and the spawned payload-handling goroutine. We then call Wait on the wait group to block until that goroutine exits.