Streams are objects that let us continuously read data from a source or write data to a destination. The data may not be available all at once and never has to fit into memory at one time, which makes streams especially useful when working with huge amounts of data or when data arrives piece by piece from an external source.
Streams are used for reading and writing files, network communication, and any kind of end-to-end information exchange. They also expose events and methods that Node.js developers can hook into as needed.
Why Streams?
Memory efficiency
You do not need to load large amounts of data into memory before you can start processing it.
Time efficiency
You can start processing data as soon as you receive it, rather than waiting until the entire payload has been transmitted.
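As a quick illustration of both points, here is a minimal sketch of an HTTP server that streams a file to the client instead of buffering it in memory first; big.file is just a placeholder for any large file on disk.
const fs = require('fs');
const http = require('http');

const server = http.createServer((req, res) => {
  // Instead of reading the whole file into memory with fs.readFile(),
  // pipe it to the response so chunks are sent as soon as they are read.
  fs.createReadStream('./big.file').pipe(res); // placeholder path
});

server.listen(8000);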
Types of streams in Node.js
- Writable
- Readable
- Transform
- Duplex
Writable: Streams where data can be written but cannot be read.
Events:
- drain
- finish
- error
- close
- pipe / unpipe

Methods:
- write()
- end()
- setDefaultEncoding()
- cork(), uncork()
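A minimal sketch of a few of these events and methods in use, assuming a placeholder output.txt path:
const fs = require('fs');

const out = fs.createWriteStream('./output.txt'); // placeholder path

out.on('finish', () => console.log('All data has been flushed.'));
out.on('error', (err) => console.error('Write failed:', err.message));

out.write('first line\n');
out.end('last line\n'); // end() accepts an optional final chunk and triggers 'finish'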
Readable: Streams from which data can be read but not written.
Events:
- data
- end
- error
- readable
- close

Methods:
- pipe(), unpipe()
- pause(), isPaused()
- setEncoding()
- read(), unshift(), resume()
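For instance, a readable stream can be consumed with the data, end, and error events instead of pipe(); here is a small sketch, assuming a placeholder input.txt file:
const fs = require('fs');

const src = fs.createReadStream('./input.txt'); // placeholder path
src.setEncoding('utf8'); // emit strings instead of Buffers

src.on('data', (chunk) => console.log(`Received ${chunk.length} characters`));
src.on('end', () => console.log('No more data.'));
src.on('error', (err) => console.error('Read failed:', err.message));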
Duplex: Streams that can perform both readable and writable operations are known as Duplex.
- net.Socket
Transform: A transform stream is basically a duplex stream that can modify or transform the data as it is written and read.
For example, during file compression you write plain data into the stream and read compressed data out of it; during decompression the reverse happens.
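A sketch of that idea using Node's built-in zlib transform streams (the file names are only placeholders):
const fs = require('fs');
const zlib = require('zlib');

// zlib.createGzip() returns a transform stream: plain chunks are written in,
// compressed chunks are read out, so we can pipe straight through it.
fs.createReadStream('./input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('./input.txt.gz'));

// zlib.createGunzip() performs the reverse transformation for decompression.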
All streams are instances of EventEmitter. They emit events that can be used to read or write data.
Consuming data from streams can be done in a simpler way using the pipe() method.
The pipe() method:
Pipes are used to connect one stream to another. pipe() takes two parameters (see the sketch after this list):
- The writable stream that acts as the destination for the data; this parameter is required.
- An object used to pass in options; this parameter is optional.
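A minimal sketch showing both parameters; the file paths are placeholders, and { end: false } keeps the destination open after the source ends.
const fs = require('fs');

const source = fs.createReadStream('./input.txt');      // placeholder path
const destination = fs.createWriteStream('./copy.txt'); // placeholder path

// The second argument is the optional options object; end: false keeps the
// destination open so more data can be written after the source has ended.
source.pipe(destination, { end: false });

source.on('end', () => destination.end('-- appended after the pipe --\n'));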
Implementing Writable Streams:
We need to import Writable from the stream module to implement the Writable stream.
const { Writable } = require('stream');
There is one more way to implement a writable stream:
class myWritableStream extends Writable {}
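With this class-based approach, the convention is to implement the underlying _write() method rather than passing options to the constructor; a minimal sketch:
const { Writable } = require('stream');

class myWritableStream extends Writable {
  // When extending Writable, the underlying _write() method is implemented
  // instead of passing a write option to the constructor.
  _write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
}

process.stdin.pipe(new myWritableStream());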
Going back to the constructor approach: when creating a Writable stream with new Writable(), multiple options can be passed, but only write is required; the rest are optional.
const { Writable } = require('stream');
const writeStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },
});
process.stdin.pipe(writeStream);
The write() method takes three parameters:
- The chunk is normally a Buffer, unless the stream is configured otherwise.
- The encoding argument matters in that custom case, but most of the time we can ignore it.
- The callback() function is called after the chunk has been processed. It indicates whether the write() was successful; if it was not, call the callback with an Error object.
In the above example, we simply log the chunk as a string and then call the callback without an error object to indicate success.
We pipe process.stdin, which is a readable stream, into our writeStream in order to consume it.
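As a small illustration of the error path described above, here is a hypothetical writable stream that rejects empty chunks by passing an Error to the callback:
const { Writable } = require('stream');

const strictStream = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.length === 0) {
      // Calling the callback with an Error marks the write as failed
      // and makes the stream emit an 'error' event.
      callback(new Error('Empty chunk is not allowed'));
      return;
    }
    console.log(chunk.toString());
    callback(); // calling it with no argument signals success
  },
});

strictStream.on('error', (err) => console.error('Write failed:', err.message));
strictStream.write('hello streams\n');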
Implementing Readable Stream:
To implement a Readable stream, we need to import the Readable interface from the stream module, create a new instance of Readable, and pass it an options object containing a read() method.
const { Readable } = require('stream');
const readStream = new Readable({
  read() {},
});
A simple way to use a Readable stream is to push the data into it directly, so that it only needs to be consumed.
const { Readable } = require('stream');
const readStream = new Readable({
  read() {},
});
readStream.push('Hii from Node Streams');
readStream.push('Again from Node Streams');
readStream.push(null);
readStream.pipe(process.stdout);
Pushing null signals that there is no more data to push.
process.stdout is a writable stream; we pipe readStream into it in order to consume readStream.
When we run the code above, we can read all the data from readStream, but this approach is not very efficient.
In the example above, we push all the data before piping it to process.stdout. The more efficient approach is to push data only when a consumer asks for it, which we can achieve by implementing the read() method of the readable stream.
const readStream = new Readable({
  read(size) {},
});
This way, whenever read() is called, the stream can push some data onto the queue. For example, we can push one character at a time and increment its character code on every push().
const readStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharacterCode++));
    if (this.currentCharacterCode > 85) {
      this.push(null);
    }
  },
});
readStream.currentCharacterCode = 66;
readStream.pipe(process.stdout);
While the stream is being consumed, read() keeps firing and more characters keep getting pushed, so the process has to be stopped somewhere; the if condition does that by pushing null once currentCharacterCode goes past 85.
This example is very similar to the previous one, but this time the data is sent on demand, which is the better practice.
Implementing Duplex/Transform Stream:
In a Duplex stream we can implement both a readable and a writable stream in a single object; it is like implementing both interfaces at once.
const { Duplex } = require('stream');
const readWriteStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },
  read(size) {
    this.push(String.fromCharCode(this.currentCharacterCode++));
    if (this.currentCharacterCode > 85) {
      this.push(null);
    }
  },
});
readWriteStream.currentCharacterCode = 66;
process.stdin.pipe(readWriteStream).pipe(process.stdout);
One important thing to understand is that the readable side and the writable side of a Duplex stream operate completely independently of one another; a Duplex stream is simply a grouping of the two into one object.
A Transform stream is a more interesting kind of duplex stream, because its output is computed from its input.
To implement a Transform stream, we do not need to implement separate read and write methods; a single transform method, which combines the two, does the work.
const { Transform } = require('stream');
const lowerCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toLowerCase());
    callback();
  },
});
process.stdin.pipe(lowerCaseTr).pipe(process.stdout);
Conclusion
Streams are a crucial Node.js component and are far more efficient than traditional data-handling approaches; they let developers design high-performance, real-time applications. Streams can be tough to comprehend at first, but learning more about them and using them in your apps will help you master the continuous flow of data from a source to a destination.
Author Bio: Vinod Satapara – Technical Director, iFour Technolab Pvt. Ltd. Technocrat and entrepreneur at a reputed Microsoft 365 development company, with years of experience in building large-scale enterprise web, cloud and mobile applications using the latest technologies like ASP.NET Core, .NET MVC, Angular and Blockchain. Keen interest in addressing business problems using the latest technologies and helping organizations achieve their goals.