The workers' executor factory

To create a suitable executor factory for workers, we can use the following helper function:

func newWorkerExecutorFactory(serializer Serializer, barrier *workerStepBarrier) bspgraph.ExecutorFactory {
	f := &workerExecutorFactory{
		serializer: serializer,
		barrier:    barrier,
	}
	return func(g *bspgraph.Graph, cb bspgraph.ExecutorCallbacks) *bspgraph.Executor {
		// Keep the user-defined callbacks so that the wrappers can delegate to them.
		f.origCallbacks = cb
		// Substitute wrappers that synchronize with the master before delegating.
		patchedCb := bspgraph.ExecutorCallbacks{
			PreStep:             f.preStepCallback,
			PostStep:            f.postStepCallback,
			PostStepKeepRunning: f.postStepKeepRunningCallback,
		}
		return bspgraph.NewExecutor(g, patchedCb)
	}
}

The newWorkerExecutorFactory function expects two arguments, namely a Serializer instance and an initialized workerStepBarrier object. The serializer instance is responsible for serializing and unserializing the aggregator values to and from the any.Any protocol buffer messages that workers exchange with the master when they enter or exit the various step barriers. In the following code, you can see the definition of the Serializer interface:

type Serializer interface {
	Serialize(interface{}) (*any.Any, error)
	Unserialize(*any.Any) (interface{}, error)
}
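To make the contract more concrete, here is a minimal sketch of a serializer that round-trips a single value type. It is an illustration only: the Any struct is a hypothetical stand-in for the any.Any protocol buffer message so that the example has no external dependencies, and float64Serializer is an invented name rather than a type from the actual code base.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Any is a hypothetical stand-in for the any.Any protocol buffer message.
type Any struct {
	TypeURL string
	Value   []byte
}

// Serializer mirrors the interface from the text, using the stand-in Any type.
type Serializer interface {
	Serialize(interface{}) (*Any, error)
	Unserialize(*Any) (interface{}, error)
}

// float64Serializer (hypothetical) only understands float64 values.
type float64Serializer struct{}

func (float64Serializer) Serialize(v interface{}) (*Any, error) {
	f, ok := v.(float64)
	if !ok {
		return nil, fmt.Errorf("unsupported value type %T", v)
	}
	data, err := json.Marshal(f)
	if err != nil {
		return nil, err
	}
	return &Any{TypeURL: "float64", Value: data}, nil
}

func (float64Serializer) Unserialize(a *Any) (interface{}, error) {
	if a == nil || a.TypeURL != "float64" {
		return nil, fmt.Errorf("unsupported message")
	}
	var f float64
	if err := json.Unmarshal(a.Value, &f); err != nil {
		return nil, err
	}
	return f, nil
}

func main() {
	var s Serializer = float64Serializer{}
	enc, err := s.Serialize(0.25)
	if err != nil {
		fmt.Println("serialize:", err)
		return
	}
	dec, err := s.Unserialize(enc)
	if err != nil {
		fmt.Println("unserialize:", err)
		return
	}
	fmt.Println(dec) // prints 0.25
}
```

A real implementation would need to handle every value type used by the aggregators that are registered on the graph.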

Returning to the newWorkerExecutorFactory function, we can see that it allocates a new workerExecutorFactory value and returns a closure that satisfies the ExecutorFactory signature. When the generated factory function is invoked, it captures the original callbacks and invokes the real executor constructor with a set of patched callbacks.
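The callback-patching idea can be shown in isolation. The following sketch uses deliberately simplified, hypothetical types (Callbacks, Factory, and newPatchingFactory are not part of bspgraph) to illustrate how a factory closure can capture the user's callbacks and hand back wrapped versions that run extra logic first:

```go
package main

import "fmt"

// Callbacks is a hypothetical, simplified stand-in for bspgraph.ExecutorCallbacks.
type Callbacks struct {
	PreStep func() error
}

// Factory receives the user's callbacks and returns a function that runs the
// (patched) PRE step. It loosely mirrors the shape of an executor factory.
type Factory func(cb Callbacks) func() error

// newPatchingFactory returns a Factory whose PreStep wrapper records a
// "barrier" event before delegating to the original callback, if any.
func newPatchingFactory(events *[]string) Factory {
	return func(orig Callbacks) func() error {
		patched := Callbacks{
			PreStep: func() error {
				*events = append(*events, "barrier")
				if orig.PreStep != nil {
					return orig.PreStep()
				}
				return nil
			},
		}
		return patched.PreStep
	}
}

func main() {
	var events []string
	userCb := Callbacks{
		PreStep: func() error {
			events = append(events, "user")
			return nil
		},
	}
	runPreStep := newPatchingFactory(&events)(userCb)
	if err := runPreStep(); err != nil {
		fmt.Println("pre-step failed:", err)
		return
	}
	fmt.Println(events) // prints [barrier user]
}
```

The user-supplied callback is unaware that it has been wrapped, which is what allows the same user code to run unmodified on a worker.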

Let's take a look at what happens inside each one of the patched callbacks, starting with the one responsible for handling the PRE step:

func (f *workerExecutorFactory) preStepCallback(ctx context.Context, g *bspgraph.Graph) error {
	if _, err := f.barrier.Wait(&proto.Step{Type: proto.Step_PRE}); err != nil {
		return err
	}

	if f.origCallbacks.PreStep != nil {
		return f.origCallbacks.PreStep(ctx, g)
	}
	return nil
}

As you can see, the callback immediately joins the barrier and, once instructed to exit, invokes the original PRE step callback if one is defined. The following code shows the next callback on our list, which is invoked immediately after executing a graph super-step:

func (f *workerExecutorFactory) postStepCallback(ctx context.Context, g *bspgraph.Graph, activeInStep int) error {
	aggrDeltas, err := serializeAggregatorDeltas(g, f.serializer)
	if err != nil {
		// Wrap the underlying error so that the root cause is not lost.
		return xerrors.Errorf("unable to serialize aggregator deltas: %w", err)
	}
	stepUpdateMsg, err := f.barrier.Wait(&proto.Step{
		Type:             proto.Step_POST,
		AggregatorValues: aggrDeltas,
	})
	if err != nil {
		return err
	} else if err = setAggregatorValues(g, stepUpdateMsg.AggregatorValues, f.serializer); err != nil {
		return err
	} else if f.origCallbacks.PostStep != nil {
		return f.origCallbacks.PostStep(ctx, g, activeInStep)
	}
	return nil
}

We mentioned before that, during the POST step, workers must transmit their partial aggregator deltas to the master when they enter the POST step barrier. This is exactly what happens in the preceding snippet.

The serializeAggregatorDeltas helper function iterates the list of aggregators that are defined on the graph and uses the provided Serializer instance to convert them into map[string]*any.Any. The map with the serialized deltas is then attached to a Step message and sent to the master via the barrier's Wait method.

The master tallies the deltas from each worker and broadcasts back a new Step message that contains the updated set of global aggregator values. Once we receive the updated message, we invoke the setAggregatorValues helper, which unserializes the incoming map[string]*any.Any map entries and overwrites the aggregator values for the local graph instance. Before returning, the callback wrapper invokes the original user-defined POST step callback if one is actually defined.
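To see why the worker overwrites its local values instead of merging them, consider the following toy model of the exchange. It assumes additive integer aggregators and skips serialization entirely; tally and overwrite are hypothetical names that play the roles of the master-side reduction and of setAggregatorValues, respectively.

```go
package main

import "fmt"

// tally folds the deltas reported by each worker into the master's global
// values and returns a copy that can be broadcast back to the workers.
func tally(global map[string]int, workerDeltas []map[string]int) map[string]int {
	for _, deltas := range workerDeltas {
		for name, d := range deltas {
			global[name] += d
		}
	}
	out := make(map[string]int, len(global))
	for name, v := range global {
		out[name] = v
	}
	return out
}

// overwrite replaces the worker's local aggregator values with the
// global values received from the master.
func overwrite(local, broadcast map[string]int) {
	for name, v := range broadcast {
		local[name] = v
	}
}

func main() {
	// Global value before the step, as tracked by the master.
	global := map[string]int{"visited": 10}

	// Two workers report the deltas they accumulated during the step.
	broadcast := tally(global, []map[string]int{
		{"visited": 3},
		{"visited": 4},
	})

	// The first worker only knows about its own delta (10 + 3).
	local := map[string]int{"visited": 13}
	overwrite(local, broadcast)
	fmt.Println(local["visited"]) // prints 17
}
```

Because each worker only observes its own contribution during a super-step, adopting the master's value is what brings every local graph instance back in sync.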

The last callback wrapper implementation that we will inspect is the one invoked for the POST_KEEP_RUNNING step, given as follows:

func (f *workerExecutorFactory) postStepKeepRunningCallback(ctx context.Context, g *bspgraph.Graph, activeInStep int) (bool, error) {
	stepUpdateMsg, err := f.barrier.Wait(&proto.Step{
		Type:         proto.Step_POST_KEEP_RUNNING,
		ActiveInStep: int64(activeInStep),
	})
	if err != nil {
		return false, err
	}

	if f.origCallbacks.PostStepKeepRunning != nil {
		return f.origCallbacks.PostStepKeepRunning(ctx, g, int(stepUpdateMsg.ActiveInStep))
	}
	return true, nil
}

As with every other callback wrapper implementation, the first thing we do is enter the barrier for the current step type. Note that the outgoing Step message includes the local number of active vertices in this step. The response we get back from the master includes the global number of active vertices, which is the value that must be passed to the user-defined callback for this step.
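A small example shows why the global count matters. The sketch below assumes that the master obtains the global count by summing the local counts reported by the workers; globalActive and keepRunning are hypothetical helpers, with keepRunning playing the role of a user-defined callback that stops once no vertices are active.

```go
package main

import "fmt"

// globalActive sums the local active vertex counts reported by each worker.
// Summation is an assumption about how the master combines the counts.
func globalActive(localCounts []int64) int64 {
	var total int64
	for _, c := range localCounts {
		total += c
	}
	return total
}

// keepRunning is a hypothetical user-defined callback that keeps the
// computation going for as long as at least one vertex is active.
func keepRunning(activeInStep int) bool {
	return activeInStep > 0
}

func main() {
	// The first worker has no active vertices, but its peers still do.
	local := []int64{0, 5, 2}
	global := globalActive(local)

	// Deciding on the local count would stop the first worker too early.
	fmt.Println(keepRunning(int(local[0])), keepRunning(int(global))) // prints false true
}
```

If each worker consulted only its local count, workers could disagree on whether to run another super-step and would fall out of sync at the next barrier.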