Duplex Streams in Node.js

Published: Jul 11, 2024

Last updated: Jul 11, 2024

Overview

At this point in our Node.js Streams blog series, we've covered the core fundamentals you should know about Node.js streams and taken deep dives into both Readable streams and Writable streams.

The next fundamental stream type that we're going to look into is the Duplex stream.

How to think about Duplex streams

Let's recap our mental model for Duplex streams: the two-way water pipe.

Two-way water pipe: our mental model for Duplex streams

To understand this analogy a bit better, let's flesh it out:

  • Bidirectional flow: Like a two-way water pipe that allows water to flow in both directions, a Duplex stream enables data to flow both ways.
  • Separate channels: Imagine the pipe having two separate channels, one for each direction. This represents how Duplex streams manage separate internal buffers for reading and writing.
  • Valves (methods):
    • _read() is like opening a valve to allow water (data) to flow out.
    • _write() is like opening a valve to let water (data) flow in.
  • Pressure (backpressure):
    • If water is flowing in faster than it can flow out, pressure builds up. This is analogous to backpressure in streams.
    • The pipe might have a mechanism to signal when it's full, similar to how streams emit events or return values to indicate backpressure.
  • Flow control: Imagine regulators on each end of the pipe that can slow down or speed up the flow. This represents how Duplex streams can pause and resume data flow.
  • Pipe connections (piping): You can connect multiple two-way pipes to create a complex water system, just as you can pipe multiple Duplex streams together.
  • Closing the pipe: The process of properly shutting off both ends of the pipe is similar to the _final() method ensuring all data is processed before closing.
  • Leaks (error handling): A leak in the pipe is like an error in the stream, requiring proper handling to prevent data loss.
  • Water types (modes): The pipe could carry either a continuous flow of water (binary mode) or discrete objects like fish (object mode).

This analogy can help you visualize the abstract concepts of Duplex streams in a more concrete, familiar context.

Alternatively, you could use the analogy of walkie-talkies, or a two-way street with cars coming and going in both directions. Choose whatever helps you visualize things best. I stick with the two-way water pipe analogy to fit in with all the water-themed analogies I've used for Node.js streams.

You can also think of Duplex streams as a combination of Readable and Writable streams, meaning the analogies from those previous deep dives can also help you solidify your understanding of Duplex streams.

How to create Duplex streams

We can create Duplex streams using the node:stream module in a number of ways:

  1. Extending the Duplex class.
  2. Using the new Duplex() constructor.
  3. Using Duplex.from.

Extending the Duplex class

```javascript
const { Duplex } = require("stream");

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.data = ["Hello", "World", "Duplex", "Stream"];
  }

  _read(size) {
    const chunk = this.data.shift();
    if (chunk) {
      this.push(chunk);
    } else {
      this.push(null);
    }
  }

  _write(chunk, encoding, callback) {
    console.log("Received:", chunk.toString());
    callback();
  }
}

const myDuplex = new MyDuplex();
myDuplex.on("data", (chunk) => console.log("Read:", chunk.toString()));
myDuplex.write("Some data");
```

If you have followed along with my previous posts, you will notice that the _read and _write methods are similar to the ones we used for Readable and Writable streams. The key difference is that Duplex streams implement both methods.

The above code creates a Duplex stream that reads data from an internal array and writes data to the console.

When you run the script (saved here as duplex/one.js), you should see the following output:

```shell
$ node duplex/one.js
Received: Some data
Read: Hello
Read: World
Read: Duplex
Read: Stream
```

Here's a sequence diagram visual:

Visualizing our Duplex stream

Using the new Duplex() constructor

As with Readable and Writable streams, we do not need to extend the class if we prefer not to.

```javascript
const { Duplex } = require("node:stream");

const duplex = new Duplex({
  read(size) {
    // Implement read logic
  },
  write(chunk, encoding, callback) {
    // Implement write logic
    callback();
  },
});
```

Using Duplex.from

With Duplex.from, we can supply implementations for the readable and writable sides (WritableStream here is the web-streams class, which Node.js exposes as a global in recent versions):

```javascript
const { Duplex } = require("node:stream");

const duplex = Duplex.from({
  readable: (async function* () {
    yield "data";
    yield "more data";
  })(),
  writable: new WritableStream({
    write(chunk) {
      console.log("Received:", chunk);
    },
  }),
});
```

Alternatively, we can pass in existing Readable and Writable streams:

