Asynchronous control flow with streams

Going through the examples presented so far, it should be clear that streams are useful not only for handling I/O, but also as an elegant programming pattern that can be applied to any kind of data. The advantages do not end with their elegant appearance, though; streams can also be leveraged to turn asynchronous control flow into flow control, as we will see in this section.

By default, streams handle data in sequence; for example, the _transform() function of a Transform stream will never be invoked with the next chunk of data until the previous invocation completes by executing its callback(). This is an important property of streams, crucial for processing each chunk in the right order, but it can also be exploited to turn streams into an elegant alternative to the traditional control flow patterns.

Some code is always better than too much explanation, so let's work on an example to demonstrate how we can use streams to execute asynchronous tasks in sequence. Let's create a function that concatenates a set of files received as input, making sure to honor the order in which they are provided. We will do this in a new module called concatFiles.js, defining its contents starting from its dependencies:
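A minimal sketch of those dependencies (we also require the core fs module, which we need to read and write the files):

var fromArray = require('from2-array');
var through = require('through2');
var fs = require('fs');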

We will be using through2 (https://npmjs.org/package/through2) to simplify the creation of Transform streams and from2-array (https://npmjs.org/package/from2-array) to create a Readable stream from an array of objects.

Next, we can define the concatFiles() function:
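One possible implementation, consistent with the description that follows, is sketched here (error handling is omitted for brevity):

function concatFiles(destination, files, callback) {
  var destStream = fs.createWriteStream(destination);
  fromArray.obj(files)                            // Readable stream over the array of file names
    .pipe(through.obj(function(file, enc, done) {
      var src = fs.createReadStream(file);
      src.pipe(destStream, {end: false});         // do not close destStream when this file ends
      src.on('end', done);                        // trigger the processing of the next file
    }))
    .on('finish', function() {                    // all the files have been processed
      destStream.end();
      callback();
    });
}

module.exports = concatFiles;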

The preceding function implements a sequential iteration over the files array by transforming it into a stream. The procedure followed by the function can be summarized as follows:

  1. First, we create a Writable stream for the destination file.
  2. Next, we use from2-array to create a Readable stream from the files array.
  3. We then pipe each file name into a Transform stream that opens a Readable stream for the file and pipes it into destStream, specifying {end: false} so that destStream is not closed when the source file ends. Only when all the contents of the current file have been piped into destStream do we invoke done(), which triggers the processing of the next file.
  4. When all the files have been processed, the finish event is fired; we can finally end destStream and invoke the callback() of concatFiles(), which signals the completion of the whole operation.

We can now try to use the little module we just created. Let's do that in a new file, called concat.js:
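A minimal sketch of concat.js (the log message is only illustrative):

var concatFiles = require('./concatFiles');

concatFiles(process.argv[2], process.argv.slice(3), function() {
  console.log('Files concatenated successfully');
});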

We can now run the preceding program by passing the destination file as the first command line argument followed by a list of files to concatenate, for example:
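node concat allTogether.txt file1.txt file2.txt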

This should create a new file called allTogether.txt containing, in order, the contents of file1.txt and file2.txt.

With the concatFiles() function, we were able to obtain an asynchronous sequential iteration using only streams. As we saw in Chapter 2, Asynchronous Control Flow Patterns, this would have required the use of an iterator if implemented with pure JavaScript, or an external library such as async. We have now provided another option for achieving the same result, which, as we can see, is also very compact and elegant.

We just saw that streams process each data chunk in sequence, but sometimes this can be a bottleneck, as we would not be making the most of Node.js concurrency. If we have to execute a slow asynchronous operation for every data chunk, it can be advantageous to parallelize the execution and speed up the overall process. Of course, this pattern can only be applied if there is no relationship between the chunks of data, which happens frequently for object streams, but very rarely for binary streams.

To parallelize the execution of a Transform stream, we can apply the same patterns that we learned in Chapter 2, Asynchronous Control Flow Patterns, but with some adaptations to get them working with streams. Let's see how this works.

Let's demonstrate this immediately with an example; let's create a module called parallelStream.js and define a generic Transform stream that executes a given transform function in parallel. Let's start to define its constructor:
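One way to define such a constructor, assuming the prototype-based inheritance provided by util.inherits:

var stream = require('stream');
var util = require('util');

function ParallelStream(userTransform) {
  stream.Transform.call(this, {objectMode: true}); // enable object mode by default
  this.userTransform = userTransform;
  this.running = 0;                                // number of tasks currently running
  this.terminateCallback = null;                   // used later by _flush()
}
util.inherits(ParallelStream, stream.Transform);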

The constructor accepts a userTransform() function, which is then saved as an instance variable; we also invoke the parent constructor and, for convenience, enable object mode by default.

Next, it is the turn of the _transform() method:
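A possible implementation follows; in this sketch we also pass a bound push() function to userTransform() so that it can emit its results (this particular signature is an assumption that the later examples will rely on as well):

ParallelStream.prototype._transform = function(chunk, enc, done) {
  this.running++;
  this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));
  done();                       // do not wait for userTransform() to complete
};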

In the _transform() method, we increment the count of running tasks and then execute the userTransform() function; finally, we notify that the current transformation step is complete by invoking done(). The trick for triggering the processing of another item in parallel is exactly this: we are not waiting for the userTransform() function to complete before invoking done(); instead, we do it immediately. On the other hand, we provide a special callback to userTransform(), the this._onComplete() method (which we are going to define in a moment); this allows us to get notified when userTransform() completes.

Next, it is the turn of the _flush() method:
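It could look like this:

ParallelStream.prototype._flush = function(done) {
  if (this.running > 0) {
    this.terminateCallback = done;   // put the release of the 'finish' event on hold
  } else {
    done();
  }
};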

The _flush() method is invoked just before the stream terminates, so if there are still tasks running, we can put the release of the finish event on hold by not invoking the done() callback immediately; instead, we assign it to the this.terminateCallback variable. To understand how the stream is then properly terminated, we have to look into the _onComplete() method:
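A sketch of this method (forwarding errors with an 'error' event is an assumption of this sketch):

ParallelStream.prototype._onComplete = function(err) {
  this.running--;
  if (err) {
    return this.emit('error', err);
  }
  if (this.running === 0) {
    // invoke the terminate callback saved in _flush(), if any
    this.terminateCallback && this.terminateCallback();
  }
};

module.exports = ParallelStream;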

The _onComplete() method is invoked every time an asynchronous task completes. It checks whether there are any more tasks running and, if there are none, it invokes the this.terminateCallback() function, which will cause the stream to end, releasing the finish event which was put on hold in the _flush() method.

The ParallelStream class we just built allows us to easily create a Transform stream that executes its tasks in parallel, but there is a caveat: it does not preserve the order of the items as they are received. In fact, asynchronous operations can complete and push data at any time, regardless of when they were started. We immediately understand that this property does not play well with binary streams, where the order of data usually matters, but it can surely be useful with some types of object streams.

Now, let's apply our ParallelStream to a concrete example. Let's imagine that we want to build a simple service to monitor the status of a big list of URLs. Let's imagine that all these URLs are contained in a single file and are newline separated.

Streams can offer a very efficient and elegant solution to this problem, especially if we use our ParallelStream to parallelize the checking of the URLs.

Let's build this simple application immediately in a new module called checkUrls.js:
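A sketch of such a module; here we assume the request package (https://npmjs.org/package/request) for issuing the HEAD requests, and the userTransform() signature defined in our ParallelStream sketch:

var fs = require('fs');
var split = require('split');
var request = require('request');
var ParallelStream = require('./parallelStream');

fs.createReadStream(process.argv[2])             // [1] read the input file
  .pipe(split())                                 // [2] emit each line as a separate chunk
  .pipe(new ParallelStream(function(url, enc, push, done) {
    if (!url) return done();                     // skip empty lines
    request.head(url, {timeout: 5 * 1000}, function(err, response) {
      push(url + ' is ' + (err ? 'down' : 'up') + '\n');  // [3] push the result of the check
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))     // [4] write all the results to a file
  .on('finish', function() {
    console.log('All urls were checked');
  });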

As we can see, with streams our code looks very elegant and straightforward; let's see how it works:

  1. First, we create a Readable stream from the file given as input.
  2. We pipe the contents of the input file through split (https://npmjs.org/package/split), a Transform stream that ensures each line is emitted as a separate chunk.
  3. Then, it's time to use our ParallelStream to check the URL. We do this by sending a HEAD request and waiting for a response. When the callback is invoked, we push the result of the operation down the stream.
  4. Finally, all the results are piped into a file, results.txt.

Now, we can run the checkUrls module with a command such as this:
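node checkUrls urlList.txt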

Where the file urlList.txt contains a list of URLs, for example:
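http://www.example.com
http://www.example.com/link1
http://nonexistent.example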

When the command finishes running, we will see that a file results.txt was created. This contains the results of the operation, for example:
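http://www.example.com is up
http://nonexistent.example is down
http://www.example.com/link1 is up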

There is a good probability that the order in which the results are written is different from the order in which the URLs were specified in the input file. This is clear evidence that our stream executes its tasks in parallel, and it does not enforce any order between the various data chunks in the stream.

If we try to run the checkUrls application against a file that contains thousands or millions of URLs, we will surely run into trouble. Our application will create an uncontrolled number of connections all at once, sending a considerable amount of data in parallel and potentially undermining the stability of the application and the availability of the entire system. As we already know, the solution to keep the load and resource usage under control is to limit the concurrency of the parallel tasks.

Let's see how this works with streams by creating a limitedParallelStream.js module, which is an adaptation of parallelStream.js that we created in the previous section.

Let's see what it looks like, starting from its constructor (we will highlight the changed parts):
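A sketch of the new constructor; the require statements are the same as in parallelStream.js, and the parts that differ from ParallelStream are marked with comments:

var stream = require('stream');
var util = require('util');

function LimitedParallelStream(concurrency, userTransform) {
  stream.Transform.call(this, {objectMode: true});
  this.concurrency = concurrency;        // new: maximum number of parallel tasks
  this.userTransform = userTransform;
  this.running = 0;
  this.continueCallback = null;          // new: pending _transform() callback
  this.terminateCallback = null;
}
util.inherits(LimitedParallelStream, stream.Transform);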

We need a concurrency limit to be taken as input, and this time we are going to save two callbacks: one for any pending _transform method (continueCallback) and another for the callback of the _flush method (terminateCallback).

Next is the _transform() method:
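A possible implementation, using the same userTransform() signature as before:

LimitedParallelStream.prototype._transform = function(chunk, enc, done) {
  this.running++;
  this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));
  if (this.running < this.concurrency) {
    done();                        // free slot available: process the next chunk right away
  } else {
    this.continueCallback = done;  // limit reached: resume when a task completes
  }
};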

This time in the _transform() method, we have to check whether we have any free execution slots before we invoke done() and trigger the processing of the next item. If we have already reached the maximum number of concurrently running tasks, we can simply save the done() callback into the continueCallback variable, so that it can be invoked as soon as a task finishes.

The _flush() method remains exactly the same as in the ParallelStream class, so let's move directly to implementing the _onComplete() method:
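It could look like this:

LimitedParallelStream.prototype._onComplete = function(err) {
  this.running--;
  if (err) {
    return this.emit('error', err);
  }
  var tmpCallback = this.continueCallback;
  this.continueCallback = null;
  tmpCallback && tmpCallback();    // unblock a pending _transform(), if any
  if (this.running === 0) {
    this.terminateCallback && this.terminateCallback();
  }
};

module.exports = LimitedParallelStream;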

Every time a task completes, we invoke any saved continueCallback(), which will cause the stream to unblock, triggering the processing of the next item.

That's it for the limitedParallelStream module; we can now use it in the checkUrls module in place of parallelStream and have the concurrency of our tasks limited to the value that we set.

The parallel streams that we created previously might shuffle the order of the emitted data, but there are situations where this is not acceptable; sometimes, in fact, it is necessary to have each chunk emitted in the same order in which it was received. But not all hope is lost: we can still run the transform function in parallel; all we have to do is sort the data emitted by each task so that it follows the same order in which the data was received.

This technique involves the use of a buffer to reorder the chunks while they are emitted by each running task. For brevity, we are not going to provide an implementation of such a stream, as it's quite verbose for the scope of this book; what we are going to do instead is reuse one of the available packages on npm built for this specific purpose, for example, through2-parallel (https://npmjs.org/package/through2-parallel).

We can quickly check the behavior of an ordered parallel execution by modifying our existing checkUrls module. Let's say that we want our results to be written in the same order as the URLs in the input file, while executing our checks in parallel. We can do this using through2-parallel:

[...]
var throughParallel = require('through2-parallel');

fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(throughParallel.obj({concurrency: 2},
    function(url, enc, done) {
      [...]
    })
  )
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', function() {
    console.log('All urls were checked');
  });

As we see, the interface of through2-parallel is very similar to that of through2; the only difference is that we can also specify a concurrency limit for the transform function that we provide. If we try to run this new version of checkUrls, we will now see that the results.txt file lists the results in the same order as the URLs appear in the input file.

With this, we conclude our analysis of the asynchronous control flow with streams; next we are going to focus on some piping patterns.