The async library

If we look back for a moment at every control flow pattern we have analyzed so far, we can see that they could be used as a base to build reusable and more generic solutions. For example, we could wrap the unlimited parallel execution algorithm into a function that accepts a list of tasks, runs them in parallel, and invokes the given callback when all of them are complete. This way of wrapping control flow algorithms into reusable functions can lead to a more declarative and expressive way to define asynchronous control flows, and that's exactly what async (https://npmjs.org/package/async) does. The async library is a very popular solution, in Node.js and JavaScript in general, for dealing with asynchronous code. It offers a set of functions that greatly simplify the execution of a set of tasks in different configurations, and it also provides useful helpers for dealing with collections asynchronously. Even though there are several other libraries with a similar goal, async is so widely used that it is a de facto standard in Node.js.
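To make the idea of wrapping a pattern into a reusable function concrete, here is a minimal sketch of the unlimited parallel execution algorithm packaged as a helper. The name runParallel() and its exact behavior (results kept in task order, first error wins) are assumptions made for this illustration; this is not how async itself is implemented.

```javascript
// Hypothetical helper: starts every task at once and invokes `callback`
// exactly once, either with the first error or with all the results.
function runParallel(tasks, callback) {
  if (tasks.length === 0) {
    return process.nextTick(() => callback(null, []));
  }
  const results = new Array(tasks.length);
  let completed = 0;
  let failed = false;

  tasks.forEach((task, index) => {
    task((err, result) => {
      if (failed) return;              // an earlier task already failed
      if (err) {
        failed = true;
        return callback(err);
      }
      results[index] = result;         // keep the order of the task list
      if (++completed === tasks.length) {
        callback(null, results);
      }
    });
  });
}

// Usage: two simulated asynchronous tasks with different durations
runParallel([
  cb => setTimeout(() => cb(null, 'first'), 20),
  cb => setTimeout(() => cb(null, 'second'), 10)
], (err, results) => {
  if (err) return console.error(err);
  console.log(results);
});
```

The caller only describes what has to run; the bookkeeping (counting completions, guarding against multiple callback invocations) lives in one place.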

Let's try it straightaway to demonstrate its capabilities.

The async library can help us immensely when implementing complex asynchronous control flows, but one difficulty with it is choosing the right helper for the problem at hand. For example, for the case of the sequential execution flow, there are around 20 different functions to choose from, including: eachSeries(), mapSeries(), filterSeries(), rejectSeries(), reduce(), reduceRight(), detectSeries(), concatSeries(), series(), whilst(), doWhilst(), until(), doUntil(), forever(), waterfall(), compose(), seq(), applyEachSeries(), iterator(), and timesSeries().

Choosing the right function is an important step in writing more compact and readable code, but this also requires some experience and practice. In our examples, we are going to cover only a few of these situations, but they will still provide a solid base to understand and efficiently use the rest of the library.

Now, to show in practice how async works, we are going to adapt our web spider application. Let's start directly with version 2, the one that downloads all the links recursively in sequence.

However, first let's make sure we install the async library into our current project by running npm install async.

Then we need to load the new dependency from the spider.js module, for example with const async = require('async').

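Let's modify the download() function first. As we have already seen, it executes three tasks in sequence: it downloads the contents of the URL, creates the target directory if it doesn't exist yet, and saves the contents of the URL into a file.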

The ideal function to use with this flow is definitely async.series(), which has the signature async.series(tasks, [callback]).

It takes a list of tasks and a callback function that is invoked when all the tasks have been completed. Each task is just a function that accepts a callback function, which must be invoked when the task completes its execution:
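The shape of a task can be sketched as follows. The names readConfigTask, delay, and delayTask are hypothetical and exist only for this illustration; the point is the contract, a function that receives a single callback and invokes it once when it is done.

```javascript
// A task receives one callback and calls it exactly once:
// callback(err) on failure, callback(null, result) on success.
function readConfigTask(callback) {
  setTimeout(() => callback(null, { port: 8080 }), 10);
}

// A function that needs extra arguments can be adapted with a closure
// so that it exposes the same one-argument shape.
function delay(ms, value, callback) {
  setTimeout(() => callback(null, value), ms);
}
const delayTask = callback => delay(10, 'done', callback);

// A task can be invoked directly; a control flow helper does the same,
// passing its own callback to learn when the task has completed.
readConfigTask((err, config) => {
  if (err) return console.error(err);
  console.log(config.port);
});
```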

The nice thing about async is that it uses the same callback conventions as Node.js, and it automatically handles error propagation. So, if any of the tasks invokes its callback with an error, async will skip the remaining tasks in the list and jump directly to the final callback.
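The error propagation described above can be observed with a small experiment. To keep the sketch runnable without installing anything, series() below is a stand-in written for this example that follows the behavior just described; it is an assumption, not the library's source. With the real library, the call would be async.series(tasks, callback).

```javascript
// Stand-in for async.series(), written only for this sketch.
function series(tasks, finalCallback) {
  const results = [];
  function next(index) {
    if (index === tasks.length) return finalCallback(null, results);
    tasks[index]((err, result) => {
      if (err) return finalCallback(err);   // skip the remaining tasks
      results.push(result);
      next(index + 1);
    });
  }
  next(0);
}

const executed = [];
series([
  cb => { executed.push('one'); setImmediate(cb, null, 1); },
  cb => { executed.push('two'); setImmediate(cb, new Error('task two failed')); },
  cb => { executed.push('three'); setImmediate(cb, null, 3); }
], (err, results) => {
  if (err) {
    console.log(`stopped early: ${err.message}`);
    console.log(`tasks that ran: ${executed.join(', ')}`);
    return;
  }
  console.log(results);
});
```

The third task never starts, because the second one reported an error and control jumped straight to the final callback.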

With this in mind, let's see how the download() function would change by using async:
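One possible shape for download() is sketched below. Several parts are assumptions made so the example runs on its own: fetchBody() is a hypothetical stand-in for the HTTP request, the directory is created with fs.mkdir() and its recursive option, and series() is a few-line stand-in for async.series(). In the real module, the task list would be passed to async.series().

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Stand-in for async.series() (assumption: sequential, stops on first error).
function series(tasks, done) {
  const run = i => (i === tasks.length
    ? done(null)
    : tasks[i](err => (err ? done(err) : run(i + 1))));
  run(0);
}

// Hypothetical stand-in for the HTTP request; no network access is needed.
function fetchBody(url, callback) {
  setImmediate(callback, null, `<html>content of ${url}</html>`);
}

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  let body;                                  // shared by the tasks via closure
  series([
    cb => {                                  // 1. download the contents
      fetchBody(url, (err, resBody) => {
        if (err) return cb(err);
        body = resBody;
        cb();
      });
    },
    // 2. create the target directory if it doesn't exist yet
    cb => fs.mkdir(path.dirname(filename), { recursive: true }, cb),
    // 3. save the contents into the file
    cb => fs.writeFile(filename, body, cb)
  ], err => {
    if (err) return callback(err);
    console.log(`Downloaded and saved: ${url}`);
    callback(null, body);
  });
}

// Usage: writes into the system's temporary directory
const target = path.join(os.tmpdir(), 'spider-sketch', 'example.com', 'index.html');
download('http://example.com', target, err => {
  if (err) console.error(err);
});
```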

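If we remember the callback hell version of this code, we will surely appreciate the way async allows us to organize our tasks. There is no need to nest callbacks anymore, as we just have to provide a flat list of tasks, usually one for each asynchronous operation, which async will then execute in sequence. In our case, the first task downloads the URL and saves the response body in a variable shared with the other tasks, the second creates the directory that will hold the file, and the third writes the body to the file.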

For this specific situation, a possible alternative to async.series() would be async.waterfall(), which still executes the tasks in sequence but also provides the output of each task as input to the next. In our situation, we could use this feature to propagate the body variable until the end of our sequence. As an exercise, you can try to implement the same function using the waterfall flow and then take a look at the differences.
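Leaving the download() exercise open, the way a waterfall passes values from one task to the next can be illustrated on a generic example. The waterfall() function below is a stand-in written for this sketch under the behavior just described; with the real library, the same task list would be passed to async.waterfall().

```javascript
// Stand-in for async.waterfall(): the values a task passes to its callback
// (after the error argument) become the arguments of the next task.
function waterfall(tasks, done) {
  const run = (i, args) => {
    if (i === tasks.length) return done(null, ...args);
    tasks[i](...args, (err, ...results) => {
      if (err) return done(err);
      run(i + 1, results);
    });
  };
  run(0, []);
}

waterfall([
  cb => setImmediate(cb, null, 'hello'),
  (greeting, cb) => setImmediate(cb, null, `${greeting} world`),
  (text, cb) => setImmediate(cb, null, text.length)
], (err, length) => {
  if (err) return console.error(err);
  console.log(length); // 11, the length of 'hello world'
});
```

Compared with the series flow, no shared variable is needed: each value travels through the callbacks.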

The async library doesn't lack functions to handle parallel flows. Among them, we can find each(), map(), filter(), reject(), detect(), some(), every(), concat(), parallel(), applyEach(), and times(). They follow the same logic as the functions we have already seen for the sequential execution, with the difference that the tasks provided are executed in parallel.

To demonstrate that, we can try to apply one of these functions to implement version 3 of our web spider application, the one performing the downloads using an unlimited parallel flow.

If we remember the code we used earlier to implement the sequential version of the spiderLinks() function, adapting it to make it work in parallel is a trivial task:
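A sketch of the parallel spiderLinks() follows. To keep it self-contained, spider() and getPageLinks() are hypothetical stubs (the stub spider only records the URL and waits a few milliseconds), and each() is a stand-in for async.each() written for this example. In the real module, the call would be async.each(links, iteratee, callback).

```javascript
// Stand-in for async.each(): starts the iteratee for every item at once.
function each(items, iteratee, done) {
  if (items.length === 0) return done(null);
  let pending = items.length;
  let failed = false;
  items.forEach(item => iteratee(item, err => {
    if (failed) return;
    if (err) { failed = true; return done(err); }
    if (--pending === 0) done(null);
  }));
}

// Hypothetical stubs standing in for the real spider() and link extraction.
const visited = [];
let active = 0;
let peak = 0;
function spider(url, nesting, callback) {
  visited.push(url);
  peak = Math.max(peak, ++active);          // track how many run at once
  setTimeout(() => { active--; callback(); }, 10);
}
function getPageLinks(currentUrl, body) {
  return body.split(/\s+/).filter(Boolean); // assumes whitespace-separated links
}

function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) return process.nextTick(callback);
  const links = getPageLinks(currentUrl, body);
  if (links.length === 0) return process.nextTick(callback);
  each(links, (link, cb) => spider(link, nesting - 1, cb), callback);
}

// Usage: all links are started without waiting for the previous one
spiderLinks('http://example.com', 'http://a.test http://b.test http://c.test', 1, err => {
  if (err) return console.error(err);
  console.log(`visited ${visited.length} links, peak concurrency ${peak}`);
});
```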

The function is exactly the same one that we used for the sequential download, but this time we used async.each() instead of async.eachSeries(). This clearly demonstrates the power of abstracting the asynchronous flow with a library such as async. The code is not bound to a particular execution flow anymore; no code is written specifically for that, and most of it is just application logic.

If you are wondering if async can also be used to limit the concurrency of parallel tasks, the answer is yes, it can! We have a few functions we can use for that, namely, eachLimit(), mapLimit(), parallelLimit(), queue(), and cargo().

Let's try to exploit one of them to implement version 4 of the web spider application, the one executing the download of the links in parallel with limited concurrency. Fortunately, async has async.queue(), which works in a similar way to the TaskQueue class we created earlier in the chapter. The async.queue() function creates a new queue, which uses a worker() function to execute a set of tasks with a specified concurrency limit. A queue is created with const q = async.queue(worker, concurrency).

The worker() function receives, as input, the task to run and a callback function to invoke when the task completes, that is, function worker(task, callback).

Note that task in this case can be anything, not just a function. In fact, it's the responsibility of the worker to handle a task in the most appropriate way. New tasks can be added to the queue by using q.push(task, callback). The callback associated with a task has to be invoked by the worker after the task has been processed.
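The interplay between the worker, the concurrency limit, and q.push() can be sketched as follows. The queue() function here is a simplified stand-in for async.queue() written for this example (an assumption that covers only the worker and push(task, callback) behavior described above), and the task objects with name and ms fields are hypothetical.

```javascript
// Simplified stand-in for async.queue(worker, concurrency).
function queue(worker, concurrency) {
  const pending = [];
  let running = 0;
  function next() {
    while (running < concurrency && pending.length > 0) {
      const { task, callback } = pending.shift();
      running++;
      worker(task, (...args) => {
        running--;
        if (callback) callback(...args);   // per-task callback from push()
        next();                            // a slot is free, start another
      });
    }
  }
  return {
    push(task, callback) {
      pending.push({ task, callback });
      process.nextTick(next);
    }
  };
}

let active = 0;
let peak = 0;
// The worker decides how to handle a task; here a task is a plain object.
const q = queue((task, callback) => {
  peak = Math.max(peak, ++active);
  setTimeout(() => {
    active--;
    callback(null, task.name.toUpperCase());
  }, task.ms);
}, 2);

['a', 'b', 'c', 'd'].forEach(name => {
  q.push({ name, ms: 10 }, (err, result) => {
    if (err) return console.error(err);
    console.log(`${result} done, peak concurrency so far: ${peak}`);
  });
});
```

However many tasks are pushed, the worker never handles more than two at the same time.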

Now, let's modify our code again to implement a parallel globally limited execution flow, using async.queue(). First of all, we need to create a new queue:

The code is really straightforward. We are just creating a new queue with a concurrency limit of 2, having a worker that simply invokes our spider() function with the data associated with a task. Next, we implement the spiderLinks() function:
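The two steps just described, creating the queue and implementing spiderLinks() on top of it, might look like the sketch below. The name downloadQueue, the stub spider(), and the stub getPageLinks() are assumptions for illustration, and queue() is a simplified stand-in for async.queue() so that the example runs on its own.

```javascript
// Simplified stand-in for async.queue(worker, concurrency).
function queue(worker, concurrency) {
  const pending = [];
  let running = 0;
  function next() {
    while (running < concurrency && pending.length > 0) {
      const { task, callback } = pending.shift();
      running++;
      worker(task, err => { running--; callback(err); next(); });
    }
  }
  return {
    push(task, callback) {
      pending.push({ task, callback });
      process.nextTick(next);
    }
  };
}

// Hypothetical stubs standing in for the real spider() and link extraction.
const visited = [];
let active = 0;
let peak = 0;
function spider(url, nesting, callback) {
  visited.push(url);
  peak = Math.max(peak, ++active);
  setTimeout(() => { active--; callback(); }, 10);
}
function getPageLinks(currentUrl, body) {
  return body.split(/\s+/).filter(Boolean);
}

// A single, global queue: at most 2 spider() calls run at any time.
const downloadQueue = queue((taskData, callback) => {
  spider(taskData.link, taskData.nesting - 1, callback);
}, 2);

function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) return process.nextTick(callback);
  const links = getPageLinks(currentUrl, body);
  if (links.length === 0) return process.nextTick(callback);

  let completed = 0;
  let hasErrors = false;
  links.forEach(link => {
    downloadQueue.push({ link, nesting }, err => {
      if (hasErrors) return;
      if (err) { hasErrors = true; return callback(err); }
      // invoke the final callback once every link of this page is done
      if (++completed === links.length) callback();
    });
  });
}

spiderLinks('http://example.com', 'http://a.test http://b.test http://c.test', 1, err => {
  if (err) return console.error(err);
  console.log(`visited ${visited.length} links, peak concurrency ${peak}`);
});
```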

The preceding code should look very familiar, as it's almost the same as the one we used to implement the same flow using the TaskQueue object. In this case too, the important part to analyze is where we push a new task into the queue. At that point, we ensure that we pass a callback that enables us to check whether all the download tasks for the current page are completed and, if so, invoke the final callback.

Thanks to async.queue(), we could easily replicate the functionality of our TaskQueue object, again demonstrating that with async, we can really avoid writing asynchronous control flow patterns from scratch, reducing our efforts and saving precious lines of code.