As in real-life plumbing, Node.js streams can also be piped together following different patterns; we can, in fact, merge the flow of two different streams into one, split the flow of one stream into two or more pipes, or redirect the flow based on a condition. In this section, we are going to explore the most important plumbing techniques that can be applied to Node.js streams.
In this chapter, we have stressed the fact that streams provide a simple infrastructure to modularize and reuse our code, but there is one last piece missing in this puzzle: what if we want to modularize and reuse an entire pipeline? What if we want to combine multiple streams so that they look like one from the outside? The following figure shows what this means:
From the preceding diagram, we should already get a hint of how this works:
A combined stream is usually a Duplex stream, which is built by connecting the first stream to its Writable side and the last stream to its Readable side.
To create a Duplex stream out of two different streams, one Writable and one Readable, we can use an npm module such as duplexer2 (https://npmjs.org/package/duplexer2).
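For instance, here is a minimal sketch of how duplexer2 could be used to expose a two-step pipeline as a single Duplex stream; the gzip/cipher pair is just an arbitrary choice for the internal streams:

var duplexer2 = require('duplexer2');
var zlib = require('zlib');
var crypto = require('crypto');

//The internal pipeline: what is written from the outside goes into gzip,
//what is read from the outside comes out of the cipher
var gzip = zlib.createGzip();
var cipher = crypto.createCipher('aes192', 'a_password');
gzip.pipe(cipher);

//duplexer2 connects the Writable side (gzip) and the Readable side (cipher)
//into a single Duplex stream
var combinedStream = duplexer2(gzip, cipher);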
But that's not enough; in fact, another important characteristic of a combined stream is that it has to capture all the errors that are emitted from any stream inside the pipeline. As we already mentioned, any error event is not automatically propagated down the pipeline; so, if we want to have proper error management (and we should), we will have to explicitly attach an error listener to each stream. However, if the combined stream is really a black box, this means that we don't have access to any of the streams in the middle of the pipeline; so it's crucial for the combined stream to also act as an aggregator for all the errors coming from any stream in the pipeline.
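The following is a minimal sketch of this idea (it is not how multipipe is actually implemented); forwardErrors() is just a hypothetical helper that re-emits the error events of every internal stream on the combined stream:

function forwardErrors(internalStreams, combinedStream) {
  internalStreams.forEach(function(stream) {
    stream.on('error', function(err) {
      //re-emit the error on the combined stream, so that one external
      //listener is enough to catch errors from any internal stream
      combinedStream.emit('error', err);
    });
  });
}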
To recap, a combined stream has two major advantages:
- We can redistribute it as a black box by hiding its internal pipeline.
- We get simplified error management, as we don't have to attach an error listener to each stream in the pipeline, but just to the combined stream itself.

Combining streams is a pretty generic and common practice, so if we don't have any particular need, we might just want to reuse an existing solution such as multipipe (https://www.npmjs.org/package/multipipe) or combine-stream (https://www.npmjs.org/package/combine-stream), just to name a few.
To make a simple example, let's consider the case of two combined Transform streams: one that compresses and encrypts data, and one that decrypts and decompresses it.
Using a library such as multipipe, we can easily build these streams by combining some of the streams that we already have available from the core libraries (file combinedStreams.js):
var zlib = require('zlib');
var crypto = require('crypto');
var combine = require('multipipe');
var fs = require('fs');

module.exports.compressAndEncrypt = function(password) {
  return combine(
    zlib.createGzip(),
    crypto.createCipher('aes192', password)
  );
}

module.exports.decryptAndDecompress = function(password) {
  return combine(
    crypto.createDecipher('aes192', password),
    zlib.createGunzip()
  );
}
We can now use these combined streams as if they were black boxes, for example, to create a small application that archives a file by compressing and encrypting it. Let's do that in a new module named archive.js:
var fs = require('fs');
var compressAndEncryptStream = require('./combinedStreams').compressAndEncrypt;

fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"));
We can further improve the preceding code by building a combined stream out of the pipeline that we created, this time not to obtain a reusable black box but only to take advantage of its aggregated error management. In fact, as we already mentioned many times, writing something such as the following will only catch the errors that are emitted by the last stream:
fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
  .on('error', function(err) {
    //Only errors from the last stream
    console.log(err);
  });
However, by combining all the streams together, we can fix the problem elegantly. Let's then rewrite the archive.js file as follows:
var combine = require('multipipe');
var fs = require('fs');
var compressAndEncryptStream = require('./combinedStreams').compressAndEncrypt;

combine(
  fs.createReadStream(process.argv[3]),
  compressAndEncryptStream(process.argv[2]),
  fs.createWriteStream(process.argv[3] + ".gz.enc")
).on('error', function(err) {
  //this error may come from any stream in the pipeline
  console.log(err);
});
As we can see, we can now attach an error listener directly to the combined stream, and it will receive any error event that is emitted by any of its internal streams.
Now, to run the archive module, simply specify a password and a file in the command line arguments:
node archive mypassword /path/to/a/file.txt
With this example, we have clearly demonstrated how important it is to combine streams; on one hand, it allows us to create reusable compositions of streams, and on the other, it simplifies the error management of a pipeline.
We can perform a fork of a stream by piping a single Readable stream into multiple Writable streams. This is useful when we want to send the same data to different destinations, for example, two different sockets or two different files. It can also be used when we want to perform different transformations on the same data, or when we want to split the data based on some criteria. The following figure gives us a graphical representation of this pattern:
Forking a stream in Node.js is a trivial matter; let's see why by working on an example.
Let's create a small utility that outputs both the sha1 and md5 hashes of a given file. Let's call this new module generateHashes.js and let's start by initializing our checksum streams:
var fs = require('fs');
var crypto = require('crypto');

var sha1Stream = crypto.createHash('sha1');
sha1Stream.setEncoding('base64');

var md5Stream = crypto.createHash('md5');
md5Stream.setEncoding('base64');
Nothing special so far; the next part of the module is actually where we will create a Readable stream from a file and fork it into two different streams in order to obtain two other files, one containing the sha1 hash and the other containing the md5 checksum:
var inputFile = process.argv[2];
var inputStream = fs.createReadStream(inputFile);

inputStream
  .pipe(sha1Stream)
  .pipe(fs.createWriteStream(inputFile + '.sha1'));

inputStream
  .pipe(md5Stream)
  .pipe(fs.createWriteStream(inputFile + '.md5'));
Very simple, right? The inputStream variable is piped into sha1Stream on one side and md5Stream on the other. There are a couple of things to note, though, that happen behind the scenes:
- Both md5Stream and sha1Stream will be ended automatically when inputStream ends, unless we specify {end: false} as an option when invoking pipe() (see the sketch after this list).
- Back-pressure works out of the box: the flow coming from inputStream will go as fast as the slowest branch of the fork!
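As a minimal sketch of the first point, this is how we could add a third branch that also prints the file to the console, without letting pipe() close process.stdout when inputStream ends:

//{end: false} prevents pipe() from calling end() on process.stdout
inputStream.pipe(process.stdout, {end: false});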
Merging is the opposite operation to forking; it consists of piping a set of Readable streams into a single Writable stream, as shown in the following figure:
Merging multiple streams into one is, in general, a simple operation; however, we have to pay attention to the way we handle the end event, as piping using the auto end option will cause the destination stream to be ended as soon as one of the sources ends. This can often lead to an error situation, as the other active sources will still continue to write to an already terminated stream. The solution to this problem is to use the option {end: false} when piping multiple sources to a single destination and then invoke end() on the destination only when all the sources have completed reading.
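In its most basic form, the pattern looks like the following sketch (the file names here are just placeholders); the next example applies the same idea to a real use case:

var fs = require('fs');

var destStream = fs.createWriteStream('merged.txt');
var sources = [
  fs.createReadStream('sourceA.txt'),
  fs.createReadStream('sourceB.txt')
];

var endCount = 0;
sources.forEach(function(source) {
  //disable the automatic ending of the destination
  source.pipe(destStream, {end: false});
  source.on('end', function() {
    //end the destination only when all the sources are done
    if(++endCount === sources.length) {
      destStream.end();
    }
  });
});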
To make a simple example, let's implement a small program that creates a tarball from the contents of two different directories. For this purpose, we are going to introduce two new npm packages:
- tar (https://npmjs.org/package/tar): a streaming library to create tarballs
- fstream (https://npmjs.org/package/fstream): a library to create object streams from filesystem files

Our new module is going to be called mergeTar.js; let's define its contents starting from some initialization steps:
var tar = require('tar');
var fstream = require('fstream');
var path = require('path');

var destination = path.resolve(process.argv[2]);
var sourceA = path.resolve(process.argv[3]);
var sourceB = path.resolve(process.argv[4]);
In the preceding code, we are just loading all the dependencies and initializing the variables that contain the name of the destination file and the two source directories (sourceA and sourceB).
Next, we will create the tar stream and pipe it into its destination:
var pack = tar.Pack();
pack.pipe(fstream.Writer(destination));
Now it's time to initialize the source streams:
var endCount = 0;
function onEnd() {
  if(++endCount === 2) {
    pack.end();
  }
}

var sourceStreamA = fstream.Reader({type: "Directory", path: sourceA})
  .on('end', onEnd);

var sourceStreamB = fstream.Reader({type: "Directory", path: sourceB})
  .on('end', onEnd);
In the preceding code, we created the streams that read from the two source directories (sourceStreamA and sourceStreamB); then, for each source stream, we attach an end listener, which will terminate the pack stream only when both the directories have been read completely.
Finally, it is time to perform the real merge:
sourceStreamA.pipe(pack, {end: false});
sourceStreamB.pipe(pack, {end: false});
We pipe both the sources into the pack stream and take care to disable the auto ending of the destination stream by providing the option {end: false} to the two pipe() invocations.
With this, we have completed our simple tar utility. We can try this utility by providing the destination file as the first command line argument, followed by the two source directories:
node mergeTar dest.tar /path/to/sourceA /path/to/sourceB
To conclude this section, it's worth mentioning that on npm we can find a few modules that can simplify the merging of streams, for example:
- merge-stream (https://npmjs.org/package/merge-stream)
- multistream-merge (https://npmjs.org/package/multistream-merge)

As a last comment on the stream merge pattern, it's worth remembering that the data piped into the destination stream is randomly intermingled; this is a property that can be acceptable in some types of object streams (as we saw in the last example), but it is often an undesired effect when dealing with binary streams.
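For instance, a package such as merge-stream can take care of the end-counting logic that we implemented manually in mergeTar.js; here is a minimal sketch of its usage (the file names are just placeholders):

var fs = require('fs');
var mergeStream = require('merge-stream');

//merge-stream ends the merged stream only when all the sources have ended
var merged = mergeStream(
  fs.createReadStream('sourceA.txt'),
  fs.createReadStream('sourceB.txt')
);
merged.pipe(fs.createWriteStream('merged.txt'));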
However, there is one variation of the pattern that allows us to merge streams in order; it consists of consuming the source streams one after the other: when the previous one ends, the next one starts emitting chunks (it is like concatenating the output of all the sources). As always, on npm we can find some packages that deal with this situation as well; one of them is multistream (https://npmjs.org/package/multistream).
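As a minimal sketch (again with placeholder file names), this is how multistream could be used to concatenate two sources in order:

var fs = require('fs');
var MultiStream = require('multistream');

//sourceB.txt starts flowing only after sourceA.txt has ended
new MultiStream([
  fs.createReadStream('sourceA.txt'),
  fs.createReadStream('sourceB.txt')
]).pipe(fs.createWriteStream('ordered.txt'));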
There is a particular variation of the merge stream pattern in which we don't really want to just join multiple streams together but, instead, to use a shared channel to deliver the data of a set of streams. This is a conceptually different operation because the source streams remain logically separated inside the shared channel, which allows us to split the stream again once the data reaches the other end of the shared channel. The following figure clarifies the situation:
The operation of combining multiple streams together (in this case also known as channels) to allow transmission over a single stream is called multiplexing, while the opposite operation, namely reconstructing the original streams from the data received through a shared stream, is called demultiplexing. The devices that perform these operations are called a multiplexer (or mux) and a demultiplexer (or demux), respectively. This is a widely studied area in Computer Science and Telecommunications in general, as it is one of the foundations of almost any type of communication media, such as telephony, radio, TV, and of course the Internet itself. For the scope of this book, we will not go too far with the explanations, as this is a vast topic.
What we want to demonstrate in this section, instead, is how it's possible to use a shared Node.js stream in order to convey multiple logically separated streams that are then split again at the other end of the shared stream.
Let's use an example to drive our discussion. We want to have a small program that starts a child process and redirects both its standard output and standard error to a remote server, which in turn saves the two streams into two separate files. So, in this case, the shared medium is a TCP connection, while the two channels to be multiplexed are the stdout and stderr of a child process. We will leverage a technique called packet switching, the same technique that is used by protocols such as IP, TCP, or UDP; it consists of wrapping the data into packets, allowing us to specify various pieces of meta information, useful for multiplexing, routing, controlling the flow, checking for corrupted data, and so on. The protocol that we are going to implement for our example is very minimalist; in fact, we will simply wrap our data into packets having the following structure:
As shown in the preceding figure, the packet contains the actual data, but also a header (Channel ID + Data length), which will make it possible to differentiate the data of each stream and enable the demultiplexer to route the packet to the right channel.
Let's start to build our application from the client side. With a lot of creativity, we will call the module client.js; this represents the part of the application that is responsible for starting a child process and multiplexing its streams.
So, let's start by defining the module. First, we need some dependencies:
var child_process = require('child_process');
var net = require('net');
var path = require('path');
Then, let's implement a function that performs the multiplexing of a list of sources:
function multiplexChannels(sources, destination) {
  var totalChannels = sources.length;
  for(var i = 0; i < sources.length; i++) {
    sources[i]
      .on('readable', function(i) {                        //[1]
        var chunk;
        while((chunk = this.read()) !== null) {
          var outBuff = new Buffer(1 + 4 + chunk.length);  //[2]
          outBuff.writeUInt8(i, 0);
          outBuff.writeUInt32BE(chunk.length, 1);
          chunk.copy(outBuff, 5);
          console.log('Sending packet to channel: ' + i);
          destination.write(outBuff);                      //[3]
        }
      }.bind(sources[i], i))
      .on('end', function() {                              //[4]
        if(--totalChannels === 0) {
          destination.end();
        }
      });
  }
}
The multiplexChannels() function takes in as input the source streams to be multiplexed and the destination channel, and then it performs the following steps:
1. For each source stream, it registers a listener for the readable event, where we read the data from the stream using the non-flowing mode.
2. When a chunk is read, we wrap it into a packet that contains, in order: 1 byte (UInt8) for the channel ID, 4 bytes (UInt32BE) for the packet size, and then the actual data.
3. When the packet is ready, we write it into the destination stream.
4. Finally, we register a listener for the end event, so that we can terminate the destination stream when all the source streams have ended.

Now the last part of our client becomes very easy:
var socket = net.connect(3000, function() {                  //[1]
  var child = child_process.fork(                            //[2]
    process.argv[2],
    process.argv.slice(3),
    {silent: true}
  );
  multiplexChannels([child.stdout, child.stderr], socket);   //[3]
});
In this last code fragment, we perform the following operations:
1. We create a new TCP client connection to localhost:3000.
2. We start the child process by using the first command line argument as the path, while we provide the rest of the process.argv array as arguments for the child process. We specify the option {silent: true}, so that the child process does not inherit the stdout and stderr of the parent.
3. Finally, we take the stdout and stderr of the child process and we multiplex them into socket using the multiplexChannels() function.

Now we can take care of creating the server side of the application (server.js), where we demultiplex the streams coming from the remote connection and pipe them into two different files. Let's start by creating a function called demultiplexChannel():
function demultiplexChannel(source, destinations) {
  var currentChannel = null;
  var currentLength = null;
  source
    .on('readable', function() {                       //[1]
      var chunk;
      if(currentChannel === null) {                    //[2]
        chunk = this.read(1);
        currentChannel = chunk && chunk.readUInt8(0);
      }

      if(currentLength === null) {                     //[3]
        chunk = this.read(4);
        currentLength = chunk && chunk.readUInt32BE(0);
        if(currentLength === null) {
          return;
        }
      }

      chunk = this.read(currentLength);                //[4]
      if(chunk === null) {
        return;
      }

      console.log('Received packet from: ' + currentChannel);
      destinations[currentChannel].write(chunk);       //[5]
      currentChannel = null;
      currentLength = null;
    })
    .on('end', function() {                            //[6]
      destinations.forEach(function(destination) {
        destination.end();
      });
      console.log('Source channel closed');
    });
}
The preceding code might look complicated, but it is not; thanks to the pull nature of Node.js Readable streams, we can easily implement the demultiplexing of our little protocol as follows:
1. We start reading from the stream using the non-flowing mode.
2. First, if we have not read the channel ID yet, we try to read 1 byte from the stream and transform it into a number.
3. The next step is to read the length of the data. We need 4 bytes for that, so it's possible (even if unlikely) that we don't have enough data in the internal buffer, which will cause the this.read() invocation to return null. In such a case, we simply interrupt the parsing and retry at the next readable event.
4. Once we know the size of the data, we try to read the whole payload from the internal buffer; if it is not available yet, we return and retry at the next readable event.
5. When we have read all the data, we write it to the right destination channel, taking care to reset the currentChannel and currentLength variables (these will be used to parse the next packet).
6. Finally, we end all the destination channels when the source channel ends.

Now that we can demultiplex the source stream, let's put our new function to work:
var net = require('net');
var fs = require('fs');

net.createServer(function(socket) {
  var stdoutStream = fs.createWriteStream('stdout.log');
  var stderrStream = fs.createWriteStream('stderr.log');
  demultiplexChannel(socket, [stdoutStream, stderrStream]);
}).listen(3000, function() {
  console.log('Server started');
});
In the preceding code, we first start a TCP server on port 3000; then, for each connection that we receive, we create two Writable streams pointing to two different files, one for the standard output and another for the standard error; these are our destination channels. Finally, we use demultiplexChannel() to demultiplex the socket stream into stdoutStream and stderrStream.
Now, we are ready to try our new mux/demux application, but first let's create a small Node.js program to produce some sample output; let's call it generateData.js:
console.log("out1"); console.log("out2"); console.error("err1"); console.log("out3"); console.error("err2");
Okay, now we are ready to try our remote logging application. First, let's start the server:
node server
Then the client, by providing the file that we want to start as a child process:
node client generateData.js
The client will run almost immediately, but at the end of the process, the standard output and standard error of the generateData application have traveled through one single TCP connection and then, on the server, have been demultiplexed into two separate files.
Please note that, as we are using child_process.fork() (http://nodejs.org/api/child_process.html#child_process_child_process_fork_modulepath_args_options), our client will be able to launch only other Node.js modules.
The example that we have just shown demonstrates how to multiplex and demultiplex a binary/text stream, but it's worth mentioning that the same rules apply to object streams as well. The greatest difference is that, using objects, we already have a way to transmit the data using atomic messages (the objects), so multiplexing would be as easy as setting a channelID property into each object, while demultiplexing would simply involve reading the channelID property and routing each object towards the right destination stream.
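The following is a minimal sketch of this idea (it is not part of the example we just built); muxWrite() and demux() are just hypothetical helpers, and the shared channel is a simple PassThrough stream working in object mode:

var stream = require('stream');

//the shared channel: a PassThrough stream in object mode
var sharedChannel = new stream.PassThrough({objectMode: true});

//multiplexing: tag each object with the ID of its source channel
function muxWrite(channelID, obj) {
  obj.channelID = channelID;
  sharedChannel.write(obj);
}

//demultiplexing: route each object to the right destination stream
function demux(source, destinations) {
  source.on('data', function(obj) {
    destinations[obj.channelID].write(obj);
  });
}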
Another pattern involving only demultiplexing consists of routing the data coming from a source depending on some condition. With this pattern, we can implement complex flows, such as the one shown in the following diagram:
The demultiplexer used in the system described by the preceding diagram takes a stream of objects representing animals and distributes each of them to the right destination stream based on the class of the animal: reptiles, amphibians, or mammals.
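A minimal sketch of such a demultiplexer might look like the following, assuming the source is an object stream and each animal object carries a class property matching the keys of the destinations map:

function demuxByCondition(source, destinations) {
  source.on('data', function(animal) {
    //route the object based on a condition, in this case its class
    var destination = destinations[animal.class];
    if(destination) {
      destination.write(animal);
    }
  });
  source.on('end', function() {
    //close every destination when the source ends
    Object.keys(destinations).forEach(function(key) {
      destinations[key].end();
    });
  });
}

We could then invoke it as demuxByCondition(animals, {reptile: reptileStream, amphibian: amphibianStream, mammal: mammalStream}).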
Using the same principle, we can also implement an if...else statement for streams; for some inspiration, take a look at the ternary-stream package (https://npmjs.org/package/ternary-stream), which allows us to do exactly that.