As we mentioned previously, vertices communicate with each other by exchanging messages. Sending the same message to all immediate neighbors of a particular vertex is an often recurring pattern in several graph algorithms. Let's define a convenience method for handling this fairly common use case:
func (g *Graph) BroadcastToNeighbors(v *Vertex, msg message.Message) error { for _, e := range v.edges { if err := g.SendMessage(e.dstID, msg); err != nil { return err } } return nil }
BroadcastToNeighbors simply iterates the list of edges for a particular vertex and attempts to send the message to each neighbor with the help of the SendMessage method. With the help of SendMessage, compute functions can send a message to any vertex in the graph, provided that its ID is known to them (for example, discovered through the use of a gossip protocol).
Let's take a look at the implementation for SendMessage:
func (g *Graph) SendMessage(dstID string, msg message.Message) error { dstVert := g.vertices[dstID] if dstVert != nil { queueIndex := (g.superstep + 1) % 2 return dstVert.msgQueue[queueIndex].Enqueue(msg) } if g.relayer != nil { if err := g.relayer.Relay(dstID, msg); !xerrors.Is(err, ErrDestinationIsLocal) { return err } } return xerrors.Errorf("message cannot be delivered to %q: %w", dstID, ErrInvalidMessageDestination) }
First things first, we need to look up the destination vertex in the graph's vertex map. If the lookup yields a valid Vertex instance, then we can enqueue the message so that it can be delivered to the vertex in the following super-step.
Things get a bit more interesting when the vertex lookup fails… A failed lookup can occur because of two reasons:
- We are running in distributed mode and the vertex is managed by a remote graph instance
- The vertex simply does not exist
To handle vertices that are potentially hosted remotely, the Graph type allows users of the bspgraph package to register a helper that can relay messages between remote graph instances. More specifically, these helpers:
- Are aware of the topology of a distributed graph (that is, the vertex ID ranges that are managed by each node in a cluster)
- Provide a mechanism for shuttling messages back and forth between the cluster nodes
User-defined relay helpers must implement the Relayer interface and can be registered with a graph instance through the RegisterRelayer method:
type Relayer interface { // Relay a message to a vertex that is not known locally. Calls // to Relay must return ErrDestinationIsLocal if the provided dst value // is not a valid remote destination. Relay(dst string, msg message.Message) error } func (g *Graph) RegisterRelayer(relayer Relayer) { g.relayer = relayer }
To make it easier for users to provide functions or closures as a suitable Relayer implementation, let's also go ahead and define the RelayerFunc adapter, which converts a function with the appropriate signature into a Relayer:
type RelayerFunc func(string, message.Message) error // Relay calls f(dst, msg). func (f RelayerFunc) Relay(dst string, msg message.Message) error { return f(dst, msg) }
If the destination vertex ID cannot be located by the graph and the user has registered a Relayer instance, SendMessage invokes its Relay method and checks the response for errors. If we get an error other than ErrDestinationLocal, we return the error as-is back to the caller.
If the relay helper detects that the destination vertex ID should, in fact, be managed by the local graph instance, it will fail with the typed ErrDestinationIsLocal error to indicate this. In such a case, we assume that the vertex ID is invalid and return the typed ErrInvalidMessageDestination error to the caller.