Duplex Streams in Node.js
Published: Jul 11, 2024
Last updated: Jul 11, 2024
Overview
At this point in our Node.js streams blog series, we have already covered Readable and Writable streams in depth. The next fundamental stream type we're going to look into is the Duplex stream.
How to think about Duplex streams
Let's recap our mental model for Duplex streams: the two-way water pipe.
[Image: Two-way water pipe, our mental model for Duplex streams]
To understand this analogy a bit more, let's flesh it out:
- Bidirectional flow: Like a two-way water pipe that allows water to flow in both directions, a Duplex stream enables data to flow both ways.
- Separate channels: Imagine the pipe having two separate channels, one for each direction. This represents how Duplex streams manage separate internal buffers for reading and writing.
- Valves (methods): `_read()` is like opening a valve to allow water (data) to flow out; `_write()` is like opening a valve to let water (data) flow in.
- Pressure (backpressure): If water is flowing in faster than it can flow out, pressure builds up; this is analogous to backpressure in streams. The pipe might have a mechanism to signal when it's full, similar to how streams emit events or return values to indicate backpressure (there's a code sketch of this after the analogy below).
- Flow control: Imagine regulators on each end of the pipe that can slow down or speed up the flow. This represents how Duplex streams can pause and resume data flow.
- Pipe connections (piping): You can connect multiple two-way pipes to create a complex water system, just as you can pipe multiple Duplex streams together.
- Closing the pipe: The process of properly shutting off both ends of the pipe is similar to the `_final()` method ensuring all data is processed before closing.
- Leaks (error handling): A leak in the pipe is like an error in the stream, requiring proper handling to prevent data loss.
- Water types (modes): The pipe could carry either a continuous flow of water (binary mode) or discrete objects like fish (object mode).
This analogy can help you visualize the abstract concepts of Duplex streams in a more concrete, familiar context.
Alternatives would be the analogy of walkie-talkies, or a two-way street with cars coming and going in both directions. Choose whatever helps you visualize things best. I stick with the two-way water pipe to fit in with the water-themed analogies I've used throughout this series on Node.js streams.
You can also think of Duplex streams as a combination of Readable and Writable streams, meaning that the analogies from our previous deep dives on those can also help solidify your understanding of Duplex streams.
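To make the backpressure bullet above concrete, here's a minimal sketch; the tiny `highWaterMark` and the delayed callback are illustrative choices, not part of the original post:

```js
const { Duplex } = require("node:stream");

const duplex = new Duplex({
  // Deliberately tiny buffer (illustrative) so backpressure kicks in quickly
  highWaterMark: 4,
  read(size) {},
  write(chunk, encoding, callback) {
    // Simulate a slow consumer by delaying the callback
    setTimeout(callback, 100);
  },
});

// write() returns false once the internal buffer is full
const ok = duplex.write("a chunk larger than the buffer");
if (!ok) {
  // Wait for 'drain' before writing more, like easing pressure in the pipe
  duplex.once("drain", () => {
    console.log("Drained: safe to write again");
  });
}
```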
How to create Duplex streams
We can create Duplex streams using the `node:stream` module. There are a number of ways:
- Extending the `Duplex` class.
- Using the `new Duplex()` constructor.
- Using `Duplex.from`.
Extending the Duplex class
```js
const { Duplex } = require("stream");

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.data = ["Hello", "World", "Duplex", "Stream"];
  }

  _read(size) {
    const chunk = this.data.shift();
    if (chunk) {
      this.push(chunk);
    } else {
      this.push(null);
    }
  }

  _write(chunk, encoding, callback) {
    console.log("Received:", chunk.toString());
    callback();
  }
}

const myDuplex = new MyDuplex();

myDuplex.on("data", (chunk) => console.log("Read:", chunk.toString()));
myDuplex.write("Some data");
```
If you have followed along with my previous posts, you will notice that the `_read` and `_write` methods are similar to the ones we used for Readable and Writable streams. The key difference is that Duplex streams implement both methods.
The above code creates a Duplex stream that reads data from an internal array and writes data to the console.
When you run the script (saved here as duplex/one.js), you should see the following output:
```
$ node duplex/one.js
Received: Some data
Read: Hello
Read: World
Read: Duplex
Read: Stream
```
Here's a sequence diagram of the interaction:
[Image: Visualizing our Duplex stream]
Using the new Duplex() constructor
As with Readable and Writable streams, we do not need to extend the class if we prefer not to.
```js
const { Duplex } = require("node:stream");

const duplex = new Duplex({
  read(size) {
    // Implement read logic
  },
  write(chunk, encoding, callback) {
    // Implement write logic
    callback();
  },
});
```
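The `read` and `write` bodies above are left as stubs. As a rough sketch, a filled-in version might look like this; the chunk contents are purely illustrative:

```js
const { Duplex } = require("node:stream");

const duplex = new Duplex({
  read(size) {
    // Push one chunk, then signal that there is no more data
    this.push("a single chunk");
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log("Wrote:", chunk.toString());
    callback();
  },
});

duplex.on("data", (chunk) => console.log("Read:", chunk.toString()));
duplex.write("hello"); // logs "Wrote: hello"
```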
Using Duplex.from
With `Duplex.from`, we can supply the implementation through the `readable` and `writable` properties:
```js
const { Duplex } = require("node:stream");

const duplex = Duplex.from({
  readable: (async function* () {
    yield "data";
    yield "more data";
  })(),
  writable: new WritableStream({
    write(chunk) {
      console.log("Received:", chunk);
    },
  }),
});
```
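As a hedged usage sketch (assuming Node 18+, where `WritableStream` is available as a global web stream): writes to the duplex should reach the `WritableStream`'s write handler, while reads come from the async generator.

```js
// Reads come from the async generator...
duplex.on("data", (chunk) => console.log("Read:", chunk.toString()));

// ...while writes are delivered to the WritableStream's write handler
duplex.write("hello"); // expected to log "Received: hello"
```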
Alternatively, we can pass in existing Readable and Writable streams:
```js
const { Duplex, PassThrough } = require("node:stream");

const readable = new PassThrough();
const writable = new PassThrough();

const duplex = Duplex.from({ readable, writable });
```
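As a quick sketch of how this pairing behaves (using the variables from the snippet above): writes to the duplex are forwarded to the `writable` PassThrough, and anything written into `readable` comes out of the duplex's readable side.

```js
// Data written to the duplex surfaces on the writable side
writable.on("data", (chunk) => console.log("Writable side got:", chunk.toString()));

// Data fed into `readable` surfaces on the duplex's readable side
duplex.on("data", (chunk) => console.log("Duplex read:", chunk.toString()));

duplex.write("routed to writable");
readable.write("routed out of the duplex");
```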
Use cases for Duplex streams
Duplex streams have several practical use cases in Node.js applications. Here are some common scenarios where Duplex streams are particularly useful:
Network sockets:
- TCP sockets
- WebSocket connections
- Implementing custom network protocols
Database connections:
- Streaming queries and results
- Real-time data synchronization
File I/O:
- Simultaneous reading and writing of files
- Implementing file-based IPC (Inter-Process Communication)
Encryption/Decryption:
- Creating streams that can both encrypt and decrypt data
Compression/Decompression:
- Implementing streams that can both compress and decompress data
Parser/Serializer:
- Creating streams that can parse incoming data and serialize outgoing data
Protocol implementation:
- HTTP/2 streams
- Custom bi-directional protocols
Proxy servers:
- Forwarding requests and responses between client and server
Telnet-like interfaces:
- Creating interactive command-line tools
Middleware in stream processing pipelines:
- Implementing transformations that require both read and write capabilities
Testing and mocking:
- Simulating complex I/O scenarios in unit tests
Multiplexing/Demultiplexing:
- Combining multiple streams into one or splitting one stream into many
Buffering and flow control:
- Implementing custom buffering strategies for data flow
Let's go through simple examples for a few of these.
Network sockets
Here we show a TCP server and client operating as Duplex streams:
```js
const net = require("net");

const server = net.createServer((socket) => {
  socket.write("Welcome to the server!\n");

  socket.on("data", (data) => {
    console.log("Received:", data.toString());
    socket.write(`Echo: ${data}`);
  });
});

server.listen(3000, () => {
  const client = new net.Socket();

  client.connect(3000, "localhost", () => {
    client.write("Hello, server!");
  });

  client.on("data", (data) => {
    console.log("Server says:", data.toString());
  });
});
```
In this example, we create a TCP server and client using Node's net module. The server listens for connections, echoes back any received data, and sends a welcome message. The client connects to the server, sends a message, and logs any responses.
Both the server socket and client socket are Duplex streams, allowing bidirectional communication.
[Image: Sequence diagram for how the network socket example above works]
File I/O
In this case, we simultaneously read from one file and write to another:
```js
const fs = require("fs");
const { Duplex } = require("stream");

class FileDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.readFd = fs.openSync("input.txt", "r");
    this.writeFd = fs.openSync("output.txt", "w");
  }

  _read(size) {
    const buf = Buffer.alloc(size);
    fs.read(this.readFd, buf, 0, size, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err);
      } else if (bytesRead > 0) {
        this.push(buf.subarray(0, bytesRead));
      } else {
        this.push(null);
      }
    });
  }

  _write(chunk, encoding, callback) {
    fs.write(this.writeFd, chunk, callback);
  }

  _final(callback) {
    // Close both file descriptors once all writes have finished
    fs.close(this.readFd, (err) => {
      fs.close(this.writeFd, callback);
    });
  }
}

const fileDuplex = new FileDuplex();
fileDuplex.pipe(process.stdout);
fileDuplex.write("This will be written to output.txt\n");
```
This example implements a custom Duplex stream for simultaneous file reading and writing. It reads from input.txt and writes to output.txt. The `_read` method reads chunks from the input file, while the `_write` method writes data to the output file. This demonstrates how Duplex streams can be used for concurrent file operations.
Encryption/Decryption
```js
const { Duplex } = require("stream");
const crypto = require("crypto");

class CipherDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.key = crypto.randomBytes(32);
    this.iv = crypto.randomBytes(16);
  }

  _read(size) {}

  _write(chunk, encoding, callback) {
    const cipher = crypto.createCipheriv("aes-256-cbc", this.key, this.iv);
    let encrypted = cipher.update(chunk);
    encrypted = Buffer.concat([encrypted, cipher.final()]);
    this.push(encrypted);
    callback();
  }
}

const cipherDuplex = new CipherDuplex();

cipherDuplex.on("data", (data) => {
  console.log("Encrypted:", data.toString("hex"));
});

cipherDuplex.write("Secret message");
```
The CipherDuplex stream encrypts incoming data using AES-256-CBC. It generates a random key and initialization vector (IV) for the encryption. When data is written to the stream, it's encrypted and then pushed as readable data. This shows how Duplex streams can be used for on-the-fly data transformation; a decryption counterpart is sketched below.
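The example only covers the encryption direction. As a hypothetical counterpart, not part of the original example, here is a `DecipherDuplex` sketch that continues from the snippet above and assumes each written chunk is a complete ciphertext produced with the same key and IV as the `CipherDuplex`:

```js
class DecipherDuplex extends Duplex {
  constructor(key, iv, options) {
    super(options);
    this.key = key;
    this.iv = iv;
  }

  _read(size) {}

  _write(chunk, encoding, callback) {
    // Assumes each chunk is a complete ciphertext for this key/IV pair
    const decipher = crypto.createDecipheriv("aes-256-cbc", this.key, this.iv);
    let decrypted = decipher.update(chunk);
    decrypted = Buffer.concat([decrypted, decipher.final()]);
    this.push(decrypted);
    callback();
  }
}

// Round trip: feed the encrypted output into the decipher stream
const decipherDuplex = new DecipherDuplex(cipherDuplex.key, cipherDuplex.iv);
cipherDuplex.pipe(decipherDuplex);
decipherDuplex.on("data", (data) => console.log("Decrypted:", data.toString()));
cipherDuplex.write("Another secret");
```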
Compression/Decompression
```js
const zlib = require("zlib");
const { pipeline } = require("stream");

const compressDecompress = zlib.createGzip();

pipeline(
  process.stdin,
  compressDecompress,
  zlib.createGunzip(),
  process.stdout,
  (err) => {
    if (err) {
      console.error("Pipeline failed", err);
    } else {
      console.log("Pipeline succeeded");
    }
  }
);

// Usage: echo "Hello, world!" | node script.js
```
This example uses zlib's built-in Duplex streams for compression and decompression. It sets up a pipeline that compresses input from stdin, immediately decompresses it, and then writes the result to stdout. This demonstrates how Duplex streams can be chained together for complex data processing.
Parser/Serializer
```js
const { Duplex } = require("stream");

class JSONDuplex extends Duplex {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _read(size) {}

  _write(chunk, encoding, callback) {
    try {
      const parsed = JSON.parse(chunk);
      this.push(parsed);
      callback();
    } catch (err) {
      callback(err);
    }
  }

  _final(callback) {
    this.push(null);
    callback();
  }
}

const jsonDuplex = new JSONDuplex();

jsonDuplex.on("data", (data) => {
  console.log("Parsed object:", data);
});

jsonDuplex.write('{"name": "John", "age": 30}');
jsonDuplex.write('{"city": "New York", "country": "USA"}');
```
Here we create a JSONDuplex stream that parses incoming JSON strings into JavaScript objects. It operates in object mode, allowing it to push parsed objects directly. This showcases how Duplex streams can be used for data format conversion, although this example only implements the parsing direction; a serializing counterpart is sketched below.
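As a sketch of the serializing direction, here's a hypothetical `JSONSerializeDuplex`, not part of the original example, that accepts objects on its writable side and pushes JSON strings:

```js
const { Duplex } = require("stream");

class JSONSerializeDuplex extends Duplex {
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  _read(size) {}

  _write(chunk, encoding, callback) {
    try {
      // Push the serialized form onto the readable side
      this.push(JSON.stringify(chunk));
      callback();
    } catch (err) {
      callback(err); // e.g. circular references
    }
  }
}

const serializer = new JSONSerializeDuplex();
serializer.on("data", (data) => console.log("Serialized:", data));
serializer.write({ name: "John", age: 30 });
```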
Multiplexing/Demultiplexing
```js
const { Duplex, PassThrough } = require("stream");

class MultiplexDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.streams = new Map();
  }

  _read(size) {}

  _write(chunk, encoding, callback) {
    const { streamId, data } = JSON.parse(chunk);
    if (!this.streams.has(streamId)) {
      this.streams.set(streamId, new PassThrough());
    }
    this.streams.get(streamId).write(data);
    callback();
  }

  getStream(streamId) {
    if (!this.streams.has(streamId)) {
      this.streams.set(streamId, new PassThrough());
    }
    return this.streams.get(streamId);
  }
}

const multiplexer = new MultiplexDuplex();
const stream1 = multiplexer.getStream(1);
const stream2 = multiplexer.getStream(2);

stream1.on("data", (data) => console.log("Stream 1:", data.toString()));
stream2.on("data", (data) => console.log("Stream 2:", data.toString()));

multiplexer.write(JSON.stringify({ streamId: 1, data: "Hello from stream 1" }));
multiplexer.write(JSON.stringify({ streamId: 2, data: "Hello from stream 2" }));
```
This example implements a MultiplexDuplex stream that can handle multiple logical streams over a single Duplex stream. It uses a streamId to differentiate between different logical streams. Data written to the multiplexer is routed to the appropriate PassThrough stream based on the streamId. This demonstrates how Duplex streams can be used to implement complex communication protocols.
Conclusion
In this journey through the world of Node.js streams, we've delved into the fundamental concepts of streams, explored Readable and Writable streams in detail, and now, we've uncovered the intricacies of Duplex streams. By understanding Duplex streams through the analogy of a two-way water pipe, we've gained insight into how data can flow bidirectionally, the importance of backpressure, and the mechanisms for controlling this flow.
From extending the `Duplex` class to using the `new Duplex()` constructor and `Duplex.from`, we've seen multiple ways to create Duplex streams. These streams offer a powerful way to handle scenarios where both reading and writing are essential, such as network sockets, file I/O, encryption, compression, and complex protocols.
By exploring practical use cases, we've demonstrated how Duplex streams can be applied to real-world problems, making them an indispensable tool in your Node.js toolkit. Whether you're building a custom network protocol, implementing a parser, or creating a multiplexing system, Duplex streams provide the flexibility and control you need.
As we continue this series, we'll look into other advanced stream types and techniques, helping you master the full spectrum of stream capabilities in Node.js. Stay tuned for more insights and practical examples in our next post on Transform streams. Thank you for following along, and happy streaming!