In my previous post, we learned everything we need to know to understand Node.js streams. We covered the four fundamental stream types in Node.js: Writable, Readable, Duplex, and Transform. We also discussed the benefits of using streams, key concepts like buffering, backpressure, and piping, and the importance of events in stream processing.
In this post, we will dive deeper into Readable streams in particular: how they are created, how we can consume data from them, and a number of examples to get you comfortable.
How to create Readable streams
In our previous post, we started with the mental model that Readable streams are like water taps. In the same way that we can turn on the tap, change the flow rate, or turn it off, a Readable stream provides data in chunks that we can consume, pause, resume, or end.
The glorious water tap: our mental starting point for Readable streams
But how can we create a Readable stream in Node.js? There are a number of ways, but let's start with what the node:stream module provides.
stream.Readable.from()
Readable.from() via a destructured import
stream.Readable class
stream.Readable.fromWeb()
From top to bottom, I've listed what I think are the most approachable ways to create a Readable stream in Node.js using the node:stream module, so let's go through each of them.
stream.Readable.from()
The stream.Readable.from() method creates a Readable stream from an iterable object or an async iterable object. This method is useful when you have an iterable data source, for example an array of strings, that you want to convert into a stream.
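Something along these lines is all it takes; a minimal sketch that consumes the stream with the 'data' event (the array of strings mirrors the piping example further below):
const { Readable } = require("node:stream");
// Create a Readable stream from an iterable (here, an array of strings)
const readableStream = Readable.from(["Hello", " ", "World", "!"]);
// Each element of the array is emitted as a separate chunk
readableStream.on("data", (chunk) => {
  console.log("Chunk:", chunk);
});
readableStream.on("end", () => {
  console.log("No more data");
});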
In the above example, we create a Readable stream from an array of strings. The Readable.from() method automatically converts the array into a stream that emits each string as a chunk of data.
For the sake of completeness, here is a reminder of how that Readable stream could be paired with a Writable stream to pipe data from one to the other:
const { Readable } = require("node:stream");
const fs = require("node:fs");
// Create a Readable stream from an array of strings
const readableStream = Readable.from(["Hello", " ", "World", "!"]);
// Create a Writable stream (in this case, writing to a file)
const writableStream = fs.createWriteStream("output.txt");
// Pipe the Readable stream to the Writable stream
readableStream.pipe(writableStream);
// Handle the 'finish' event on the Writable stream
writableStream.on("finish", () => {
console.log("Finished writing to file");
});
// Handle any errors
writableStream.on("error", (err) => {
console.error("An error occurred:", err);
});
In the above scenario, our readableStream is piped to the writableStream, which writes to the file output.txt.
The array of ["Hello", " ", "World", "!"] here is considered an iterable object with four chunks. Each element of the array is treated as a separate chunk in the resulting stream.
The writableStream consumes each chunk of data and writes it to the file output.txt. The finish event is emitted when all data has been written to the file, and the error event is emitted if an error occurs during the writing process.
To solidify this, here is a verbose sequence diagram:
An overview of how the above code is processed chunk-by-chunk
Readable.from() via a destructured import
There is no standalone from() function exported by the node:stream module; from() is a static method on the Readable class itself. In practice, you will usually destructure Readable from node:stream and call Readable.from() directly, which is equivalent to the fully qualified stream.Readable.from():
const stream = require("node:stream");
const { Readable } = require("node:stream");
// These two lines are equivalent:
const stream1 = stream.Readable.from(["Hello", "World"]);
const stream2 = Readable.from(["Hello", "World"]);
stream.Readable class
The stream.Readable class is the base class for all Readable streams. You can extend it to create your own custom Readable stream by implementing the _read() method.
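A minimal sketch of that pattern looks like the following, where the class name and the data array are just illustrative:
const { Readable } = require("node:stream");
class MyReadable extends Readable {
  constructor(options) {
    super(options);
    // Illustrative data source: an array of chunks to emit
    this.data = ["chunk1", "chunk2", "chunk3"];
    this.index = 0;
  }
  _read() {
    if (this.index < this.data.length) {
      // Push the next chunk into the stream's internal buffer
      this.push(this.data[this.index]);
      this.index++;
    } else {
      // Pushing null signals the end of the stream
      this.push(null);
    }
  }
}
const myStream = new MyReadable();
myStream.on("data", (chunk) => console.log("Received:", chunk.toString()));
myStream.on("end", () => console.log("Stream ended"));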
In the above example, we create a custom Readable stream by extending the stream.Readable class. The _read() method is implemented to push data chunks from the data array to the stream. When all data has been pushed, null is pushed to signal the end of the stream.
stream.Readable.fromWeb()
stream.Readable.fromWeb() is designed to convert a Web API ReadableStream into a Node.js Readable stream. It's not always "from the web" in the sense of network requests; it's about compatibility between Web API streams and Node.js streams.
const { Readable } = require("node:stream");
const { ReadableStream } = require("node:stream/web");
// Create a Web API ReadableStream (this could come from a fetch() response, for example)
const webReadableStream = new ReadableStream({
start(controller) {
controller.enqueue("Hello");
controller.enqueue("World");
controller.close();
},
});
// Convert it to a Node.js Readable stream
const nodeReadableStream = Readable.fromWeb(webReadableStream);
// Use the Node.js stream
nodeReadableStream.on("data", (chunk) => {
console.log(chunk.toString());
});
nodeReadableStream.on("end", () => {
console.log("Stream ended");
});
In the above example, we create a Web API ReadableStream that emits two chunks of data ("Hello" and "World") and then closes. We then convert this Web API stream to a Node.js Readable stream using Readable.fromWeb().
For a more advanced, practical example using the fetch API:
const { Readable } = require("node:stream");
const { createWriteStream } = require("node:fs");
const { pipeline } = require("node:stream/promises");
async function downloadFile(url, outputPath) {
const response = await fetch(url);
if (!response.ok) throw new Error(`HTTP error! status: ${response.status}`);
// response.body is a Web API ReadableStream
const nodeStream = Readable.fromWeb(response.body);
// Create a writable stream to the output file
const fileStream = createWriteStream(outputPath);
// Use pipeline to pipe the data and handle cleanup
await pipeline(nodeStream, fileStream);
console.log(`File downloaded to ${outputPath}`);
}
downloadFile("https://example.com/somefile.txt", "output.txt").catch(
console.error
);
In the above, we use the fetch API to download a file from a URL. The response body is a Web API ReadableStream, which we convert to a Node.js Readable stream using Readable.fromWeb(). We then pipe this stream to a Writable stream that writes the data to a file.
Although we haven't touched on it before, the example above also makes use of the promise-based pipeline API from node:stream/promises; a small standalone sketch of it follows the diagram below.
A visual look at the download sequence:
A sequence diagram of how the above downloadFile function works
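As promised, here is a small standalone sketch of pipeline(): unlike pipe(), the promise-based pipeline() from node:stream/promises waits for the whole chain to finish and destroys every stream in it if any of them errors, so you don't have to wire up error handlers on each stream yourself. The output file name here is just an example:
const { Readable } = require("node:stream");
const { createWriteStream } = require("node:fs");
const { pipeline } = require("node:stream/promises");
async function writeGreeting() {
  // pipeline() resolves once the last stream finishes, and rejects
  // (destroying every stream in the chain) if any of them errors
  await pipeline(
    Readable.from(["Hello", " ", "pipeline", "!"]),
    createWriteStream("greeting.txt") // example output path
  );
  console.log("Wrote greeting.txt");
}
writeGreeting().catch(console.error);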
Other ways to create a Readable stream
process.stdin
process.stdin provides a built-in Readable stream for standard input that you can hook into, thanks to streams implementing the EventEmitter interface.
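As a quick sketch, the following echoes whatever you type back to the terminal until the input ends (Ctrl+D on Unix-like systems):
// process.stdin is already a Readable stream; attaching a 'data'
// listener switches it into flowing mode
process.stdin.on("data", (chunk) => {
  process.stdout.write(`You typed: ${chunk}`);
});
process.stdin.on("end", () => {
  console.log("stdin ended");
});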
A quick-fire round of examples
The following is a quick-fire round of examples to help you get comfortable with the concepts we have covered.
const { Readable, Writable } = require("node:stream");
// Create a custom Readable stream
class NumberStream extends Readable {
constructor(max, options) {
super(options);
this.max = max;
this.current = 1;
}
_read() {
if (this.current <= this.max) {
const data = { number: this.current };
// In object mode we can push the object itself; otherwise we stringify it
this.push(this.readableObjectMode ? data : JSON.stringify(data));
this.current++;
} else {
this.push(null); // Signal the end of the stream
}
}
}
// Create instances of our custom stream
const numberStream = new NumberStream(5, {
objectMode: false,
highWaterMark: 64, // Buffer size
});
const objectStream = new NumberStream(5, {
objectMode: true,
highWaterMark: 2, // Buffer size
});
// Example 1: Using events (paused mode)
numberStream.on("readable", () => {
let chunk;
while (null !== (chunk = numberStream.read())) {
console.log("Received chunk:", chunk.toString());
}
});
numberStream.on("end", () => {
console.log("Stream ended");
});
// Example 2: Using flowing mode
objectStream.on("data", (chunk) => {
console.log("Received object:", chunk);
});
objectStream.on("end", () => {
console.log("Object stream ended");
});
// Example 3: Demonstrating backpressure
let produced = 0;
const fastProducer = new Readable({
  highWaterMark: 100, // Small buffer so backpressure kicks in quickly
  read() {
    while (produced < 1000) {
      if (!this.push(`Data ${produced++}\n`)) {
        console.log("Backpressure applied");
        return;
      }
    }
    this.push(null);
  },
});
const slowConsumer = new Writable({
  highWaterMark: 100, // Small buffer so write() reports backpressure early
  write(chunk, encoding, callback) {
    // Simulate a slow consumer by delaying the callback
    setTimeout(() => {
      console.log("Slow consumer received:", chunk.toString().trim());
      callback();
    }, 10);
  },
});
// Piping with backpressure handling: the producer pauses while the consumer catches up
fastProducer.pipe(slowConsumer);
In the above example, we cover the following concepts:
Buffering: The highWaterMark option is set in the NumberStream constructor, controlling the internal buffer size.
Backpressure: In Example 3, we demonstrate backpressure between a fast producer and a slow consumer.
Modes of Operation:
Example 1 uses the paused mode, manually calling read().
Example 2 uses the flowing mode with the 'data' event (see the short pause()/resume() sketch after these lists for switching between the two modes).
Piping: Example 3 uses pipe() to connect the fast producer to the slow consumer.
Events: We use readable, data, and end events to interact with the streams.
Object Mode: The objectStream is created with objectMode: true, allowing it to emit JavaScript objects.
On top of this:
The NumberStream class demonstrates how to create a custom Readable stream.
The _read method is called internally by the stream to generate data.
In object mode, we directly push JavaScript objects. In non-object mode, we stringify the objects.
The backpressure example shows how a fast producer will pause when the consumer's buffer fills up.
Piping automatically handles backpressure and data flow between streams.
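Since we touched on the two modes of operation, here is one more small sketch (separate from the example above) of how pause() and resume() move a Readable stream between flowing and paused mode:
const { Readable } = require("node:stream");
const ticker = Readable.from(["one", "two", "three", "four", "five"]);
// Attaching a 'data' handler puts the stream into flowing mode
ticker.on("data", (chunk) => {
  console.log("Received:", chunk);
  // Switch back to paused mode after each chunk...
  ticker.pause();
  console.log("Paused for 100ms");
  // ...then return to flowing mode shortly afterwards
  setTimeout(() => ticker.resume(), 100);
});
ticker.on("end", () => console.log("Done"));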
Conclusion
Readable streams in Node.js offer a powerful and flexible way to handle data in chunks, making it easier to process large datasets efficiently. By understanding the various ways to create Readable streams, such as stream.Readable.from(), extending the stream.Readable class, and stream.Readable.fromWeb(), you can leverage these tools to build robust data processing applications.
We also explored several practical examples, including piping streams to files, downloading a file with the fetch API and pipeline(), building custom Readable streams, and handling backpressure between a fast producer and a slow consumer. Each of these examples highlights the versatility of Readable streams and their applicability in different scenarios.
As you continue to work with Node.js streams, remember the key concepts of buffering, backpressure, and the importance of event handling to manage your streams effectively. It all takes practice.
For further reading and more advanced use cases, be sure to check out the official Node.js documentation and other related resources. We'll be looking into Writable streams next time!