```javascript
const { Duplex } = require("node:stream");
const { PassThrough } = require("node:stream");

const readable = new PassThrough();
const writable = new PassThrough();

const duplex = Duplex.from({ readable, writable });
```

Use cases for Duplex streams

Duplex streams have several practical use cases in Node.js applications. Here are some common scenarios where Duplex streams are particularly useful:

  1. Network sockets:

    • TCP sockets
    • WebSocket connections
    • Implementing custom network protocols
  2. Database connections:

    • Streaming queries and results
    • Real-time data synchronization
  3. File I/O:

    • Simultaneous reading and writing of files
    • Implementing file-based IPC (Inter-Process Communication)
  4. Encryption/Decryption:

    • Creating streams that can both encrypt and decrypt data
  5. Compression/Decompression:

    • Implementing streams that can both compress and decompress data
  6. Parser/Serializer:

    • Creating streams that can parse incoming data and serialize outgoing data
  7. Protocol implementation:

    • HTTP/2 streams
    • Custom bi-directional protocols
  8. Proxy servers:

    • Forwarding requests and responses between client and server
  9. Telnet-like interfaces:

    • Creating interactive command-line tools
  10. Middleware in stream processing pipelines:

    • Implementing transformations that require both read and write capabilities
  11. Testing and mocking:

    • Simulating complex I/O scenarios in unit tests
  12. Multiplexing/Demultiplexing:

    • Combining multiple streams into one or splitting one stream into many
  13. Buffering and flow control:

    • Implementing custom buffering strategies for data flow

Let's go through some simple examples for some of these.

Network sockets

Here we show a TCP server and client operating as Duplex streams:

```javascript
const net = require("net");

const server = net.createServer((socket) => {
  socket.write("Welcome to the server!\n");

  socket.on("data", (data) => {
    console.log("Received:", data.toString());
    socket.write(`Echo: ${data}`);
  });
});

server.listen(3000, () => {
  const client = new net.Socket();

  client.connect(3000, "localhost", () => {
    client.write("Hello, server!");
  });

  client.on("data", (data) => {
    console.log("Server says:", data.toString());
  });
});
```

In this example, we create a TCP server and client using Node's net module. The server listens for connections, echoes back any received data, and sends a welcome message. The client connects to the server, sends a message, and logs any responses.

Both the server socket and client socket are Duplex streams, allowing bidirectional communication.

Sequence diagram for how the network socket above works

File I/O

In this case we simultaneously read and write:

```javascript
const fs = require("fs");
const { Duplex } = require("stream");

class FileDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.readFd = fs.openSync("input.txt", "r");
    this.writeFd = fs.openSync("output.txt", "w");
  }

  _read(size) {
    const buf = Buffer.alloc(size);
    fs.read(this.readFd, buf, 0, size, null, (err, bytesRead) => {
      if (bytesRead > 0) {
        this.push(buf.slice(0, bytesRead));
      } else {
        this.push(null);
      }
    });
  }

  _write(chunk, encoding, callback) {
    fs.write(this.writeFd, chunk, callback);
  }

  _final(callback) {
    fs.close(this.readFd, (err) => {
      fs.close(this.writeFd, callback);
    });
  }
}

const fileDuplex = new FileDuplex();
fileDuplex.pipe(process.stdout);
fileDuplex.write("This will be written to output.txt\n");
```

This example implements a custom Duplex stream for simultaneous file reading and writing. It reads from 'input.txt' and writes to 'output.txt'. The _read method reads chunks from the input file, while the _write method writes data to the output file. This demonstrates how Duplex streams can be used for concurrent file operations.

Encryption/Decryption

```javascript
const { Duplex } = require("stream");
const crypto = require("crypto");

class CipherDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.key = crypto.randomBytes(32);
    this.iv = crypto.randomBytes(16);
  }

  _read(size) {}

  _write(chunk, encoding, callback) {
    const cipher = crypto.createCipheriv("aes-256-cbc", this.key, this.iv);
    let encrypted = cipher.update(chunk);
    encrypted = Buffer.concat([encrypted, cipher.final()]);
    this.push(encrypted);
    callback();
  }
}

const cipherDuplex = new CipherDuplex();

cipherDuplex.on("data", (data) => {
  console.log("Encrypted:", data.toString("hex"));
});

cipherDuplex.write("Secret message");
```

Here, the CipherDuplex stream encrypts incoming data using AES-256-CBC encryption. It generates a random key and initialization vector (IV) for the encryption. When data is written to the stream, it's encrypted and then pushed as readable data. This shows how Duplex streams can be used for on-the-fly data transformation.

