In an asynchronous pipeline design, once a stage processes an incoming payload and emits it to the next stage, it can immediately begin processing the next available payload without having to wait for the currently processed payload to exit the pipeline, as would be the case in a synchronous pipeline design. This approach ensures that all stages are continuously kept busy processing payloads instead of idling.
It is important to note that asynchronous pipelines typically require some form of concurrency. A common pattern is to run each stage in a separate goroutine. Of course, this introduces additional complexity to the mix as we need to do the following:
- Manage the lifecycle of each goroutine
- Make use of concurrency primitives, such as locks, to avoid data races
Nevertheless, asynchronous pipelines have much better throughput characteristics compared to synchronous pipelines. This is the main reason why the pipeline package that we will be building in this chapter will feature an asynchronous pipeline implementation... with a small twist! Even though all pipeline components (input, output, and stages) will be running asynchronously, end users will be interacting with the pipeline using a synchronous API.
A quick survey of the most popular Go software development kits (SDKs) out there will reveal a general consensus toward exposing synchronous APIs. From the perspective of the API consumer, synchronous APIs are definitely easier to consume as the end user does not need to worry about managing resources, such as Go channels, or writing complex select statements to coordinate reads and/or writes between channels. Contrast this approach with having an asynchronous API, where the end user would have to deal with an input, output, and error channel every time they wanted to execute a pipeline run!
As mentioned previously, the pipeline internals will be executing asynchronously. The typical way to accomplish this in Go would be to start a goroutine for each pipeline component and link the individual goroutines together by means of Go channels. The pipeline implementation will be responsible for fully managing the lifecycle of any goroutine it spins up, in a way that is totally transparent to the end user of the pipeline package.
Failure to heed this bit of advice can introduce goroutine leaks in long-running applications that typically require quite a bit of time and effort to track down.
Exposing a synchronous API for the pipeline package has yet another benefit that we haven't yet mentioned. It is pretty trivial for the end users of the pipeline package to wrap the synchronous API in a goroutine and make it asynchronous. The goroutine would simply invoke the blocking code and use a channel to signal the application code when the pipeline execution has completed.