The ES6 specification introduces another mechanism that, among other things, can be used to simplify the asynchronous control flow of our Node.js applications. We are talking about generators, also known as semi-coroutines. They are a generalization of subroutines in which there can be multiple entry points. In a normal function, in fact, we can have only one entry point, which corresponds to the invocation of the function itself. A generator is similar to a function, but in addition, it can be suspended (using the yield statement) and then resumed at a later time. Generators are particularly useful when implementing iterators, and this should ring a bell, as we have already seen how iterators can be used to implement important asynchronous control flow patterns such as sequential and limited parallel execution.
In Node.js, generators are available starting from version 0.11 but, at the time of writing, this feature is still not enabled by default, and it's necessary to invoke Node.js with the --harmony or --harmony-generators flag to get generators working. To try the examples in this section, make sure you have the right version of Node.js installed (version 0.11.0 or later) by running the following command:
node --version
Before we explore the use of generators for asynchronous control flow, it's important that we learn some basic concepts. Let's start with the syntax: a generator function can be declared by appending the * (asterisk) operator after the function keyword:
function* makeGenerator() {
  //body
}
Inside the makeGenerator() function, we can pause the execution using the yield keyword and return to the caller the value passed to it:
function* makeGenerator() {
  yield 'Hello World';
  console.log('Re-entered');
}
In the preceding code, the generator yields the string Hello World, putting the execution of the function on pause. When the generator is resumed, the execution will start from console.log('Re-entered').
The makeGenerator() function is essentially a factory that, when invoked, returns a new generator object:
var gen = makeGenerator();
The most important method of the generator object is next(), which is used to start/resume the execution of the generator, and returns an object in the following form:
{
  value: <yielded value>,
  done: <true if the execution reached the end>
}
This object contains the value yielded by the generator (value) and a flag indicating whether the generator has completed its execution (done).
To demonstrate generators, let's create a new module. We can call it fruitGenerator.js and include the following code:
function* fruitGenerator() {
  yield 'apple';
  yield 'orange';
  return 'watermelon';
}
var newFruitGenerator = fruitGenerator();
console.log(newFruitGenerator.next());    //[1]
console.log(newFruitGenerator.next());    //[2]
console.log(newFruitGenerator.next());    //[3]
We can run the new module with the following command:
node --harmony-generators fruitGenerator
The preceding code should print the following output:
{ value: 'apple', done: false }
{ value: 'orange', done: false }
{ value: 'watermelon', done: true }
This is a short explanation of what happened in the preceding code:
1. The first time newFruitGenerator.next() was invoked, the generator started its execution until it reached the first yield command, which put the generator on pause and returned the value apple to the caller.
2. At the second invocation of newFruitGenerator.next(), the generator resumed, starting from the second yield command, which in turn put the execution on pause again, while returning the value orange to the caller.
3. The last invocation of newFruitGenerator.next() caused the execution of the generator to resume from its last instruction, a return statement, which terminates the generator, returns the value watermelon, and sets the done property to true in the result object.

To better understand why generators are so useful for the implementation of iterators, let's build one. In a new module, which we will call iteratorGenerator.js, let's write the following code:
function* iteratorGenerator(arr) {
  for(var i = 0; i < arr.length; i++) {
    yield arr[i];
  }
}
var iterator = iteratorGenerator(['apple', 'orange', 'watermelon']);
var currentItem = iterator.next();
while(!currentItem.done) {
  console.log(currentItem.value);
  currentItem = iterator.next();
}
We can execute this code using the following command:
node --harmony-generators iteratorGenerator
The preceding simple program should print the list of the items in the array as follows:
apple
orange
watermelon
In this example, each time we call iterator.next(), we resume the for loop of the generator, which runs another cycle by yielding the next item in the array. This demonstrates how the state of the generator is maintained across invocations: when resumed, the loop and all the variables are exactly as they were when the execution was put on pause.
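To see this state preservation in isolation, consider the following small counter (a hypothetical example, separate from the modules above), where the count variable keeps its value across next() invocations:

function* counterGenerator() {
  var count = 0;           //this state survives every suspension
  while(true) {
    yield count++;
  }
}
var counter = counterGenerator();
console.log(counter.next().value);    //prints 0
console.log(counter.next().value);    //prints 1
console.log(counter.next().value);    //prints 2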
To conclude our exploration of the basic functionality of generators, we will now learn how to pass values back to a generator. This is actually very simple; all we need to do is provide an argument to the next() method, and that value will become the return value of the yield statement inside the generator.
To show this, let's create a new simple module:
function* twoWayGenerator() {
  var what = yield null;
  console.log('Hello ' + what);
}
var twoWay = twoWayGenerator();
twoWay.next();
twoWay.next('world');
When executed, the preceding code will print Hello world. This means that the following has happened:
1. The first time the next() method is invoked, the generator reaches the first yield statement and is then put on pause.
2. When next('world') is invoked, the generator resumes from the point where it was put on pause, which is the yield instruction, but this time we have a value that is passed back to the generator. This value will then be set into the what variable. The generator then executes the console.log() instruction and terminates.

In a similar way, we can force a generator to throw an exception. This is made possible by using the throw method of the generator, as shown in the following example:
var twoWay = twoWayGenerator();
twoWay.next();
twoWay.throw(new Error());
Using this last code snippet, the twoWayGenerator() function will throw an exception the moment the yield instruction returns. The exception behaves exactly as if it was thrown from inside the generator, and this means that it can be caught and handled like any other exception using a try-catch block.
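For example, we can rewrite the twoWayGenerator() function (just as an illustrative variation) to intercept the error with a try-catch block placed around the yield statement:

function* twoWayGenerator() {
  try {
    var what = yield null;
    console.log('Hello ' + what);
  } catch(err) {
    //the exception thrown with twoWay.throw() surfaces here,
    //exactly where the generator was paused
    console.log('Error received: ' + err.message);
  }
}
var twoWay = twoWayGenerator();
twoWay.next();
twoWay.throw(new Error('boom'));

When run, this version prints Error received: boom instead of letting the exception bubble up to the caller.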
You must be wondering how generators can help us handle asynchronous operations. We can demonstrate that immediately by creating a function that allows us to use asynchronous functions inside a generator, resuming the execution of the generator when each asynchronous operation completes. We will call this function asyncFlow():
function asyncFlow(generatorFunction) {
  function callback(err) {
    if(err) {
      return generator.throw(err);
    }
    var results = [].slice.call(arguments, 1);
    generator.next(results.length > 1 ? results : results[0]);
  }
  var generator = generatorFunction(callback);
  generator.next();
}
The preceding function takes a generator as an input, instantiates it, and then immediately starts its execution:
var generator = generatorFunction(callback);
generator.next();
The generatorFunction() receives as input a special callback function that invokes generator.throw() if an error is received; otherwise, it resumes the execution of the generator by passing back the results received by the callback function:
if(err) {
  return generator.throw(err);
}
var results = [].slice.call(arguments, 1);
generator.next(results.length > 1 ? results : results[0]);
To demonstrate the power of this simple function, let's create a new module called clone.js, which (stupidly) creates a clone of itself. Paste the asyncFlow() function we just created, followed by the core of the program:
var fs = require('fs');
var path = require('path');

asyncFlow(function* (callback) {
  var fileName = path.basename(__filename);
  var myself = yield fs.readFile(fileName, 'utf8', callback);
  yield fs.writeFile('clone_of_' + fileName, myself, callback);
  console.log('Clone created');
});
Remarkably, with the help of the asyncFlow() function, we were able to write asynchronous code using a linear approach, as if we were using blocking functions! The magic behind this result should be clear by now: the callback passed to each asynchronous function will in turn resume the generator as soon as the asynchronous operation is complete. Nothing complicated, but the outcome is surely impressive.
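As with the previous examples, we can run this module (assuming we saved it as clone.js) with the harmony flag enabled:

node --harmony-generators clone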
There are two other variations of this technique, one involving the use of promises and the other using thunks.
A thunk, as used in generator-based control flow, is just a function that partially applies all the arguments of the original function, except its callback. The return value is another function that accepts only the callback as an argument. For example, the thunkified version of fs.readFile() would be as follows:
function readFileThunk(filename, options) {
  return function(callback) {
    fs.readFile(filename, options, callback);
  }
}
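To generalize the idea, a helper that converts any Node.js-style function into a thunk-returning function might look like the following minimal sketch (this is a simplification; the thunkify library we will use later in this section also takes care of details such as preserving the value of this):

function thunkify(fn) {
  return function() {
    //save the arguments of the original invocation
    var args = [].slice.call(arguments);
    return function(callback) {
      //call the original function, appending the callback
      //to the previously saved arguments
      return fn.apply(null, args.concat(callback));
    };
  };
}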
Both thunks and promises allow us to create generators that do not need a callback to be passed as an argument; for example, a version of asyncFlow() using thunks might be the following:
function asyncFlowWithThunks(generatorFunction) {
  function callback(err) {
    if(err) {
      return generator.throw(err);
    }
    var results = [].slice.call(arguments, 1);
    var thunk = generator.next(results.length > 1 ? results : results[0]).value;
    thunk && thunk(callback);
  }
  var generator = generatorFunction();
  var thunk = generator.next().value;
  thunk && thunk(callback);
}
The trick is to read the return value of generator.next(), which contains the thunk. The next step is to invoke the thunk itself, injecting our special callback. Simple! This allows us to write the following code:
asyncFlowWithThunks(function* () {
  var myself = yield readFileThunk(__filename, 'utf8');
  yield writeFileThunk('clone of clone.js', myself);
  console.log('Clone created');
});
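The writeFileThunk() function used in the preceding snippet does not come from any library; we can define it ourselves by following exactly the same pattern we used for readFileThunk():

function writeFileThunk(filename, data) {
  return function(callback) {
    fs.writeFile(filename, data, callback);
  }
}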
Similarly, we could implement a version of asyncFlow() that accepts a promise as a yieldable. We leave this as an exercise, as its implementation requires only a minimal change to the asyncFlowWithThunks() function. We may also implement an asyncFlow() function that accepts both promises and thunks as yieldables, using the same principles.
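If you want to compare your solution against a reference, the following is one possible sketch of the promise-based variation (our own take, assuming every yielded value is a then-able promise):

function asyncFlowWithPromises(generatorFunction) {
  var generator = generatorFunction();
  function step(item) {
    if(item.done) {
      return;
    }
    //every yielded value is expected to be a promise
    item.value.then(function(result) {
      step(generator.next(result));
    }, function(err) {
      //propagate the rejection into the generator, where it
      //can be handled with a try-catch block
      step(generator.throw(err));
    });
  }
  step(generator.next());
}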
As you may guess, the Node.js ecosystem already provides some solutions for handling asynchronous control flow using generators. For example, suspend (https://npmjs.org/package/suspend) is one of the oldest, and supports promises, thunks, Node.js-style callbacks, as well as raw callbacks. Also, most of the promise libraries we analyzed earlier in the chapter provide helpers to use promises with generators.
All these solutions are based on the same principles we demonstrated with the asyncFlow() function, so we may want to reuse one of them instead of writing one ourselves.
For the examples in this section, we chose to use co (https://npmjs.org/package/co), which is currently gaining a lot of momentum. A flexible solution, co supports several types of yieldables, some of which are:

- Thunks
- Promises
- Arrays (parallel execution)
- Generators
- Generator functions
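For instance, using the co API current at the time of writing, yielding the readFileThunk() function we defined earlier would look like this minimal sketch:

co(function* () {
  //co suspends the generator until the yielded thunk completes
  var data = yield readFileThunk(__filename, 'utf8');
  console.log('Read ' + data.length + ' characters');
})();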
co also has its own ecosystem of packages, including the web framework koa (https://npmjs.org/package/koa) and other co-friendly utilities, such as the co-limiter module we will mention later in this section.
We will use co to reimplement our web spider application using generators, while to convert Node.js-style functions to thunks, we are going to use a little library called thunkify (https://npmjs.org/package/thunkify).
Let's start our practical exploration of generators and co by modifying version 2 of the web spider application. The very first thing we want to do is to load our dependencies and generate a thunkified version of the functions we are going to use. These will go at the top of the spider.js module:
var thunkify = require('thunkify');
var co = require('co');
var path = require('path');                //used later by download()
var utilities = require('./utilities');    //used later by spider() and spiderLinks()
var request = thunkify(require('request'));
var fs = require('fs');
var mkdirp = thunkify(require('mkdirp'));
var readFile = thunkify(fs.readFile);
var writeFile = thunkify(fs.writeFile);
var nextTick = thunkify(process.nextTick);
Looking at the preceding code, we can surely notice some similarities with the code we used earlier in the chapter to promisify some APIs. In this regard, it is interesting to point out that if we decided to use the promisified version of our functions instead of their thunkified alternatives, the code that follows would remain exactly the same, thanks to the fact that co supports both thunks and promises as yieldable objects. In fact, if we wanted, we could even use both thunks and promises in the same application, even in the same generator. This is a tremendous advantage in terms of flexibility, as it allows us to use generator-based control flow with whatever solution we already have at our disposal.
Okay, now let's start transforming the download() function into a generator:
function* download(url, filename) {
  console.log('Downloading ' + url);
  var results = yield request(url);
  var body = results[1];
  yield mkdirp(path.dirname(filename));
  yield writeFile(filename, body);
  console.log('Downloaded and saved: ' + url);
  return body;
}
By using generators and co, our download() function suddenly becomes trivial. All we had to do was convert it into a generator function and use yield wherever we had an asynchronous function (as a thunk) to invoke.
Next, it's the turn of the spider() function:
function* spider(url, nesting) {
  var filename = utilities.urlToFilename(url);
  var body;
  try {
    body = yield readFile(filename, 'utf8');
  } catch(err) {
    if(err.code !== 'ENOENT') {
      throw err;
    }
    body = yield download(url, filename);
  }
  yield spiderLinks(url, body, nesting);
}
The interesting detail to notice in this last fragment of code is how we were able to use a try-catch block to handle exceptions. Also, we can now use throw to propagate errors! Another remarkable line is the one where we yield the download() function, which is neither a thunk nor a promisified function, but just another generator. This is possible thanks to co, which also supports other generators as yieldables.
At last, we can also convert spiderLinks(), where we implemented an iteration to download the links of a web page in sequence. With generators, this becomes trivial as well:
function* spiderLinks(currentUrl, body, nesting) {
  if(nesting === 0) {
    return yield nextTick();
  }
  var links = utilities.getPageLinks(currentUrl, body);
  for(var i = 0; i < links.length; i++) {
    yield spider(links[i], nesting - 1);
  }
}
There is really little to explain about the previous code; there is no pattern to show for the sequential iteration. Generators and co are doing all the dirty work for us, so we were able to write the asynchronous iteration as if we were using blocking, direct style APIs.
Now comes the most important part, the entry point of our program:
co(function* () {
  try {
    yield spider(process.argv[2], 1);
    console.log('Download complete');
  } catch(err) {
    console.log(err);
  }
})();
This is the only place where we have to invoke co(...) to wrap a generator. In fact, once we do that, co will automatically wrap any generator we pass to a yield statement, and this will happen recursively, so the rest of the program is totally agnostic of the fact that we are using co, even though it's working under the hood.
Now it should be possible to run our generator-based web spider application. Just remember to use the --harmony or --harmony-generators flag in the command line:
node --harmony-generators spider <URL>
The bad news about generators is that, while they are great for writing sequential algorithms, they can't be used to parallelize the execution of a set of tasks, at least not using only yield and generators. In fact, the pattern to use in these circumstances is to simply rely on a callback-based or promise-based function, which in turn can easily be yielded and used with generators.
Fortunately, for the specific case of unlimited parallel execution, co already allows us to obtain it natively, by simply yielding an array of promises, thunks, generators, or generator functions.
With this in mind, version 3 of our web spider application can be implemented simply by rewriting the spiderLinks() function as follows:
function* spiderLinks(currentUrl, body, nesting) {
  if(nesting === 0) {
    return nextTick();
  }
  var links = utilities.getPageLinks(currentUrl, body);
  var tasks = links.map(function(link) {
    return spider(link, nesting - 1);
  });
  yield tasks;
}
What we did was just collect all the download tasks, which are essentially generators, and then yield the resulting array. All these tasks will be executed by co in parallel, and then the execution of our generator (spiderLinks()) will be resumed when all the tasks finish running.
If you think we cheated by exploiting the feature of co that allows us to yield an array, we can demonstrate how the same parallel flow can be achieved using a callback-based solution, similar to what we have already used earlier in the chapter. Let's use this technique to rewrite spiderLinks() once again:
function spiderLinks(currentUrl, body, nesting) {
  if(nesting === 0) {
    return nextTick();
  }

  //returns a thunk
  return function(callback) {
    var completed = 0, errored = false;
    var links = utilities.getPageLinks(currentUrl, body);
    if(links.length === 0) {
      return process.nextTick(callback);
    }

    function done(err, result) {
      if(err && !errored) {
        errored = true;
        callback(err);
      }
      if(++completed === links.length && !errored) {
        callback();
      }
    }

    for(var i = 0; i < links.length; i++) {
      //co(...) converts the generator into a thunk, which we
      //immediately invoke, registering done() as its callback
      co(spider(links[i], nesting - 1))(done);
    }
  }
}
To run the spider() function, which is a generator, in parallel, we had to convert it into a thunk and then execute it. This was possible by wrapping it with the co(...) function, which essentially creates a thunk out of a generator. This way, we were able to invoke it in parallel and set the done() function as the callback. Usually, all the libraries for generator-based control flow have a similar feature, so you can always transform a generator into a callback-based function if needed.
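To make this mechanism explicit, the following sketch (with a hypothetical URL) shows what the expression co(spider(links[i], nesting - 1))(done) does, broken into two steps:

//co(...) creates a thunk out of the generator returned by spider()
var thunk = co(spider('http://example.com', 1));
//invoking the thunk with a Node.js-style callback starts the execution
thunk(function(err) {
  if(err) {
    return console.log(err);
  }
  console.log('spider() completed');
});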
To start multiple download tasks in parallel, we just reused the callback-based pattern for parallel execution that we defined earlier in the chapter. We should also notice that we transformed the spiderLinks() function into a thunk (it's not even a generator anymore). This enabled us to have a callback function to invoke when all the parallel tasks are completed.
Now that we know how to handle nonsequential execution flows, it should be easy to plan the implementation of version 4 of our web spider application, the one imposing a limit on the number of concurrent download tasks. We have several options we can use to do that; some of them are as follows:

- Use the callback-based version of the TaskQueue class we implemented previously in the chapter. We would need to just thunkify its functions and any generator we want to use as a task.
- Use the promise-based version of the TaskQueue class, and just make sure that each generator we want to use as a task is converted into a function returning a promise.
- Use a callback-based library such as async, and thunkify any helper we plan to use, in addition to converting any generator to a callback-based function that can be used by the library.
- Use a library from the co ecosystem, specifically designed for this type of flow, such as co-limiter (https://npmjs.org/package/co-limiter).
- Implement a custom algorithm based on the producer-consumer pattern, the same one that co-limiter uses internally.

For educational purposes, we are going to choose the last option, so we can dive into a pattern that is often associated with coroutines (but also with threads and processes).
The goal is to leverage a queue to feed a fixed number of workers, as many as the concurrency level we want to set. To implement this algorithm, we are going to take as a starting point the TaskQueue class we defined earlier in the chapter. Let's start gradually; the first thing we want to do is define the constructor:
function TaskQueue(concurrency) {
this.concurrency = concurrency;
this.running = 0;
this.taskQueue = [];
this.consumerQueue = [];
this.spawnWorkers(concurrency);
}
Notice the invocation of this.spawnWorkers(), as this is the method in charge of starting the workers. The next step is, of course, to define our workers; let's see how they look:
TaskQueue.prototype.spawnWorkers = function(concurrency) {
  var self = this;
  for(var i = 0; i < concurrency; i++) {
    co(function* () {
      while(true) {
        var task = yield self.nextTask();
        yield task;
      }
    })();
  }
}
Our workers are very simple; they are just generators wrapped with co() and executed immediately, so that each one can run in parallel. Internally, each worker is running an infinite loop that blocks (yield), waiting for a new task to be available in the queue (yield self.nextTask()), and when this happens, it yields the task (which is any valid yieldable), waiting for its completion. You may be wondering how we can actually wait for the next task to be queued. The answer is in the nextTask() method, which we are now going to define:
TaskQueue.prototype.nextTask = function() {
  var self = this;
  return function(callback) {                    //[1]
    if(self.taskQueue.length !== 0) {
      callback(null, self.taskQueue.shift());    //[2]
    } else {
      self.consumerQueue.push(callback);         //[3]
    }
  }
}
Let's see what happens in this method, which is the core of the pattern:

1. The method returns a thunk, which is a valid yieldable for co.
2. The callback of the returned thunk is invoked by providing the next task in the taskQueue (if there is any available). This will immediately unblock a worker, providing the next task to yield on.
3. If there are no tasks in the queue, the callback itself is pushed into the consumerQueue. By doing this, we are practically putting a worker in idle mode. The callbacks in the consumerQueue will be invoked as soon as we have a new task to process, which will resume the corresponding worker.

Now, to understand how the idle workers in the consumerQueue are resumed, we need to define the pushTask() method:
TaskQueue.prototype.pushTask = function(task) {
  if(this.consumerQueue.length !== 0) {
    this.consumerQueue.shift()(null, task);
  } else {
    this.taskQueue.push(task);
  }
}
Trivially, the method invokes the first callback in the consumerQueue, if available, which in turn will unblock a worker. If no callback is available, it means that all the workers are busy, so we simply add a new item to the taskQueue.
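To get a feel for how the class works in isolation, consider this small hypothetical usage example, where the task pushed into the queue is a generator function (a valid yieldable for co):

var queue = new TaskQueue(2);
queue.pushTask(function* () {
  //the worker will yield this generator function,
  //suspending until the inner thunk completes
  var data = yield readFileThunk(__filename, 'utf8');
  console.log('Task complete, read ' + data.length + ' characters');
});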
In the TaskQueue class we just defined, the workers have the role of consumers, while whoever uses pushTask() can be considered a producer. This pattern shows us how a generator can look very similar to a thread (or a process). In fact, the producer-consumer interaction is probably the most common problem presented when studying inter-process communication techniques but, as we already mentioned, it is also a common use case for coroutines.
Now that we have implemented our limited parallel algorithm using generators and the producer-consumer pattern, we can apply it to limit the concurrency of the download tasks of our web spider application (version 4). First, let's load and initialize a TaskQueue object:
var TaskQueue = require('./taskQueue');
var downloadQueue = new TaskQueue(2);
Next, let's modify the spiderLinks() function. Its body is almost identical to the one we just used to implement the unlimited parallel execution flow, so we will show only the changed parts here:
function spiderLinks(currentUrl, body, nesting) {
  [...]
  return function(callback) {
    [...]
    function done(err, result) {
      [...]
    }
    links.forEach(function(link) {
      downloadQueue.pushTask(function *() {
        yield spider(link, nesting - 1);
        done();
      });
    });
  }
}
In each task, we invoke the done() function just after a download completes, so we can count how many links were downloaded and notify the callback of the thunk when all of them are complete.
As an exercise, you can try to implement version 4 of the web spider application, using the other four methods we presented at the beginning of this section.