Compression/Decompression

```javascript
const zlib = require("zlib");
const { pipeline } = require("stream");

const compressDecompress = zlib.createGzip();

pipeline(
  process.stdin,
  compressDecompress,
  zlib.createGunzip(),
  process.stdout,
  (err) => {
    if (err) {
      console.error("Pipeline failed", err);
    } else {
      console.log("Pipeline succeeded");
    }
  }
);

// Usage: echo "Hello, world!" | node script.js
```

This example uses zlib's built-in Duplex streams for compression and decompression. It sets up a pipeline that compresses input from stdin, immediately decompresses it, and then writes the result to stdout. This demonstrates how Duplex streams can be chained together for complex data processing.

Parser/serializer

```javascript
const { Duplex } = require("stream");

class JSONDuplex extends Duplex {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _read(size) {}

  _write(chunk, encoding, callback) {
    try {
      const parsed = JSON.parse(chunk);
      this.push(parsed);
      callback();
    } catch (err) {
      callback(err);
    }
  }

  _final(callback) {
    this.push(null);
    callback();
  }
}

const jsonDuplex = new JSONDuplex();

jsonDuplex.on("data", (data) => {
  console.log("Parsed object:", data);
});

jsonDuplex.write('{"name": "John", "age": 30}');
jsonDuplex.write('{"city": "New York", "country": "USA"}');
```

Here we create a JSONDuplex stream that parses incoming JSON strings into JavaScript objects. It operates in object mode, allowing it to push parsed objects directly. This showcases how Duplex streams can be used for data format conversion in both directions, although this example only implements the parsing direction.

Multiplexing/Demultiplexing

```javascript
const { Duplex, PassThrough } = require("stream");

class MultiplexDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.streams = new Map();
  }

  _read(size) {}

  _write(chunk, encoding, callback) {
    const { streamId, data } = JSON.parse(chunk);
    if (!this.streams.has(streamId)) {
      this.streams.set(streamId, new PassThrough());
    }
    this.streams.get(streamId).write(data);
    callback();
  }

  getStream(streamId) {
    if (!this.streams.has(streamId)) {
      this.streams.set(streamId, new PassThrough());
    }
    return this.streams.get(streamId);
  }
}

const multiplexer = new MultiplexDuplex();

const stream1 = multiplexer.getStream(1);
const stream2 = multiplexer.getStream(2);

stream1.on("data", (data) => console.log("Stream 1:", data.toString()));
stream2.on("data", (data) => console.log("Stream 2:", data.toString()));

multiplexer.write(JSON.stringify({ streamId: 1, data: "Hello from stream 1" }));
multiplexer.write(JSON.stringify({ streamId: 2, data: "Hello from stream 2" }));
```

This example implements a MultiplexDuplex stream that can handle multiple logical streams over a single Duplex stream. It uses a streamId to differentiate between different logical streams. Data written to the multiplexer is routed to the appropriate PassThrough stream based on the streamId. This demonstrates how Duplex streams can be used to implement complex communication protocols.

Conclusion

In this journey through the world of Node.js streams, we've delved into the fundamental concepts of streams, explored Readable and Writable streams in detail, and now, we've uncovered the intricacies of Duplex streams. By understanding Duplex streams through the analogy of a two-way water pipe, we've gained insight into how data can flow bidirectionally, the importance of backpressure, and the mechanisms for controlling this flow.

From extending the Duplex class to using the new Duplex() constructor and Duplex.from, we've seen multiple ways to create Duplex streams. These streams offer a powerful way to handle scenarios where both reading and writing are essential, such as network sockets, file I/O, encryption, compression, and complex protocols.

By exploring practical use cases, we've demonstrated how Duplex streams can be applied to real-world problems, making them an indispensable tool in your Node.js toolkit. Whether you're building a custom network protocol, implementing a parser, or creating a multiplexing system, Duplex streams provide the flexibility and control you need.

As we continue this series, we'll look into other advanced stream types and techniques, helping you master the full spectrum of stream capabilities in Node.js. Stay tuned for more insights and practical examples in our next post on Transform streams. Thank you for following along, and happy streaming!

Disclaimer: This blog post used AI to generate the images used for the analogy.

Photo credit: pawel_czerwinski

Dennis O'Keeffe

@dennisokeeffe92
  • Melbourne, Australia

Hi, I am a professional Software Engineer. Formerly of Culture Amp, UsabilityHub, Present Company and NightGuru.
I am currently working on Visibuild.
