Core readable streams flow control issue

The _read method on readable streams does not accept a callback. Since a stream usually contains more than just a single buffer of data, the stream needs to call the _read method more than once.

The way it does this is by waiting for us to call push and then calling _read again if the internal buffer of the stream has available space.
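The call-push-call cycle described above can be observed with a small counter. This is a minimal sketch that assumes Node's built-in stream module instead of readable-stream, so it runs without installing anything; the calls counter, the chunk names, and the finished promise are made up for illustration.

```javascript
const { Readable } = require('stream')

let calls = 0 // how many times the stream has invoked read

const rs = new Readable({
  read () {
    calls += 1
    if (calls <= 3) {
      // One push per call; the stream calls read again afterwards
      this.push(`chunk ${calls}`)
    } else {
      this.push(null) // null signals that there is no more data
    }
  }
})

const chunks = []
rs.on('data', (data) => chunks.push(data.toString()))

// Resolves once the stream has ended
const finished = new Promise((resolve) => rs.on('end', resolve))

finished.then(() => {
  console.log(`read was called ${calls} times`)
  console.log(chunks.join(', '))
})
```

Here read runs four times: three calls that each push a chunk, and a final call that pushes null to end the stream. Nothing in our code calls read directly; every call after the first is the stream reacting to the previous push.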

The problem with this approach is that push doubles as the signal that the current read has finished, so calling push more than once, asynchronously, within a single _read call becomes problematic.

Let's create a folder called readable-flow-control, initialize it as a package, install readable-stream, and create a file called undefined-behavior.js:

$ mkdir readable-flow-control
$ cd readable-flow-control
$ npm init -y
$ npm install --save readable-stream
$ touch undefined-behavior.js

The undefined-behavior.js file should contain the following:

// WARNING: DOES NOT WORK AS EXPECTED
const { Readable } = require('readable-stream')
const rs = Readable({
  read: () => {
    setTimeout(() => {
      rs.push('Data 0')
      setTimeout(() => {
        rs.push('Data 1')
      }, 50)
    }, 100)
  }
})

rs.on('data', (data) => {
  console.log(data.toString())
})

If we run:

$ node undefined-behavior.js 

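We might expect it to produce a stream of alternating Data 0, Data 1 buffers, but in reality the behavior is undefined: the first push already prompts the stream to call _read again, so the timers from successive reads overlap.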
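To make the overlap visible, the following sketch counts how often read has been invoked by the time the second push of the first read is due. It assumes Node's built-in stream module rather than readable-stream; the readCalls counter, the observed promise, and the destroy call that stops the demo are additions for illustration and are not part of the original script.

```javascript
const { Readable } = require('stream')

let readCalls = 0
let resolveObserved
const observed = new Promise((resolve) => { resolveObserved = resolve })

const rs = new Readable({
  read () {
    readCalls += 1
    setTimeout(() => {
      if (rs.destroyed) return // demo already stopped
      // The stream treats this push as the end of the current read
      rs.push('Data 0')
      setTimeout(() => {
        if (rs.destroyed) return
        // This is where 'Data 1' would be pushed. Record how many
        // reads have been requested so far, then stop the demo.
        resolveObserved(readCalls)
        rs.destroy()
      }, 50)
    }, 100)
  }
})

rs.on('data', () => {}) // put the stream into flowing mode

observed.then((n) => {
  console.log(`read had been called ${n} times before 'Data 1' was due`)
})
```

The count is greater than one: the stream has already started another read while the first one is still waiting to push its second chunk, which is why the output order of the original script cannot be relied on.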

Luckily, as we will show in this recipe, there are more user-friendly modules available (such as from2) that make all of this easier.

Let's install from2 into our folder and create a file called expected-behavior.js:

$ npm install --save from2 
$ touch expected-behavior.js

Let's make the expected-behavior.js file contain the following content:

const from = require('from2')
const rs = from((size, cb) => {
  setTimeout(() => {
    rs.push('Data 0')
    setTimeout(() => {
      rs.push('Data 1')
      cb()
    }, 50)
  }, 100)
})

rs.on('data', (data) => {
  console.log(data.toString())
})

Now if we run:

$ node expected-behavior.js 

We'll see alternating messages, as expected.
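The idea of a read function that takes a callback can also be sketched on top of Node's built-in stream module. The following is not from2's implementation, only an illustration of the approach under stated assumptions: fromCallback, the busy flag, and the four-chunk limit are hypothetical names and choices made for this example.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: wraps a (size, cb) function so that a new read
// only starts after cb has been called for the previous one.
function fromCallback (fetch) {
  let busy = false
  const rs = new Readable({
    read (size) {
      if (busy) return // a fetch is still in flight, ignore this request
      busy = true
      fetch(size, (err, chunk) => {
        busy = false
        if (err) return rs.destroy(err)
        rs.push(chunk) // passing null as the chunk ends the stream
      })
    }
  })
  return rs
}

let n = 0
const rs = fromCallback((size, cb) => {
  if (n >= 4) return cb(null, null) // end after four chunks
  const chunk = `Data ${n % 2}`
  n += 1
  setTimeout(() => cb(null, chunk), 10)
})

const seen = []
rs.on('data', (data) => seen.push(data.toString()))

// Resolves with everything received once the stream has ended
const done = new Promise((resolve) => rs.on('end', () => resolve(seen)))

done.then((list) => console.log(list.join(', ')))
```

Because the callback, rather than push, marks the end of a read, the busy flag ignores any read requests that arrive while a fetch is still pending, and the chunks should be logged in alternating order.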