The 1-to-N broadcasting pattern allows us to support use cases where each incoming payload must be processed in parallel by N different processors, each of which implements FIFO-like semantics.
The following code is the definition of the broadcast type and the Broadcast helper function that serves as its constructor:
type broadcast struct {
	fifos []StageRunner
}

func Broadcast(procs ...Processor) StageRunner {
	if len(procs) == 0 {
		panic("Broadcast: at least one processor must be specified")
	}
	fifos := make([]StageRunner, len(procs))
	for i, p := range procs {
		fifos[i] = FIFO(p)
	}
	return &broadcast{fifos: fifos}
}
As you can see, the variadic Broadcast function receives a list of Processor instances as arguments and creates a FIFO instance for each one. These FIFO instances are stored inside the returned broadcast instance and used within its Run method implementation, which we will dissect next:
var wg sync.WaitGroup
var inCh = make([]chan Payload, len(b.fifos))
for i := 0; i < len(b.fifos); i++ {
	wg.Add(1)
	inCh[i] = make(chan Payload)
	go func(fifoIndex int) {
		fifoParams := &workerParams{
			stage: params.StageIndex(),
			inCh:  inCh[fifoIndex],
			outCh: params.Output(),
			errCh: params.Error(),
		}
		b.fifos[fifoIndex].Run(ctx, fifoParams)
		wg.Done()
	}(i)
}
Similar to the fixed worker pool implementation that we examined in the previous section, the first thing that we do inside Run is to spawn a goroutine for each FIFO StageRunner instance. A sync.WaitGroup allows us to wait for all workers to exit before Run can return.
To avoid data races, the implementation for the broadcasting stage must intercept each incoming payload, clone it, and deliver a copy to each one of the generated FIFO processors. Consequently, the generated FIFO processor instances cannot be directly wired to the input channel for the stage, but must instead be configured with a dedicated input channel for reading. To this end, the preceding block of code generates a new workerParams value (a type internal to the pipeline package that implements the StageParams interface) for each FIFO instance and supplies it as an argument to its Run method. Note that while each FIFO instance is configured with a separate input channel, they all share the same output and error channels.
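Since workerParams and StageParams are internal to the book's pipeline package, the following standalone sketch illustrates the same wiring idea with plain channels: each worker reads from its own dedicated input channel while all workers share a single output channel. The names fanOutDemo and the payload strings are illustrative assumptions, not part of the pipeline package:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanOutDemo wires three workers, each with a dedicated input channel,
// to one shared output channel, mirroring how each FIFO in the
// broadcast stage gets its own inCh but shares params.Output().
func fanOutDemo() []string {
	const workers = 3
	inCh := make([]chan string, workers)
	// Buffered so every worker can send without blocking.
	outCh := make(chan string, workers)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		inCh[i] = make(chan string)
		go func(idx int) {
			defer wg.Done()
			for payload := range inCh[idx] {
				outCh <- fmt.Sprintf("worker %d got %s", idx, payload)
			}
		}(i)
	}

	// Deliver one payload to each worker's dedicated channel.
	for i := 0; i < workers; i++ {
		inCh[i] <- "p0"
		close(inCh[i])
	}
	wg.Wait()
	close(outCh)

	var results []string
	for r := range outCh {
		results = append(results, r)
	}
	sort.Strings(results) // deterministic order for display
	return results
}

func main() {
	fmt.Println(fanOutDemo())
}
```

Because every worker owns its input channel, no two goroutines ever contend for the same payload value, which is exactly the property the broadcast stage relies on.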
The next part of the Run method's implementation is the, by now familiar, main loop where we wait for the next incoming payload to appear:
done:
for {
	// Read incoming payloads and pass them to each FIFO
	select {
	case <-ctx.Done():
		break done
	case payload, ok := <-params.Input():
		if !ok {
			break done
		}
		// Clone payload and dispatch to each FIFO worker...
		// (see following listing)
	}
}
Once a new payload is received, the implementation writes a copy of the payload to each FIFO instance's input channel; as a small optimization, the first FIFO receives the original incoming payload rather than a clone:
for i := len(b.fifos) - 1; i >= 0; i-- {
	var fifoPayload = payload
	if i != 0 {
		fifoPayload = payload.Clone()
	}
	select {
	case <-ctx.Done():
		break done
	case inCh[i] <- fifoPayload:
		// payload sent to i_th FIFO
	}
}
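The clone-except-first optimization can be isolated and tested on its own. The stringPayload type below is a hypothetical stand-in for the pipeline package's Payload interface (only Clone is sketched); the dispatch loop reproduces the listing above and records which FIFO slots received a clone versus the original:

```go
package main

import "fmt"

// stringPayload is an illustrative stand-in for the Payload interface.
// The cloned flag lets us observe which slots received a copy.
type stringPayload struct {
	value  string
	cloned bool
}

// Clone returns a deep copy of the payload, flagged as a clone.
func (p *stringPayload) Clone() *stringPayload {
	return &stringPayload{value: p.value, cloned: true}
}

// dispatch mirrors the broadcast stage's loop: iterate from the last
// FIFO down to the first, cloning for every slot except slot 0, which
// reuses the original payload to save one unnecessary copy.
func dispatch(payload *stringPayload, n int) []*stringPayload {
	out := make([]*stringPayload, n)
	for i := n - 1; i >= 0; i-- {
		fifoPayload := payload
		if i != 0 {
			fifoPayload = payload.Clone()
		}
		out[i] = fifoPayload
	}
	return out
}

func main() {
	for i, p := range dispatch(&stringPayload{value: "hello"}, 3) {
		fmt.Printf("fifo %d: value=%q clone=%v\n", i, p.value, p.cloned)
	}
}
```

Iterating from the highest index down means the original payload is handed out last, after all clones have been taken from it, so no FIFO can mutate it before the copies are made.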
After publishing the payload to all FIFO instances, a new iteration of the main loop begins. The main loop keeps executing until either the input channel closes or the context gets cancelled. After exiting the main loop, the following cleanup block of code gets executed before Run returns:
// Close input channels and wait for all FIFOs to exit
for _, ch := range inCh {
	close(ch)
}
wg.Wait()
In the preceding code snippet, we signal each one of the FIFO workers to shut down by closing their dedicated input channels. We then invoke the Wait method of the WaitGroup to wait for all FIFO workers to terminate.
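This shutdown handshake (close every dedicated input channel, then block on the WaitGroup) can be demonstrated in isolation. The shutdownDemo function below is an illustrative sketch, not part of the pipeline package; it verifies that closing each worker's channel causes its receive loop to exit and that wg.Wait only returns once every worker has done so:

```go
package main

import (
	"fmt"
	"sync"
)

// shutdownDemo reproduces the broadcast stage's teardown sequence:
// closing each worker's dedicated input channel ends its receive loop,
// and wg.Wait blocks until every worker goroutine has returned.
func shutdownDemo(workers int) int {
	inCh := make([]chan int, workers)
	var wg sync.WaitGroup
	var mu sync.Mutex
	exited := 0

	for i := 0; i < workers; i++ {
		wg.Add(1)
		inCh[i] = make(chan int)
		go func(idx int) {
			defer wg.Done()
			for range inCh[idx] {
				// Drain until the channel is closed.
			}
			mu.Lock()
			exited++
			mu.Unlock()
		}(i)
	}

	// Signal shutdown exactly as the broadcast Run method does.
	for _, ch := range inCh {
		close(ch)
	}
	wg.Wait() // returns only after every worker has called wg.Done

	return exited
}

func main() {
	fmt.Println(shutdownDemo(4))
}
```

Closing the channel rather than sending a sentinel value is idiomatic Go here: a receive loop written as `for range ch` terminates automatically when the channel is closed, so no extra signalling machinery is needed.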