Before we begin working on the pipeline package implementation, we need to answer one crucial question: how can we describe pipeline payloads in a generic way using Go?
The obvious answer is to define payloads as empty interface values (interface{} in Go terminology). The key argument in favor of this approach is that the pipeline internals do not need to know anything about the payloads themselves; all the pipeline has to do is shuttle payloads between the various pipeline stages.
The interpretation of the payload contents (for example, by using a type assertion to convert the input to a known type) should be the sole responsibility of the processing functions that execute at each stage. Given that the processing functions are specified by the end user of the pipeline, this approach would probably be a good fit for our particular requirements.
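To make the empty interface approach concrete, here is a minimal sketch of what a processing function might look like under it. The crawlerPayload type and the processURL function are hypothetical names invented for this illustration; they are not part of the pipeline package.

```go
package main

import "fmt"

// crawlerPayload is a hypothetical payload type used only for illustration.
type crawlerPayload struct {
	URL string
}

// processURL is a hypothetical stage function that accepts an untyped
// payload. The pipeline never inspects the value; the processor recovers
// the concrete type itself by means of a type assertion.
func processURL(p interface{}) (string, error) {
	payload, ok := p.(*crawlerPayload)
	if !ok {
		return "", fmt.Errorf("unexpected payload type %T", p)
	}
	return "fetching " + payload.URL, nil
}

func main() {
	out, err := processURL(&crawlerPayload{URL: "https://example.com"})
	fmt.Println(out, err)

	// This call compiles because every value satisfies interface{};
	// the mistake is only detected when the program runs.
	_, err = processURL(42)
	fmt.Println(err)
}
```

Note that the second call in main is accepted by the compiler even though it passes an int. The two-value form of the type assertion lets the processor report the problem as an error instead of panicking.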
However, as Rob Pike quite eloquently puts it in one of his famous Go proverbs, "interface{} says nothing." There is quite a bit of truth in that statement. The empty interface conveys no useful information about the underlying type. As a matter of fact, if we were to follow the empty interface approach, we would effectively be disabling the Go compiler's ability to statically type check some parts of our code base!
On one hand, the use of empty interfaces is generally considered an antipattern by the Go community and is therefore a practice we would ideally want to avoid. On the other hand, at the time of writing, Go has no support for generics (type parameters only arrived later, in Go 1.18), which makes it much more difficult to write code that can work with objects whose type is not known in advance. So, instead of trying to find a silver bullet solution to this problem, let's try to compromise: how about we enforce a set of common operations that all payload types must support and create a Payload interface to describe them? That would give us an extra layer of type safety while still making it possible for pipeline processor functions to use a type assertion to convert incoming payloads to the type they expect. Here is a possible definition for the Payload interface:
```go
// Payload is implemented by values that can be sent through a pipeline.
type Payload interface {
	Clone() Payload
	MarkAsProcessed()
}
```
As you can see, regardless of the way that a payload is defined, we expect it to be able to perform at least two simple (and quite common) operations:
- Perform a deep copy of itself: As we will see in one of the following sections, this operation will be required for avoiding data races when multiple processors are operating on the same payload concurrently.
- Mark itself as processed: Payloads are considered to be processed either when they reach the end of the pipeline (the sink) or when they are discarded at an intermediate pipeline stage. Having such a method invoked on payloads when they exit the pipeline is quite useful for scenarios where we are interested in collecting per-payload metrics (total processing time, time spent in the queue before entering the pipeline, and so on).
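As an illustration of these two operations, the following sketch shows one way a concrete type might satisfy the Payload interface. The docPayload type, its fields, and the countTags processor are assumptions made up for this example; a real payload would carry whatever data its pipeline needs.

```go
package main

import (
	"fmt"
	"time"
)

// Payload repeats the interface definition from the text so that this
// example is self-contained.
type Payload interface {
	Clone() Payload
	MarkAsProcessed()
}

// docPayload is a hypothetical payload; its fields are assumptions.
type docPayload struct {
	Tags           []string
	EnqueuedAt     time.Time
	TimeInPipeline time.Duration
}

// Clone returns a deep copy: the Tags slice gets its own backing array so
// that the copy and the original can be modified independently.
func (p *docPayload) Clone() Payload {
	return &docPayload{
		Tags:       append([]string(nil), p.Tags...),
		EnqueuedAt: p.EnqueuedAt,
	}
}

// MarkAsProcessed records a simple per-payload metric: the time elapsed
// since the payload was enqueued.
func (p *docPayload) MarkAsProcessed() {
	p.TimeInPipeline = time.Since(p.EnqueuedAt)
}

// countTags is a hypothetical processor that recovers the concrete type
// from the Payload interface by means of a type assertion.
func countTags(p Payload) (int, error) {
	doc, ok := p.(*docPayload)
	if !ok {
		return 0, fmt.Errorf("unexpected payload type %T", p)
	}
	return len(doc.Tags), nil
}

func main() {
	orig := &docPayload{Tags: []string{"go", "pipeline"}, EnqueuedAt: time.Now()}

	// Mutating the clone must not affect the original payload.
	clone := orig.Clone().(*docPayload)
	clone.Tags[0] = "changed"
	fmt.Println(orig.Tags[0]) // prints "go"

	n, err := countTags(orig)
	fmt.Println(n, err)

	orig.MarkAsProcessed()
	fmt.Println(orig.TimeInPipeline)
}
```

Unlike the empty interface, the Payload type lets the compiler reject values that implement neither method, while the processor still decides which concrete type it is willing to handle.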