
Readable Streams in Node.js

Published: Jul 8, 2024

Last updated: Jul 9, 2024

Overview

In my previous post, we learned everything we need to know to understand Node.js streams. We covered the four fundamental stream types in Node.js: Writable, Readable, Duplex, and Transform. We also discussed the benefits of using streams, key concepts like buffering, backpressure, and piping, and the importance of events in stream processing.

In this post, we will dive deeper into Readable streams in particular: how they are created, how we can consume data from them, and a number of examples to get you comfortable.

How to create Readable streams

In our previous post, we started with the mental model that Readable streams are like water taps. In the same way that we can turn on the tap, change the flow rate, or turn it off, a Readable stream provides data in chunks that we can consume, pause, resume, or end.

The glorious water tap: our mental starting point for Readable streams
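
To make the tap analogy concrete before we go any further, here is a minimal sketch of turning the flow on, pausing it, and resuming it (the one-second delay is an arbitrary choice for illustration):

const { Readable } = require("node:stream");

const tap = Readable.from(["drip 1", "drip 2", "drip 3"]);

// Turning the tap on: attaching a 'data' listener starts the flow
tap.on("data", (chunk) => {
  console.log("Flowing:", chunk);
});

// Turning the tap off... and back on again a second later
tap.pause();
console.log("Tap paused");

setTimeout(() => {
  console.log("Tap resumed");
  tap.resume();
}, 1000);

// The tap runs dry: 'end' fires when there is no more data
tap.on("end", () => {
  console.log("Tap is empty");
});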

But how can we create a Readable stream in Node.js? There are a number of ways, but let's start with what the node:stream module provides.

  1. stream.Readable.from()
  2. stream.from()
  3. stream.Readable class
  4. stream.Readable.fromWeb()

From top to bottom, I've listed what I think are the most approachable ways to create a Readable stream from the node:stream module, so let's go through each of them.

stream.Readable.from()

The stream.Readable.from() method creates a Readable stream from an iterable object or an async iterable object. This method is useful when you have an iterable data source, for example an array of strings, that you want to convert into a stream.

const { Readable } = require("node:stream");

const readableStream = Readable.from(["Hello", " ", "World", "!"]);

In the above example, we create a Readable stream from an array of strings. The Readable.from() method automatically converts the array into a stream that emits each string as a chunk of data.
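Because Readable streams are also async iterable, a quick way to see those chunks is a for await...of loop. Here is a small sketch using the same array:

const { Readable } = require("node:stream");

const readableStream = Readable.from(["Hello", " ", "World", "!"]);

(async () => {
  // Readable streams implement Symbol.asyncIterator, so we can loop over chunks
  for await (const chunk of readableStream) {
    console.log("Chunk:", chunk); // "Hello", " ", "World", "!"
  }
})();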

For the sake of completeness, here is a reminder of how that Readable stream could be paired with a Writable stream to pipe data from one to the other:

const { Readable, Writable } = require("node:stream");
const fs = require("node:fs");

// Create a Readable stream from an array of strings
const readableStream = Readable.from(["Hello", " ", "World", "!"]);

// Create a Writable stream (in this case, writing to a file)
const writableStream = fs.createWriteStream("output.txt");

// Pipe the Readable stream to the Writable stream
readableStream.pipe(writableStream);

// Handle the 'finish' event on the Writable stream
writableStream.on("finish", () => {
  console.log("Finished writing to file");
});

// Handle any errors
writableStream.on("error", (err) => {
  console.error("An error occurred:", err);
});

In the above scenario, our readableStream is piped to the writableStream, which writes to the file output.txt.

The array of ["Hello", " ", "World", "!"] here is considered an iterable object with four chunks. Each element of the array is treated as a separate chunk in the resulting stream.

The writableStream consumes each chunk of data and writes it to the file output.txt. The finish event is emitted when all data has been written to the file, and the error event is emitted if an error occurs during the writing process.

To solidify this, here is a verbose sequence diagram:

An overview of how the above code is processed chunk-by-chunk

stream.from()

This is a function that can be imported directly from the node:stream module and is a shorthand for stream.Readable.from().

const { from, Readable } = require("node:stream");

// These two lines are equivalent:
const stream1 = from(["Hello", "World"]);
const stream2 = Readable.from(["Hello", "World"]);

stream.Readable()

The stream.Readable class is a base class for creating custom Readable streams. You can extend this class to create your own Readable streams by implementing the _read() method.

const { Readable } = require("node:stream");

class MyReadable extends Readable {
  constructor(data, options) {
    super(options);
    this.data = data;
  }

  _read() {
    if (this.data.length) {
      this.push(this.data.shift());
    } else {
      this.push(null);
    }
  }
}

const myReadable = new MyReadable(["Hello", " ", "World", "!"]);

In the above example, we create a custom Readable stream by extending the stream.Readable class. The _read() method is implemented to push data chunks from the data array to the stream. When all data has been pushed, null is pushed to signal the end of the stream.
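As a quick usage sketch (assuming the MyReadable class defined above), the custom stream can then be consumed like any other Readable:

// Consume the custom stream in flowing mode
myReadable.on("data", (chunk) => {
  console.log("Chunk:", chunk.toString()); // "Hello", " ", "World", "!"
});

myReadable.on("end", () => {
  console.log("No more data");
});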

stream.Readable.fromWeb()

stream.Readable.fromWeb() is designed to convert a Web API ReadableStream into a Node.js Readable stream. It's not always "from the web" in the sense of network requests; it's about compatibility between Web API streams and Node.js streams.

const { Readable } = require("node:stream");

// Create a Web API ReadableStream (this could come from a fetch() response, for example)
const webReadableStream = new ReadableStream({
  start(controller) {
    controller.enqueue("Hello");
    controller.enqueue("World");
    controller.close();
  },
});

// Convert it to a Node.js Readable stream
const nodeReadableStream = Readable.fromWeb(webReadableStream);

// Use the Node.js stream
nodeReadableStream.on("data", (chunk) => {
  console.log(chunk.toString());
});

nodeReadableStream.on("end", () => {
  console.log("Stream ended");
});

In the above example, we create a Web API ReadableStream that emits two chunks of data ("Hello" and "World") and then closes. We then convert this Web API stream to a Node.js Readable stream using Readable.fromWeb().

For a more advanced, practical example using the web fetch API:

const { Readable } = require("node:stream");
const { createWriteStream } = require("node:fs");
const { pipeline } = require("node:stream/promises");

async function downloadFile(url, outputPath) {
  const response = await fetch(url);
  if (!response.ok) throw new Error(`HTTP error! status: ${response.status}`);

  // response.body is a Web API ReadableStream
  const nodeStream = Readable.fromWeb(response.body);

  // Create a writable stream to the output file
  const fileStream = createWriteStream(outputPath);

  // Use pipeline to pipe the data and handle cleanup
  await pipeline(nodeStream, fileStream);

  console.log(`File downloaded to ${outputPath}`);
}

downloadFile("https://example.com/somefile.txt", "output.txt").catch(
  console.error
);

In the above, we use the fetch API to download a file from a URL. The response body is a Web API ReadableStream, which we convert to a Node.js Readable stream using Readable.fromWeb(). We then pipe this stream to a Writable stream that writes the data to a file.

Although we haven't touched on it yet, we also make use of the pipeline API from node:stream/promises in the above example.
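In short, pipeline() connects streams much like pipe() does, but it also propagates errors from any stream in the chain and cleans every stream up when one of them fails. Here is a minimal sketch (the file names are hypothetical):

const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");

async function copyFile() {
  try {
    // Roughly equivalent to readable.pipe(writable), but with proper error
    // handling and automatic cleanup of both streams if either one fails
    await pipeline(
      fs.createReadStream("input.txt"), // hypothetical source file
      fs.createWriteStream("copy-of-input.txt")
    );
    console.log("Pipeline succeeded");
  } catch (err) {
    console.error("Pipeline failed:", err);
  }
}

copyFile();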

A visual look at the download sequence:

A sequence diagram of how the above downloadFile function works

Other ways to create a Readable stream

process.stdin

process.stdin provides a built-in Readable stream for standard input that you can hook into, thanks to streams extending the EventEmitter class:

process.stdin.on("data", (chunk) => {
  console.log("Received input:", chunk.toString());
});

http.IncomingMessage

http.IncomingMessage is another built-in Readable stream, created when a server from Node.js' node:http module receives an HTTP request:

const http = require("node:http");

const server = http.createServer((req, res) => {
  // req is an instance of http.IncomingMessage
  req.on("data", (chunk) => {
    console.log("Received data:", chunk.toString());
  });

  // Respond once the request body has been fully read
  req.on("end", () => {
    res.end("Received");
  });
});

server.listen(3000);

child_process.spawn()

child_process.spawn() creates a child process that provides Readable streams for standard output and standard error:

const { spawn } = require("node:child_process");

const child = spawn("ls", ["-l"]);

child.stdout.on("data", (data) => {
  console.log(`stdout: ${data}`);
});
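Since the paragraph above mentions both standard output and standard error, here is a slightly fuller sketch of the same ls -l command that also reads child.stderr and waits for the process to close:

const { spawn } = require("node:child_process");

const child = spawn("ls", ["-l"]);

// stdout and stderr are both Readable streams on the child process
child.stdout.on("data", (data) => {
  console.log(`stdout: ${data}`);
});

child.stderr.on("data", (data) => {
  console.error(`stderr: ${data}`);
});

child.on("close", (code) => {
  console.log(`child process exited with code ${code}`);
});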

Compression

Built-in modules like zlib provide Readable streams for compression and decompression:

const zlib = require("node:zlib");
const fs = require("node:fs");

const readStream = fs.createReadStream("file.txt");
const gunzip = zlib.createGunzip();

const uncompressedStream = readStream.pipe(gunzip);
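The snippet above stops at creating uncompressedStream. As a follow-up sketch (assuming a hypothetical gzip-compressed file named file.txt.gz on disk), here is the same idea taken all the way to a destination file with pipeline():

const zlib = require("node:zlib");
const fs = require("node:fs");
const { pipeline } = require("node:stream/promises");

async function decompress() {
  // Readable source -> Gunzip transform -> Writable destination
  await pipeline(
    fs.createReadStream("file.txt.gz"), // hypothetical gzipped input
    zlib.createGunzip(),
    fs.createWriteStream("file.txt")
  );
  console.log("Decompressed file.txt.gz to file.txt");
}

decompress().catch(console.error);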

Encryption

Finally, for our last example, we can use the crypto module to create streams for cryptographic operations:

const crypto = require("node:crypto");

const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);

const decipher = crypto.createDecipheriv("aes-256-cbc", key, iv);

// Use decipher as a Readable stream

For a more complete example combining encryption and decryption:

const crypto = require("node:crypto");
const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");

// Generate a random key and IV
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);

// Create some sample encrypted data
const algorithm = "aes-256-cbc";
const cipher = crypto.createCipheriv(algorithm, key, iv);
const encrypted = Buffer.concat([
  cipher.update("This is a secret message", "utf8"),
  cipher.final(),
]);

// Write the encrypted bytes to a file
fs.writeFileSync("encrypted.txt", encrypted);

// Create a decipher (we read the decrypted data from it like a Readable stream)
const decipher = crypto.createDecipheriv(algorithm, key, iv);

// Create a Readable stream from the encrypted file
const readStream = fs.createReadStream("encrypted.txt");

// Create a Writable stream for the decrypted output
const writeStream = fs.createWriteStream("decrypted.txt");

// Use the pipeline to decrypt the data
async function decryptFile() {
  try {
    await pipeline(readStream, decipher, writeStream);
    console.log("File successfully decrypted");
  } catch (err) {
    console.error("Pipeline failed", err);
  }
}

decryptFile();

This last example is a little tough to follow (I've synchronously written the encrypted data to a file), but here is a sequence diagram to help:

A sequence diagram of how the above code executes

Readable stream fundamentals in action

We're going to cap off this blog post by recapping some of the core stream concepts we covered in the previous post Understanding Node.js Streams.

The following will be a quick-fire round of examples to help you get comfortable with the concepts we have covered.

const { Readable, Writable } = require("node:stream");

// Create a custom Readable stream
class NumberStream extends Readable {
  constructor(max, options) {
    super(options);
    this.max = max;
    this.current = 1;
  }

  _read() {
    if (this.current <= this.max) {
      const data = { number: this.current };
      // Push plain objects in object mode, stringified JSON otherwise
      this.push(this.readableObjectMode ? data : JSON.stringify(data));
      this.current++;
    } else {
      this.push(null); // Signal the end of the stream
    }
  }
}

// Create instances of our custom stream
const numberStream = new NumberStream(5, {
  objectMode: false,
  highWaterMark: 64, // Buffer size in bytes
});

const objectStream = new NumberStream(5, {
  objectMode: true,
  highWaterMark: 2, // Buffer size in objects
});

// Example 1: Using events (paused mode)
numberStream.on("readable", () => {
  let chunk;
  while (null !== (chunk = numberStream.read())) {
    console.log("Received chunk:", chunk.toString());
  }
});

numberStream.on("end", () => {
  console.log("Stream ended");
});

// Example 2: Using flowing mode
objectStream.on("data", (chunk) => {
  console.log("Received object:", chunk);
});

objectStream.on("end", () => {
  console.log("Object stream ended");
});

// Example 3: Demonstrating backpressure
const slowConsumer = new Writable({
  write(chunk, encoding, callback) {
    // Simulate a slow consumer by waiting before asking for the next chunk
    setTimeout(() => {
      console.log("Slow consumer received:", chunk.toString());
      callback();
    }, 10);
  },
});

let next = 0;
const fastProducer = new Readable({
  read() {
    while (next < 1000) {
      // push() returns false once the internal buffer is full
      if (!this.push(`Data ${next++}\n`)) {
        console.log("Backpressure applied");
        return;
      }
    }
    this.push(null);
  },
});

// Piping with backpressure handling
fastProducer.pipe(slowConsumer);

In the above example, we cover the following concepts:

  1. Buffering: The highWaterMark option is set in the NumberStream constructor, controlling the internal buffer size (there's a focused sketch of this after this list).

  2. Backpressure: In Example 3, we demonstrate backpressure between a fast producer and a slow consumer.

  3. Modes of Operation:

    • Example 1 uses the paused mode, manually calling read().
    • Example 2 uses the flowing mode with the 'data' event.
  4. Piping: Example 3 uses pipe() to connect the fast producer to the slow consumer.

  5. Events: We use readable, data, and end events to interact with the streams.

  6. Object Mode: The objectStream is created with objectMode: true, allowing it to emit JavaScript objects.

On top of this:

  • The NumberStream class demonstrates how to create a custom Readable stream.
  • The _read method is called internally by the stream to generate data.
  • In object mode, we directly push JavaScript objects. In non-object mode, we stringify the objects.
  • The backpressure example shows how a fast producer will pause when the consumer's buffer fills up.
  • Piping automatically handles backpressure and data flow between streams.
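
To isolate the buffering behaviour mentioned in point 1, here is a small sketch showing how highWaterMark decides when push() starts returning false, which is exactly the point at which backpressure kicks in (the 16-byte limit is an arbitrary choice for illustration):

const { Readable } = require("node:stream");

// A tiny buffer: roughly 16 bytes before the stream asks producers to slow down
const tinyStream = new Readable({
  highWaterMark: 16,
  read() {}, // We push manually below instead of producing data here
});

let accepted = true;
let i = 0;

// Keep pushing until the internal buffer is full
while (accepted) {
  accepted = tinyStream.push(`chunk-${i++} `);
}

console.log(`push() returned false after ${i} chunks`);
console.log(`Bytes currently buffered: ${tinyStream.readableLength}`);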

Conclusion

Readable streams in Node.js offer a powerful and flexible way to handle data in chunks, making it easier to process large datasets efficiently. By understanding the various ways to create Readable streams, such as stream.Readable.from(), stream.from(), the stream.Readable class, and stream.Readable.fromWeb(), you can leverage these tools to build robust data processing applications.

We also explored several practical examples, including piping streams, handling HTTP requests, working with child processes, and performing compression and encryption. Each of these examples highlights the versatility of readable streams and their applicability in different scenarios.

As you continue to work with Node.js streams, remember the key concepts of buffering, backpressure, and the importance of event handling to manage your streams effectively. It all takes practice.

For further reading and more advanced use cases, be sure to check out the official Node.js documentation and other related resources. We'll be looking into Writable streams next time!

Resources and further reading

Disclaimer: This blog post used AI to generate the images used for the analogy.

Photo credit: terminath0r
