Piping patterns

As in real-life plumbing, Node.js streams can also be piped together following different patterns; we can, in fact, merge the flow of two different streams into one, split the flow of one stream into two or more pipes, or redirect the flow based on a condition. In this section, we are going to explore the most important plumbing techniques that can be applied to Node.js streams.

In this chapter, we have repeatedly stressed that streams provide a simple infrastructure to modularize and reuse our code, but there is one last piece missing in this puzzle: what if we want to modularize and reuse an entire pipeline? What if we want to combine multiple streams so that they look like one from the outside? The following figure shows what this means:

Combining streams

From the preceding diagram, we should already get a hint of how this works:

A combined stream is usually a Duplex stream, which is built by connecting the first stream to its Writable side and the last stream to its Readable side.

But that's not enough; in fact, another important characteristic of a combined stream is that it has to capture all the errors that are emitted by any stream inside the pipeline. As we already mentioned, error events are not automatically propagated down the pipeline; so, if we want to have proper error management (and we should), we will have to explicitly attach an error listener to each stream. However, if the combined stream is really a black box, this means that we don't have access to any of the streams in the middle of the pipeline; so it's crucial for the combined stream to also act as an aggregator for all the errors coming from any stream in the pipeline.

To recap, a combined stream has two major advantages: it can be redistributed as a black box, because its internal pipeline is hidden from the outside, and it simplifies error management, since we can attach a single error listener to the combined stream rather than one listener per stream in the pipeline.

Combining streams is a pretty generic and common practice, so if we don't have any particular need we might just want to reuse an existing solution such as multipipe (https://www.npmjs.org/package/multipipe) or combine-stream (https://www.npmjs.org/package/combine-stream), just to name a few.

To make a simple example, let's consider the case of the following two combined Transform streams: one that both compresses and encrypts data, and another that both decrypts and decompresses it.

Using a library such as multipipe, we can easily build these streams by combining some of the streams that we already have available from the core libraries (file 'combinedStreams.js'):
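A minimal sketch of such a module might look like the following; the function names, the aes-192-cbc algorithm, the scrypt-based key derivation, and the explicit iv parameter are illustrative assumptions rather than the only possible choices:

// combinedStreams.js
const zlib = require('zlib');
const crypto = require('crypto');
const combine = require('multipipe');

// Derive the 24-byte key required by aes-192 from the password.
// The hardcoded salt is a simplification made for the sake of the example.
function createKey(password) {
  return crypto.scryptSync(password, 'archive_salt', 24);
}

module.exports.compressAndEncrypt = (password, iv) =>
  combine(
    zlib.createGzip(),
    crypto.createCipheriv('aes-192-cbc', createKey(password), iv)
  );

module.exports.decryptAndDecompress = (password, iv) =>
  combine(
    crypto.createDecipheriv('aes-192-cbc', createKey(password), iv),
    zlib.createGunzip()
  );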

We can now use these combined streams, as if they were black boxes, for example, to create a small application that archives a file by compressing and encrypting it. Let's do that in a new module named archive.js:
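A possible sketch, reusing the compressAndEncrypt() function defined above; the command line handling and the .gz.enc extension are arbitrary choices:

// archive.js
const fs = require('fs');
const crypto = require('crypto');
const { compressAndEncrypt } = require('./combinedStreams');

const password = process.argv[2];
const file = process.argv[3];
// In a real application the IV would have to be stored alongside the archive,
// so that the file can be decrypted later.
const iv = crypto.randomBytes(16);

fs.createReadStream(file)
  .pipe(compressAndEncrypt(password, iv))
  .pipe(fs.createWriteStream(file + '.gz.enc'));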

We can further improve the preceding code by building a combined stream out of the pipeline that we created, this time not to obtain a reusable black box but only to take advantage of its aggregated error management. In fact, as we already mentioned many times, writing something such as the following will only catch the errors that are emitted by the last stream:
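For example (continuing with the variables of the previous sketch):

fs.createReadStream(file)
  .pipe(compressAndEncrypt(password, iv))
  .pipe(fs.createWriteStream(file + '.gz.enc'))
  .on('error', err => {
    // this catches only the errors emitted by the last write stream
    console.error(err);
  });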

However, by combining all the streams together we can fix the problem elegantly. Let's then rewrite the 'archive.js' file as follows:
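One possible rewrite, again based on multipipe, might look like this:

// archive.js (revised)
const fs = require('fs');
const crypto = require('crypto');
const combine = require('multipipe');
const { compressAndEncrypt } = require('./combinedStreams');

const password = process.argv[2];
const file = process.argv[3];
const iv = crypto.randomBytes(16);

combine(
  fs.createReadStream(file),
  compressAndEncrypt(password, iv),
  fs.createWriteStream(file + '.gz.enc')
).on('error', err => {
  // this error may have been emitted by any stream in the pipeline
  console.error(err);
});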

As we can see, we can now attach an error listener directly to the combined stream and it will receive any error event that is emitted by any of its internal streams.

Now, to run the archive module, simply specify a password and a file as command line arguments:
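For example (the password and the path are, of course, placeholders):

node archive mypassword /path/to/a/file.txt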

With this example, we have clearly demonstrated how important it is to combine streams: on one hand, it allows us to create reusable compositions of streams, and on the other, it simplifies the error management of a pipeline.

We can perform a fork of a stream by piping a single Readable stream into multiple Writable streams. This is useful when we want to send the same data to different destinations, for example, two different sockets or two different files. It can also be used when we want to perform different transformations on the same data, or when we want to split the data based on some criteria. The following figure gives us a graphical representation of this pattern:

Forking streams

Forking a stream in Node.js is a trivial matter; let's see why by working on an example.

Let's create a small utility that outputs both the sha1 and md5 hashes of a given file. Let's call this new module generateHashes.js and let's start by initializing our checksum streams:
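A sketch of this first part; the base64 output encoding is just one possible choice:

// generateHashes.js
const crypto = require('crypto');

const sha1Stream = crypto.createHash('sha1');
sha1Stream.setEncoding('base64');

const md5Stream = crypto.createHash('md5');
md5Stream.setEncoding('base64');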

Nothing special so far; the next part of the module is where we create a Readable stream from a file and fork it into two different streams in order to obtain two other files, one containing the sha1 hash and the other containing the md5 checksum:
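Continuing the sketch, and assuming the input file is given as the first command line argument:

const fs = require('fs');

const inputFile = process.argv[2];
const inputStream = fs.createReadStream(inputFile);

inputStream
  .pipe(sha1Stream)
  .pipe(fs.createWriteStream(inputFile + '.sha1'));

inputStream
  .pipe(md5Stream)
  .pipe(fs.createWriteStream(inputFile + '.md5'));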

Very simple, right? The inputStream variable is piped into sha1Stream on one side and into md5Stream on the other. There are a couple of things to note, though, that happen behind the scenes. First, both sha1Stream and md5Stream will be ended automatically when inputStream ends, unless we specify {end: false} as an option when invoking pipe(). Second, the two forks of the stream receive the same data chunks, so we must be very careful when performing side-effect operations on the data, as they would affect both branches. Finally, back-pressure works out of the box: the flow coming from inputStream will go only as fast as the slowest branch of the fork.

Merging is the opposite operation to forking and consists of piping a set of Readable streams into a single Writable stream, as shown in the following figure:

Merging streams

Merging multiple streams into one is in general a simple operation; however, we have to pay attention to the way we handle the end event, as piping with the default auto-end behavior will cause the destination stream to end as soon as one of the sources ends. This can often lead to an error situation, as the other active sources will continue to write to an already terminated stream. The solution to this problem is to use the option {end: false} when piping multiple sources to a single destination, and then to invoke end() on the destination only when all the sources have completed reading.

To make a simple example, let's implement a small program that creates a tarball from the contents of two different directories. For this purpose, we are going to introduce two new npm packages: tar (https://www.npmjs.org/package/tar), a streaming library to create tarballs, and fstream (https://www.npmjs.org/package/fstream), a library to create object streams from filesystem files.

Our new module is going to be called mergeTar.js; let's define its contents starting from some initialization steps:
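The following sketch assumes the legacy streaming interface of the tar package (the releases that expose tar.Pack() and are designed to work together with fstream); newer versions of tar expose a different API:

// mergeTar.js
const tar = require('tar');        // assumes a legacy release exposing tar.Pack()
const fstream = require('fstream');
const path = require('path');
const fs = require('fs');

const destination = path.resolve(process.argv[2]);
const sourceA = path.resolve(process.argv[3]);
const sourceB = path.resolve(process.argv[4]);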

In the preceding code, we are just loading all the dependencies and initializing the variables that contain the name of the destination file and the two source directories (sourceA and sourceB).

Next, we will create the tar stream and pipe it into its destination:
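Continuing the sketch:

const pack = tar.Pack();
pack.pipe(fs.createWriteStream(destination));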

Now it's time to initialize the source streams:
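A possible continuation, where a simple counter tracks how many sources have ended:

let endCount = 0;
function onEnd() {
  // end the pack stream only when both directory readers are done
  if (++endCount === 2) {
    pack.end();
  }
}

const sourceStreamA = fstream.Reader({ type: 'Directory', path: sourceA })
  .on('end', onEnd);

const sourceStreamB = fstream.Reader({ type: 'Directory', path: sourceB })
  .on('end', onEnd);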

In the preceding code, we create the streams that read from the two source directories (sourceStreamA and sourceStreamB); then, for each source stream, we attach an end listener, which will terminate the pack stream only when both directories have been read completely.

Finally, it is time to perform the real merge:
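The corresponding lines of the sketch:

sourceStreamA.pipe(pack, { end: false });
sourceStreamB.pipe(pack, { end: false });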

We pipe both the sources into the pack stream and take care to disable the auto ending of the destination stream by providing the option {end: false} to the two pipe() invocations.

With this, we completed our simple tar utility. We can try this utility by providing the destination file as the first command line argument, followed by the two source directories:
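For example (the paths are just placeholders):

node mergeTar dest.tar /path/to/sourceA /path/to/sourceB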

To conclude this section, it's worth mentioning that, on npm, we can find a few modules that can simplify the merging of streams, for example, merge-stream (https://www.npmjs.org/package/merge-stream) and multistream-merge (https://www.npmjs.org/package/multistream-merge).

As a last comment on the stream merge pattern, it's worth remembering that the data piped into the destination stream is randomly intermingled; this is a property that can be acceptable in some types of object streams (as we saw in the last example), but it is often an undesired effect when dealing with binary streams.

However, there is one variation of the pattern that allows us to merge streams in order; it consists of consuming the source streams one after the other: when the previous one ends, the next one starts emitting chunks (it is like concatenating the output of all the sources). As always, on npm we can find some packages that deal with this situation as well; one of them is multistream (https://npmjs.org/package/multistream).

There is a particular variation of the merge stream pattern in which we don't really want to just join multiple streams together but, instead, we want to use a shared channel to deliver the data of a set of streams. This is a conceptually different operation because the source streams remain logically separated inside the shared channel, which allows us to split the stream again once the data reaches the other end of the shared channel. The following figure clarifies the situation:

Multiplexing and demultiplexing

The operation of combining multiple streams together (in this case also known as channels) to allow transmission over a single stream is called multiplexing, while the opposite operation, namely reconstructing the original streams from the data received from a shared stream, is called demultiplexing. The devices that perform these operations are called a multiplexer (or mux) and a demultiplexer (or demux), respectively. This is a widely studied area in Computer Science and Telecommunications in general, as it is one of the foundations of almost any type of communication medium, such as telephony, radio, TV, and of course the Internet itself. For the scope of this book, we will not go too far with the explanations, as this is a vast topic.

What we want to demonstrate in this section, instead, is how it's possible to use a shared Node.js stream in order to convey multiple logically separated streams that are then split again at the other end of the shared stream.

Let's use an example to drive our discussion. We want to have a small program that starts a child process and redirects both its standard output and standard error to a remote server, which in turn saves the two streams into two separate files. So, in this case, the shared medium is a TCP connection, while the two channels to be multiplexed are the stdout and stderr of a child process. We will leverage a technique called packet switching, the same technique that is used by protocols such as IP, TCP, and UDP, which consists of wrapping the data into packets, allowing us to specify various pieces of meta information useful for multiplexing, routing, flow control, checking for corrupted data, and so on. The protocol that we are going to implement for our example is very minimalist; in fact, we will simply wrap our data into packets having the following structure:

Building a remote logger

As shown in the preceding figure, the packet contains the actual data, but also a header (Channel ID + Data length), which will make it possible to differentiate the data of each stream and enable the demultiplexer to route the packet to the right channel.

Let's start to build our application from the client side. With a lot of creativity, we will call the module client.js; this represents the part of the application that is responsible for starting a child process and multiplexing its streams.

So, let's start by defining the module. First, we need some dependencies:
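At a minimum, we need the child_process and net core modules; a sketch:

// client.js
const child_process = require('child_process');
const net = require('net');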

Then, let's implement a function that performs the multiplexing of a list of sources:
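The following sketch assumes a header made of a 1-byte channel ID followed by a 4-byte big-endian data length, which is one reasonable way to encode the structure described earlier:

function multiplexChannels(sources, destination) {
  let openChannels = sources.length;
  sources.forEach((source, channelId) => {
    source
      .on('readable', function() {
        // read the available data in non-flowing mode
        let chunk;
        while ((chunk = this.read()) !== null) {
          // wrap the chunk into a packet: channel ID + data length + data
          const packet = Buffer.alloc(1 + 4 + chunk.length);
          packet.writeUInt8(channelId, 0);
          packet.writeUInt32BE(chunk.length, 1);
          chunk.copy(packet, 5);
          // write the packet into the shared destination channel
          destination.write(packet);
        }
      })
      .on('end', () => {
        // end the destination only when all the sources have ended
        if (--openChannels === 0) {
          destination.end();
        }
      });
  });
}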

The multiplexChannels() function takes in as input the source streams to be multiplexed and the destination channel, and then it performs the following steps:

1. For each source stream, it registers a listener for the readable event, where the data is read using the non-flowing mode.
2. Each chunk that is read is wrapped into a packet containing, in order, the channel ID, the length of the data, and then the data itself.
3. The packet is then written into the destination stream.
4. Finally, it registers a listener for the end event, so that the destination stream is ended only when all the source streams have ended.

Now the last part of our client becomes very easy:
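A sketch, assuming the server (shown later) listens on port 3000:

const socket = net.connect(3000, () => {
  const child = child_process.fork(
    process.argv[2],              // the module to run in the child process
    process.argv.slice(3),        // its arguments
    { silent: true }              // do not inherit the parent's stdout and stderr
  );
  multiplexChannels([child.stdout, child.stderr], socket);
});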

In this last code fragment, we perform the following operations:

1. We create a new TCP client connection to port 3000.
2. We start the child process using the first command line argument as the module path, while the rest of the process.argv array is provided as arguments to the child process. The option {silent: true} is specified so that the child process does not inherit the stdout and stderr of the parent.
3. Finally, we take the stdout and stderr of the child process and multiplex them into the socket using the multiplexChannels() function.

Now we can take care of creating the server side of the application (server.js), where we demultiplex the streams from the remote connection and pipe them into two different files. Let's start by creating a function called demultiplexChannel():
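A sketch of the function, mirroring the header layout assumed on the client side (1-byte channel ID, 4-byte big-endian data length):

function demultiplexChannel(source, destinations) {
  let currentChannel = null;
  let currentLength = null;

  source
    .on('readable', () => {
      let chunk;
      // keep parsing packets while enough data is available in the internal buffer
      while (true) {
        if (currentChannel === null) {
          // read the channel ID
          chunk = source.read(1);
          currentChannel = chunk && chunk.readUInt8(0);
          if (currentChannel === null) return;
        }
        if (currentLength === null) {
          // read the length of the data
          chunk = source.read(4);
          currentLength = chunk && chunk.readUInt32BE(0);
          if (currentLength === null) return;
        }
        // read the actual data
        chunk = source.read(currentLength);
        if (chunk === null) return;
        // route the data to the right channel and get ready for the next packet
        destinations[currentChannel].write(chunk);
        currentChannel = null;
        currentLength = null;
      }
    })
    .on('end', () => {
      // close all the destination channels when the source channel ends
      destinations.forEach(destination => destination.end());
    });
}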

The preceding code might look complicated, but it is not; thanks to the pull nature of Node.js Readable streams, we can easily implement the demultiplexing of our little protocol as follows:

1. We start reading from the source stream using the non-flowing mode.
2. If the channel ID has not been parsed yet, we try to read the first part of the header from the stream and transform it into a number.
3. The next step is to read the length of the data. It's possible that the internal buffer does not yet contain enough data, which causes the read() invocation to return null; in that case, we simply interrupt the parsing and retry at the next readable event.
4. Once the data length is known, we know how much data to pull from the internal buffer, so we try to read it all (again interrupting the parsing if not enough data is available yet).
5. When all the data of a packet has been read, we write it to the right destination channel, making sure to reset the currentChannel and currentLength variables so that the next packet can be parsed.
6. Lastly, we end all the destination channels when the source channel ends.

Now that we can demultiplex the source stream, let's put our new function to work:
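A sketch of the remaining part of server.js; the log file names are arbitrary choices:

const net = require('net');
const fs = require('fs');

net.createServer(socket => {
  // one destination file per channel
  const stdoutStream = fs.createWriteStream('stdout.log');
  const stderrStream = fs.createWriteStream('stderr.log');
  demultiplexChannel(socket, [stdoutStream, stderrStream]);
}).listen(3000, () => console.log('Server started'));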

In the preceding code, we first start a TCP server on port 3000; then, for each connection that we receive, we create two Writable streams pointing to two different files, one for the standard output and another for the standard error; these are our destination channels. Finally, we use demultiplexChannel() to demultiplex the socket stream into stdoutStream and stderrStream.