Going through the examples presented so far, it should be clear that streams are useful not only for handling I/O, but also as an elegant programming pattern that can be applied to any kind of data. But the advantages do not end there; streams can also be leveraged to turn asynchronous control flow into flow control, as we will see in this section.
By default, streams handle data in sequence; for example, the _transform() function of a Transform stream will never be invoked with the next chunk of data until the previous invocation completes by executing callback(). This is an important property of streams, crucial for processing each chunk in the right order, but it can also be exploited to turn streams into an elegant alternative to the traditional control flow patterns.
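To get a more concrete feel for this property, here is a minimal, self-contained sketch (not part of the example we are about to build; the DelayedStream name is just for illustration) showing that chunks still come out in their original order even though each one is processed asynchronously:

var stream = require('stream');
var util = require('util');

// A purely illustrative Transform stream that delays every chunk with
// setTimeout; despite the asynchronous delay, chunks are emitted in
// their original order, because _transform() is not invoked again
// until callback() runs.
function DelayedStream() {
  stream.Transform.call(this);
}
util.inherits(DelayedStream, stream.Transform);

DelayedStream.prototype._transform = function(chunk, enc, callback) {
  var self = this;
  setTimeout(function() {
    self.push(chunk);
    callback();
  }, 100);
};

process.stdin.pipe(new DelayedStream()).pipe(process.stdout);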
Some code is always better than too much explanation, so let's work on an example to demonstrate how we can use streams to execute asynchronous tasks in a sequence. Let's create a function that concatenates a set of files received as input, making sure to honor the order in which they are provided. Let's create a new module called concatFiles.js
and let's define its contents starting from its dependencies:
var fromArray = require('from2-array');
var through = require('through2');
var fs = require('fs');
We will be using through2
to simplify the creation of Transform
streams and from2-array
in order to create a readable stream from an array of objects.
Next, we can define the concatFiles()
function:
function concatFiles(destination, files, callback) {
  var destStream = fs.createWriteStream(destination);
  fromArray.obj(files)                              //[1]
    .pipe(through.obj(function(file, enc, done) {   //[2]
      var src = fs.createReadStream(file);
      src.pipe(destStream, {end: false});
      src.on('end', function() {                    //[3]
        done();
      });
    }))
    .on('finish', function() {                      //[4]
      destStream.end();
      callback();
    });
}
module.exports = concatFiles;
The preceding function implements a sequential iteration over the files
array by transforming it into a stream. The procedure followed by the function is explained as follows:
1. First, we use from2-array to create a Readable stream from the files array.
2. Next, we create a through (Transform) stream to handle each file in the sequence. For each file, we create a Readable stream and we pipe it into destStream, which represents the output file. We make sure not to close destStream after the source file finishes reading, by specifying {end: false} in the pipe() options.
3. When all the contents of the source file have been piped into destStream, we invoke done(), which triggers the processing of the next file.
4. When all the files have been processed, the finish event is fired; we can finally end destStream and invoke the callback() function of concatFiles(), which signals the completion of the whole operation.

We can now try to use the little module we just created. Let's do that in a new file called concat.js:
var concatFiles = require('./concatFiles');

concatFiles(process.argv[2], process.argv.slice(3), function() {
  console.log('Files concatenated successfully');
});
We can now run the preceding program by passing the destination file as the first command line argument followed by a list of files to concatenate, for example:
node concat allTogether.txt file1.txt file2.txt
This should create a new file called allTogether.txt containing, in order, the contents of file1.txt and file2.txt.
With the concatFiles()
function, we were able to obtain an asynchronous sequential iteration using only streams. As we saw in Chapter 2, Asynchronous Control Flow Patterns, this would have required the use of an iterator, if implemented with pure JavaScript, or an external library such as async. We have now provided another option for achieving the same result, which, as we have seen, is also very compact and elegant.
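For comparison, a rough sketch of the pure-callback alternative could look like the following; the concatFilesWithIterator() name and the shape of the iterate() helper are our own illustration of the iterator pattern, not code taken from Chapter 2:

var fs = require('fs');

// Hypothetical callback-based equivalent of concatFiles(), implemented
// with a sequential iterator instead of streams.
function concatFilesWithIterator(destination, files, callback) {
  var destStream = fs.createWriteStream(destination);
  function iterate(index) {
    if(index === files.length) {
      destStream.end();
      return callback();
    }
    var src = fs.createReadStream(files[index]);
    src.pipe(destStream, {end: false});
    src.on('end', function() {
      iterate(index + 1);    // process the next file only when this one is done
    });
  }
  iterate(0);
}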
We just saw that streams process each data chunk in sequence, but sometimes this can be a bottleneck, as it would not make the most of Node.js concurrency. If we have to execute a slow asynchronous operation for every data chunk, it can be advantageous to parallelize the execution and speed up the overall process. Of course, this pattern can only be applied if there is no relationship between the chunks of data, which might happen frequently for object streams, but very rarely for binary streams.
To parallelize the execution of a Transform
stream, we can apply the same patterns that we learned in Chapter 2, Asynchronous Control Flow Patterns, but with some adaptations to get them working with streams. Let's see how this works.
Let's demonstrate this immediately with an example; let's create a module called parallelStream.js
and define a generic Transform
stream that executes a given transform function in parallel. Let's start to define its constructor:
var stream = require('stream');
var util = require('util');

function ParallelStream(userTransform) {
  stream.Transform.call(this, {objectMode: true});
  this.userTransform = userTransform;
  this.running = 0;
  this.terminateCallback = null;
}
util.inherits(ParallelStream, stream.Transform);
The constructor accepts a userTransform()
function, which is then saved as an instance variable; we also invoke the parent constructor and for convenience we enable the object mode by default.
Next, it is the turn of the _transform()
method:
ParallelStream.prototype._transform = function(chunk, enc, done) {
  this.running++;
  this.userTransform(chunk, enc, this._onComplete.bind(this));
  done();
};
In the _transform() method, we increment the count of running tasks and then execute the userTransform() function; finally, we notify that the current transformation step is complete by invoking done(). The trick for triggering the processing of another item in parallel is exactly this: we are not waiting for the userTransform() function to complete before invoking done(); instead, we do it immediately. On the other hand, we provide a special callback to userTransform(), which is the this._onComplete() method (we are going to define it in a moment); this allows us to get notified when userTransform() completes.
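To appreciate the difference, this is what a hypothetical strictly sequential counterpart might look like (our own sketch, not part of the module we are building); by handing done() directly to userTransform(), the stream would accept a new chunk only after the previous task has completed:

var stream = require('stream');
var util = require('util');

// Hypothetical SequentialStream: the next chunk is processed only
// after userTransform() invokes done(), so tasks never overlap.
function SequentialStream(userTransform) {
  stream.Transform.call(this, {objectMode: true});
  this.userTransform = userTransform;
}
util.inherits(SequentialStream, stream.Transform);

SequentialStream.prototype._transform = function(chunk, enc, done) {
  this.userTransform(chunk, enc, done);
};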
Next, it is the turn of the _flush()
method:
ParallelStream.prototype._flush = function(done) {
  if(this.running > 0) {
    this.terminateCallback = done;
  } else {
    done();
  }
};
The _flush()
method is invoked just before the stream terminates, so if there are still tasks running we can put on hold the release of the finish
event by not invoking the done()
callback immediately; instead, we assign it to the this.terminateCallback
variable. To understand how the stream is then properly terminated, we have to look into the _onComplete()
method:
ParallelStream.prototype._onComplete = function(err) {
  this.running--;
  if(err) {
    return this.emit('error', err);
  }
  if(this.running === 0) {
    this.terminateCallback && this.terminateCallback();
  }
};

module.exports = ParallelStream;
The _onComplete()
method is invoked every time an asynchronous task completes. It checks whether there are any more tasks running and, if there are none, it invokes the this.terminateCallback()
function, which will cause the stream to end, releasing the finish
event which was put on hold in the _flush()
method.
The ParallelStream class we just built allows us to easily create a Transform stream that executes its tasks in parallel, but there is a caveat: it does not preserve the order of the items as they are received. In fact, asynchronous operations can complete and push data at any time, regardless of when they are started. We immediately understand that this property does not play well with binary streams, where the order of data usually matters, but it can surely be useful with some types of object streams.
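A quick way to observe this behavior is a toy script such as the following (the numbers and the random delays are our own illustration); on most runs, the output order differs from the input order:

var fromArray = require('from2-array');
var ParallelStream = require('./parallelStream');

// Push a few numbers through ParallelStream with random delays; since
// the tasks run in parallel, the output order is unpredictable.
fromArray.obj([1, 2, 3, 4, 5])
  .pipe(new ParallelStream(function(chunk, enc, done) {
    var self = this;
    setTimeout(function() {
      self.push(chunk + '\n');
      done();
    }, Math.random() * 100);
  }))
  .pipe(process.stdout);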
Now, let's apply our ParallelStream
to a concrete example. Let's imagine that we want to build a simple service to monitor the status of a big list of URLs, all contained in a single file and newline separated.
Streams can offer a very efficient and elegant solution to this problem, especially if we use our ParallelStream
to parallelize the checking of the URLs.
Let's build this simple application immediately in a new module called checkUrls.js
:
var fs = require('fs');
var split = require('split');
var request = require('request');
var ParallelStream = require('./parallelStream');

fs.createReadStream(process.argv[2])                    //[1]
  .pipe(split())                                        //[2]
  .pipe(new ParallelStream(function(url, enc, done) {   //[3]
    if(!url) return done();
    var self = this;
    request.head(url, function(err, response) {
      self.push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))            //[4]
  .on('finish', function() {
    console.log('All urls were checked');
  });
As we can see, with streams our code looks very elegant and straightforward; let's see how it works:
1. First, we create a Readable stream from the file given as input.
2. We pipe the contents of the input file through split (https://npmjs.org/package/split), a Transform stream that ensures each line is output in a different chunk.
3. Then, we use our ParallelStream to check the URL. We do this by sending a HEAD request and waiting for a response. When the callback is invoked, we push the result of the operation down the stream.
4. Finally, all the results are piped into a file, results.txt.

Now, we can run the checkUrls module with a command such as this:
node checkUrls urlList.txt
Where the file urlList.txt
contains a list of URLs, for example:
http://www.example.com
http://www.example.com/link1
http://thiswillbedownforsure.com
When the command finishes running, we will see that a file results.txt
was created. This contains the results of the operation, for example:
http://thiswillbedownforsure.com is down
http://www.example.com/link1 is up
http://www.example.com is up
There is a good probability that the order in which the results are written is different from the order in which the URLs were specified in the input file. This is clear evidence that our stream executes its tasks in parallel, and it does not enforce any order between the various data chunks in the stream.
For the sake of curiosity, we might want to try replacing ParallelStream
with a normal through2
stream, and compare the behavior and performance of the two (you might want to do this as an exercise). We will see that using through2
is much slower, because each URL would be checked in sequence, but also that the order of the results in the file results.txt
would be preserved.
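If we want to try this exercise, a possible sequential version of checkUrls.js could be sketched as follows; only the middle stage of the pipeline changes, the rest is the same as before:

var fs = require('fs');
var split = require('split');
var request = require('request');
var through = require('through2');

// Sequential variant: through2 invokes the transform function for the
// next line only after done() is called, so the URLs are checked one
// at a time and the results preserve the order of the input file.
fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(through.obj(function(url, enc, done) {
    if(!url) return done();
    var self = this;
    request.head(url, function(err, response) {
      self.push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', function() {
    console.log('All urls were checked');
  });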
If we try to run the checkUrls
application against a file that contains thousands or millions of URLs, we will surely run into trouble. Our application will create an uncontrolled number of connections all at once, sending a considerable amount of data in parallel and potentially undermining the stability of the application and the availability of the entire system. As we already know, the solution to keep the load and resource usage under control is to limit the concurrency of the parallel tasks.
Let's see how this works with streams by creating a limitedParallelStream.js
module, which is an adaptation of parallelStream.js
that we created in the previous section.
Let's see what it looks like, starting from its constructor (note the newly added parts):
function LimitedParallelStream(concurrency, userTransform) {
  stream.Transform.call(this, {objectMode: true});
  this.userTransform = userTransform;
  this.running = 0;
  this.terminateCallback = null;
  this.continueCallback = null;
  this.concurrency = concurrency;
}
We need a concurrency
limit to be taken as the input and this time we are going to save two callbacks, one for any pending _transform
method (continueCallback
) and another one for the callback of the _flush
method (terminateCallback
).
Next is the _transform()
method:
LimitedParallelStream.prototype._transform = function(chunk, enc, done) {
  this.running++;
  this.userTransform(chunk, enc, this._onComplete.bind(this));
  if(this.running < this.concurrency) {
    done();
  } else {
    this.continueCallback = done;
  }
};
This time in the _transform()
method, we have to check whether we have any free execution slots before we invoke done()
and trigger the processing of the next item. If we have already reached the maximum number of concurrently running tasks, we can simply save the done()
callback into the continueCallback
variable, so that it can be invoked as soon as a task finishes.
The _flush()
method remains exactly the same as in the ParallelStream
class, so let's move directly to implementing the _onComplete()
method:
LimitedParallelStream.prototype._onComplete = function(err, chunk) {
  this.running--;
  if(err) {
    return this.emit('error', err);
  }
  var tmpCallback = this.continueCallback;
  this.continueCallback = null;
  tmpCallback && tmpCallback();
  if(this.running === 0) {
    this.terminateCallback && this.terminateCallback();
  }
};
Every time a task completes, we invoke any saved continueCallback(), which will cause the stream to unblock, triggering the processing of the next item.
That's it for the limitedParallelStream
module; we can now use it in the checkUrls
module in place of parallelStream
and have the concurrency of our tasks limited to the value that we set.
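For example, the URL-checking stage of checkUrls.js could be rewritten as follows (this assumes that limitedParallelStream.js exports the LimitedParallelStream class, just like parallelStream.js exports ParallelStream; the concurrency value of 4 is an arbitrary choice):

var fs = require('fs');
var split = require('split');
var request = require('request');
var LimitedParallelStream = require('./limitedParallelStream');

// Same pipeline as before, but with at most 4 URL checks running at
// the same time.
fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(new LimitedParallelStream(4, function(url, enc, done) {
    if(!url) return done();
    var self = this;
    request.head(url, function(err, response) {
      self.push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', function() {
    console.log('All urls were checked');
  });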
The parallel streams that we created previously might shuffle the order of the emitted data, but there are situations where this is not acceptable; sometimes, in fact, it is necessary to have each chunk emitted in the same order in which it was received. But not all hope is lost: we can still run the transform function in parallel; all we have to do is sort the data emitted by each task so that it follows the same order in which the data was received.
This technique involves the use of a buffer to reorder the chunks while they are emitted by each running task. For brevity, we are not going to provide an implementation of such a stream, as it's quite verbose for the scope of this book; what we are going to do instead is reuse one of the available packages on npm
built for this specific purpose, for example, through2-parallel
(https://npmjs.org/package/through2-parallel).
We can quickly check the behavior of an ordered parallel execution by modifying our existing checkUrls
module. Let's say that we want our results to be written in the same order as the URLs in the input file, while executing our checks in parallel. We can do this using through2-parallel
:
[...]
var throughParallel = require('through2-parallel');

fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(throughParallel.obj({concurrency: 2}, function(url, enc, done) {
    [...]
  }))
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', function() {
    console.log('All urls were checked');
  });
As we see, the interface of through2-parallel
is very similar to that of through2
; the only difference is that we can also specify a concurrency
limit for the transform function that we provide. If we try to run this new version of checkUrls
, we will now see that the results.txt
file lists the results in the same order as the URLs appear in the input file.
With this, we conclude our analysis of the asynchronous control flow with streams; next we are going to focus on some piping patterns.