Demuxing is a little more complicated because we cannot simply receive data blindly in one goroutine or another: we need to synchronize a series of channels. A good way to avoid race conditions is to create another channel that receives all the data from the various input channels. We need to make sure that this merge channel gets closed once all the input channels are done, and we have to keep in mind that it will also be closed if the context is cancelled. We use sync.WaitGroup here to wait for all the channels to finish:
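wg := sync.WaitGroup{}
merge := make(chan map[string]int)
wg.Add(len(src)) // one count per input channel
go func() {
	wg.Wait()    // blocks until every input has been drained or abandoned
	close(merge) // the only place where merge is closed
}()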
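The closer goroutine above can be tried in isolation. The following is a minimal sketch rather than the chapter's code: the function name collect and the int payloads are hypothetical, chosen only to show that a single goroutine waits on the wait group and then closes the shared channel exactly once.

```go
package main

import (
	"fmt"
	"sync"
)

// collect starts n hypothetical producers that each send one value, and
// returns a channel that is closed once every producer has called Done.
func collect(n int) <-chan int {
	var wg sync.WaitGroup
	merge := make(chan int)
	wg.Add(n) // set the counter before any producer starts
	for i := 0; i < n; i++ {
		go func(i int) {
			defer wg.Done()
			merge <- i
		}(i)
	}
	// The closer goroutine is the only place where merge is closed.
	go func() {
		wg.Wait()
		close(merge)
	}()
	return merge
}

func main() {
	sum := 0
	for v := range collect(4) { // the range ends when merge is closed
		sum += v
	}
	fmt.Println(sum) // prints 6 (0+1+2+3)
}
```

Because the close happens only after wg.Wait returns, no producer can still be sending when the channel is closed.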
The difficulty is that two events can trigger the closing of the channel: the regular end of transmission and the cancellation of the context.
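We have to make sure that no message is sent to the outbound channel once the context ends. Here, we collect the values from the input channels and send them to the merge channel, but only if the context isn't complete. Sending on a closed channel would make our application panic, which is why the merge channel is closed only after every one of these goroutines has called wg.Done. The select, in turn, keeps a goroutine from blocking forever on a send when the receiver has already returned because the context was cancelled: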
for _, ch := range src {
	go func(ch <-chan map[string]int) {
		defer wg.Done() // signal the closer goroutine on every exit path
		for v := range ch {
			select {
			case <-ctx.Done():
				return // context complete: stop forwarding
			case merge <- v:
			}
		}
	}(ch)
}
Finally, we can focus on the last operation, which uses the merge channel to execute our final word count:
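count := make(map[string]int)
for {
	select {
	case <-ctx.Done():
		return count // early exit with whatever has been counted so far
	case c, ok := <-merge:
		if !ok {
			return count // merge was closed: all inputs are done
		}
		for k, v := range c {
			count[k] += v // add the partial count to the total
		}
	}
}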
The application's main function, with the addition of the fan-in, will look as follows:
func main() {
	ctx, canc := context.WithCancel(context.Background())
	defer canc()
	src := SourceLineWords(ctx, ioutil.NopCloser(strings.NewReader(cantoUno)))
	count1, count2 := WordOccurrence(ctx, src), WordOccurrence(ctx, src)
	final := MergeCounts(ctx, count1, count2)
	fmt.Println(final)
}
We can see that the fan-in is the most complex and critical part of the application. Let's recap the decisions that helped build a fan-in function that is free from panics and deadlocks:
- Use a merge channel to collect values from the various inputs.
- Create a sync.WaitGroup with a counter equal to the number of input channels.
- In a separate goroutine, wait on the wait group and then close the merge channel.
- For each input channel, create a goroutine that transfers the values to the merge channel.
- Ensure that you send the record only if the context is not complete.
- Call the wait group's Done method before exiting such a goroutine.
Following the preceding steps will allow us to use the merge channel with a simple range. In our example, we also watch for the completion of the context while receiving from the channel, which allows for an early exit.