Handling backpressure

By default, writable streams have a high watermark of 16,384 bytes (16 KB). When this limit is reached, the writable stream signals as much to its consumer, and it's then up to the consumer to stop writing until the stream's buffer has cleared. However, even once the high watermark is exceeded, a stream can still be written to; this is how memory leaks can form.
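To see what this means in practice, here's a minimal sketch using Node's core stream module, with a deliberately small highWaterMark of 1 KB so the limit is easy to hit:

const { Writable } = require('stream')

const ws = new Writable({
  highWaterMark: 1024, // 1 KB instead of the default 16 KB
  write: (chunk, enc, cb) => setTimeout(cb, 1)
})

// write returns false once the buffer reaches the high watermark,
// but the chunk is still accepted and buffered regardless
console.log(ws.write(Buffer.alloc(2048))) // false, over the watermark
console.log(ws.write(Buffer.alloc(2048))) // false, yet it buffers anyway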

When a writable (or transform) stream is receiving more data than it's able to process in a given time frame, a backpressure strategy is required to prevent the memory from continually growing until the process begins to slow down and eventually crash.

When we use the pipe method (including when it's used indirectly, via pump), backpressure is respected by default.
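For instance, a rough sketch of the pump approach might look as follows (this assumes pump has been installed with npm install pump; the file paths are hypothetical):

const pump = require('pump')
const fs = require('fs')

const rs = fs.createReadStream('input.txt') // hypothetical input file
const ws = fs.createWriteStream('output.txt') // hypothetical output file

// pump pipes rs into ws (respecting backpressure, just as pipe does),
// and additionally destroys both streams and calls back if either errors
pump(rs, ws, (err) => {
  if (err) console.error('pipe failed', err)
  else console.log('pipe finished')
})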

Let's create a folder called backpressure, initialize it as a package, and install readable-stream:

$ mkdir backpressure 
$ cd backpressure
$ npm init -y
$ npm install readable-stream --save

Now we'll create a file called backpressure-with-pipe.js, with the following contents:

const { Readable, Writable } = require('readable-stream')

let i = 20

const rs = Readable({
  read: (size) => {
    setImmediate(function () {
      rs.push(i-- ? Buffer.alloc(size) : null)
    })
  }
})

const ws = Writable({
  write: (chunk, enc, cb) => {
    console.log(ws._writableState.length)
    setTimeout(cb, 1)
  }
})

rs.pipe(ws)

We have a write stream that takes 1 ms to process each chunk, and a read stream that pushes 16 KB chunks (the size parameter will be 16 KB). We use setImmediate in the read stream to simulate asynchronous behavior, as read streams tend to (and should generally) be asynchronous.

In our write stream, we're logging out the size of the stream buffer on each write.

We can definitely (as we'll soon see) write more than one 16 KB chunk to a stream before the 1 ms timeout occurs.
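As a quick sanity check, here's a sketch (again using a writable with a 1 ms processing delay) showing five 16 KB chunks being buffered synchronously, before any of the 1 ms timers have had a chance to fire:

const { Writable } = require('stream')

const ws = new Writable({
  write: (chunk, enc, cb) => setTimeout(cb, 1)
})

// five synchronous writes; none of the 1 ms callbacks have run yet
for (let n = 0; n < 5; n++) ws.write(Buffer.alloc(16384))

console.log(ws._writableState.length) // 81920, five chunks buffered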

However, if we run our backpressure-with-pipe.js program:

$ node backpressure-with-pipe.js 

We should see that the write stream's buffer never exceeds 16 KB; this is because the pipe method is managing the backpressure.

However, if we write directly to the stream, we can push the stream far above its watermark.

Let's copy backpressure-with-pipe.js to direct-write-no-backpressure.js, and alter the final line (rs.pipe(ws)) to:

rs.on('data', (chunk) => ws.write(chunk)) 

If we run our new program:

$ node direct-write-no-backpressure.js 

We should see the size of the write stream's buffer grow to almost 300 KB before it falls back to 16 KB as the stream attempts to free up its buffer.

If we want to manage backpressure without pipe, we have to check the return value of our call to write. If the return value is true, then the stream can be written to; if the value is false, then we need to wait for the drain event to begin writing again.

Let's copy direct-write-no-backpressure.js to direct-write-with-backpressure.js and alter the final line (rs.on('data', (chunk) => ws.write(chunk))) to:

rs.on('data', (chunk) => {
  const writable = ws.write(chunk)
  if (writable === false) {
    rs.pause()
    ws.once('drain', () => rs.resume())
  }
})

We check the return value of ws.write to determine whether it's advisable to keep writing (the stream can technically still be written to, but it shouldn't be).

If it isn't, we have to pause the incoming readable stream, since attaching a data event listener switches the stream from non-flowing (paused) mode, where data is pulled from it, to flowing mode, where data is pushed at us.

If we run direct-write-with-backpressure.js:

$ node direct-write-with-backpressure.js 

We should see, as with our piping example, that the writable stream's buffer does not exceed 16 KB.
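As an aside, backpressure can also be respected without pipe by consuming the readable stream in non-flowing mode in the first place, pulling chunks only when we're ready for them. A rough sketch, which could replace the final rs.on('data', ...) handler in our program:

const pull = () => {
  let chunk
  // pull chunks until the readable's buffer is empty (read returns null)
  while ((chunk = rs.read()) !== null) {
    if (ws.write(chunk) === false) {
      // the writable's buffer is full; stop pulling until it drains
      ws.once('drain', pull)
      return
    }
  }
}

rs.on('readable', pull)

Since data is only pulled when we ask for it, the readable stream never needs to be paused; we simply stop calling read until the writable stream has drained.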