文章已同步至掘金:https://juejin.cn/post/6844903921295556621
欢迎访问😃,有任何问题都可留言评论哦~
流是什么?
顾名思义,流就是数据流动的意思。
举个例子:
比如小区停水了,没想到你家用无塔供水器存了一点水,然而隔壁没有水了,想向你家借点水,直接用桶运水吧,费力气,还有可能浪费,于是可以用根管子,连接你们两家,这样就可以通过管子直接把水流到隔壁家
这就类似于request
对象向服务器发请求要资源,在这request
请求资源的传播方式通过流来实现
为什么使用流?
一般我们处理数据有两种模式:buffer
模式、stream
模式
buffer
模式:取完数据一次性操作stream
模式:边取数据边操作
举个例子:
你想用手机看部电影,用buffer模式就是你把这个电影全部缓存下来,然后再看。用stream模式,就是你边缓存边看
所以从这里就可以看出stream
模式无论是在空间和时间上都优于buffer
模式:
- 空间上:内存只会占用当前需要处理的一块数据区域的大小, 而不是整个文件
- 时间上:因为不需要全部的数据就可以开始处理, 所以时间就相当于是节约了
还有一个好处就是可以链式调用
如果说写入的速度跟不上读取的速度,就有可能导致数据丢失
所以说正常的情况应该是,写完一段,再读取下一段,如果没有写完的话,就让读取流先暂停,等写完再继续,(也就是你看电影的时候,缓存着看着,如果你网络不好,没缓存,就看不了,等再缓存点,再看)
所以为了让可读流和可写流速度一致,就要用到流中必不可少的属性pipe
了,pipe
翻译过来意思是管道,就如上面的例子中的管子一样
流的类型
node中有四种基本流类型:
- Readable 可读流
- Writable 可写流
- Duplex 可读可写流(双工流)
- Transform 在读写过程中可以修改和变换数据的Duplex流(转换流)
流中的数据有两种模式:
- 二进制模式,都是
string
字符串 和buffer
。 - 对象模式,流内部处理的是一系统普通对象。
可读流(Readable)
可读流中的两种模式:
- 流动模式 ( flowing ) :数据自动从系统底层读取,并通过事件,尽可能快地提供给应用程序。
- 暂停模式 ( paused ),必须显式的调用
read()
读取数据。
可读流都开始于 暂停模式
暂停模式 切换到 流动模式:
1.添加 data
事件回调。
2.调用 resume()
。
3.调用 pipe()
。
流动模式 切换到 暂停模式
1.如果没有管道目标,调用 pause()
。
2.如果有管道目标,移除所有管道目标,调用 unpipe()
移除多个管道目标。
createReadStream
方法有两个参数:
1.第一个参数是读取文件的路径
2.第二个参数为 options
选项,其中有八个参数:
- flags:标识位,默认为 r;
- encoding:字符编码,默认为 null;
- fd:文件描述符,默认为 null;
- mode:权限位,默认为 0o666;
- autoClose:是否自动关闭文件,默认为 true;
- start:读取文件的起始位置;
- end:读取文件的(包含)结束位置;
- highWaterMark:最大读取文件的字节数,默认 64 * 1024。
创建可读流,并监听事件:
const fs = require('fs');
//创建一个文件可读流
let rs = fs.createReadStream('./1.txt', {
//文件系统标志
flags: 'r',
//数据编码,如果调置了该参数,则读取的数据会自动解析
//如果没调置,则读取的数据会是 Buffer
//也可以通过 rs.setEncoding() 进行设置
encoding: 'utf8',
//文件描述符,默认为null
fd: null,
//文件权限,
mode: 0o666,
//文件读取的开始位置
start: 0,
//文件读取的结束位置(包括结束位置)
end: Infinity,
//读取缓冲区的大小,默认64K
highWaterMark: 3
});
//文件被打开时触发
rs.on('open', function () {
console.log('文件打开');
});
//监听data事件,会让当前流切换到流动模式
//当流中将数据传给消费者后触发
//由于我们在上面配置了 highWaterMark 为 3字节,所以下面会打印多次。
rs.on('data', function (data) {
console.log(data);
});
//流中没有数据可供消费者时触发
rs.on('end', function () {
console.log('数据读取完毕');
});
//读取数据出错时触发
rs.on('error', function () {
console.log('读取错误');
});
//当文件被关闭时触发
rs.on('close', function () {
console.log('文件关闭');
});
注:open
和 close
事件并不是所有流都会触发。
当们监听data
事件后,系统会尽可能快的读取出数据。但有时候,我们需要暂停一下流的读取,操作其他事情。
这时候就需要用到 pause()
和 resume()
方法。
const fs = require('fs');
//创建一个文件可读流
let rs = fs.createReadStream('./1.txt', {
highWaterMark: 3
});
rs.on('data', function (data) {
console.log(`读取了 ${data.length} 字节数据 : ${data.toString()}`);
//使流动模式的流停止触发'data'事件,切换出流动模式,数据都会保留在内部缓存中。
rs.pause();
//等待3秒后,再恢复触发'data'事件,将流切换回流动模式。
setTimeout(function () {
rs.resume();
}, 3000);
});
可读流的 readable
事件,当流中有数据可供读取时就触发。
注意当监听 readable
事件后,会导致流停止流动,需调用 read()
方法读取数据。
注:on('data')
,on('readable')
,pipe()
不要混合使用,会导致不明确的行为。
const fs = require('fs');
let rs = fs.createReadStream('./1.txt', {
highWaterMark: 1
});
//当流中有数据可供读取时就触发
rs.on('readable', function () {
let data;
//循环读取数据
//参数表示要读取的字节数
//如果可读的数据不足字节数,则返回缓冲区剩余数据
//如是没有指定字节数,则返回缓冲区中所有数据
while (data = rs.read()) {
console.log(`读取到 ${data.length} 字节数据`);
console.log(data.toString());
}
});
可写流(Writable)
createWriteStream
方法有两个参数:
1.第一个参数是读取文件的路径
2.第二个参数为 options
选项,其中有七个参数:
- flags:标识位,默认为 w;
- encoding:字符编码,默认为 utf8;
- fd:文件描述符,默认为 null;
- mode:权限位,默认为 0o666;
- autoClose:是否自动关闭文件,默认为 true;
- start:写入文件的起始位置;
- highWaterMark:一个对比写入字节数的标识,默认 16 * 1024。
创建可写流,并监听事件:
const fs = require('fs');
//创建一个文件可写流
let ws = fs.createWriteStream('./1.txt', {
highWaterMark: 3
});
//往流中写入数据
//参数一表示要写入的数据
//参数二表示编码方式
//参数三表示写入成功的回调
//缓冲区满时返回false,未满时返回true。
//由于上面我们设置的缓冲区大小为 3字节,所以到写入第3个时,就返回了false。
console.log(ws.write('1', 'utf8'));
console.log(ws.write('2', 'utf8'));
console.log(ws.write('3', 'utf8'));
console.log(ws.write('4', 'utf8'));
function writeData() {
let cnt = 9;
return function () {
let flag = true;
while (cnt && flag) {
flag = ws.write(`${cnt}`);
console.log('缓冲区中写入的字节数', ws.writableLength);
cnt--;
}
};
}
let wd = writeData();
wd();
//当缓冲区中的数据满的时候,应停止写入数据,
//一旦缓冲区中的数据写入文件了,并清空了,则会触发 'drain' 事件,告诉生产者可以继续写数据了。
ws.on('drain', function () {
console.log('可以继续写数据了');
console.log('缓冲区中写入的字节数', ws.writableLength);
wd();
});
//当流或底层资源关闭时触发
ws.on('close', function () {
console.log('文件被关闭');
});
//当写入数据出错时触发
ws.on('error', function () {
console.log('写入数据错误');
});
写入流的 end()
方法 和 finish
事件监听
const fs = require('fs');
//创建一个文件可写流
let ws = fs.createWriteStream('./1.txt', {
highWaterMark: 3
});
//往流中写入数据
//参数一表示要写入的数据
//参数二表示编码方式
//参数三表示写入成功的回调
//缓冲区满时返回false,未满时返回true。
//由于上面我们设置的缓冲区大小为 3字节,所以到写入第3个时,就返回了false。
console.log(ws.write('1', 'utf8'));
console.log(ws.write('2', 'utf8'));
console.log(ws.write('3', 'utf8'));
console.log(ws.write('4', 'utf8'));
//调用end()表明已经没有数据要被写入,在关闭流之前再写一块数据。
//如果传入了回调函数,则将作为 'finish' 事件的回调函数
ws.end('最后一点数据', 'utf8');
//调用 end() 且缓冲区数据都已传给底层系统时触发
ws.on('finish', function () {
console.log('写入完成');
});
写入流的 cork()
和 uncork()
方法,主要是为了解决大量小块数据写入时,内部缓冲可能失效,导致的性能下降。
const fs = require('fs');
let ws = fs.createWriteStream('./1.txt', {
highWaterMark: 1
});
//调用 cork() 后,会强制把所有写入的数据缓冲到内存中。
//不会因为写入的数据超过了 highWaterMark 的设置而写入到文件中。
ws.cork();
ws.write('1');
console.log(ws.writableLength);
ws.write('2');
console.log(ws.writableLength);
ws.write('3');
console.log(ws.writableLength);
//将调用 cork() 后的缓冲数据都输出到目标,也就是写入文件中。
ws.uncork();
注意 cork()
的调用次数要与 uncork()
一致。
const fs = require('fs');
let ws = fs.createWriteStream('./1.txt', {
highWaterMark: 1
});
//调用一次 cork() 就应该写一次 uncork(),两者要一一对应。
ws.cork();
ws.write('4');
ws.write('5');
ws.cork();
ws.write('6');
process.nextTick(function () {
//注意这里只调用了一次 uncork()
ws.uncork();
//只有调用同样次数的 uncork() 数据才会被输出。
ws.uncork();
});
双工流(Duplex)
Duplex
(双工)流实际上是继承了Readable
和Writable
的一类流,一个Duplex
对象既可当成可读流来使用,也可以当做可写流来使用。
所以需要继承Duplex
类:
1.继承 Duplex
类
2.实现 _read()
方法
3.实现 _write()
方法
实现了_read()
方法后,可以监听data
事件来消耗Duplex
产生的数据
实现了_write()
方法后,便可以作为下游去消耗数据
相信大家对 read()
、write()
方法的实现不会陌生,因为和 Readable
、Writable
完全一样。
const Duplex = require('stream').Duplex;
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
}
});
在实例化 Duplex
类的时候可以传递几个参数:
readableObjectMode
: 可读流是否设置为ObjectMode
,默认false
writableObjectMode
: 可写流是否设置为ObjectMode
,默认false
allowHalfOpen
: 默认true
, 设置成false
的话,当写入端结束的时,流会自动的结束读取端,反之亦然。
例子:
const Duplex = require('stream').Duplex;
const kSource = Symbol('source');
class MyDuplex extends Duplex {
constructor(source, options) {
super(options);
this[kSource] = source;
}
_write(chunk, encoding, callback) {
// The underlying source only deals with strings
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
callback();
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding));
});
}
}
这是不能执行的伪代码,但是可以看出来 Duplex
的作用,即可以生产数据,又可以消费数据,所以才可以处于数据流动管道的中间环节。
转换流(Transform)
在Duplex
流中,可读流中的数据和可写流中的数据是分开的。
而Transform
流是一种特殊的Duplex
流,它继承自Duplex
流,其可写端的数据经变换后会自动添加到可读端
Tranform
类内部继承了 Duplex
并实现了 writable.write()
和 readable._read()
方法
所以当我们自定义 Transform
流时,只需要:
1.继承 Transform
类
2.实现 _transform()
方法
3.实现 _flush()
方法(可以不实现)
_transform(chunk, encoding, callback)
方法用来接收数据,并产生输出,参数我们已经很熟悉了,和 Writable
一样, chunk
默认是 Buffer
,除非 decodeStrings
被设置为 false
。
在 _transform()
方法内部可以调用 this.push(data)
生产数据,交给可写流,也可以不调用,意味着输入不会产生输出。
当数据处理完了必须调用 callback(err, data)
,第一个参数用于传递错误信息,第二个参数可以省略,如果被传入了,效果和 this.push(data)
一样
transform.prototype._transform = function (data, encoding, callback) {
this.push(data);
callback();
};
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data);
};
有些时候,transform
操作可能需要在流的最后多写入可写流一些数据。例如, Zlib流会存储一些内部状态,以便优化压缩输出。在这种情况下,可以使用_flush()
方法,它会在所有写入数据被消费、触发 end
之前被调用。
Transform 事件
Transform 流有两个常用的事件:
1.来自 Writable
的 finish
2.来自 Readable
的 end
当调用 transform.end()
并且数据被 _transform()
处理完后会触发 finish
,调用_flush
后,所有的数据输出完毕,触发end
事件。
pipe()方法
pipe()
方法类似下面的代码,在可读流与可写流之间连接一个管道。
const fs = require('fs');
//创建一个可读流
let rs = fs.createReadStream('./1.txt', {
highWaterMark: 3
});
//创建一个可写流
let ws = fs.createWriteStream('./2.txt', {
highWaterMark: 3
});
rs.on('data', function (data) {
let flag = ws.write(data);
console.log(`往可写流中写入 ${data.length} 字节数据`);
//如果写入缓冲区已满,则暂停可读流的读取
if (!flag) {
rs.pause();
console.log('暂停可读流');
}
});
//监控可读流数据是否读完
rs.on('end', function () {
console.log('数据已读完');
//如果可读流读完了,则调用 end() 表示可写流已写入完成
ws.end();
});
//如果可写流缓冲区已清空,可以再次写入,则重新打开可读流
ws.on('drain', function () {
rs.resume();
console.log('重新开启可读流');
});
用 pipe()
方法完成上面的功能。
const fs = require('fs');
//创建一个可读流
let rs = fs.createReadStream('./1.txt', {
highWaterMark: 3
});
//创建一个可写流
let ws = fs.createWriteStream('./2.txt', {
highWaterMark: 3
});
let ws2 = fs.createWriteStream('./3.txt', {
highWaterMark: 3
});
//绑定可写流到可读流,自动将可读流切换到流动模式,将可读流的所有数据推送到可写流。
rs.pipe(ws);
//可以绑定多个可写流
rs.pipe(ws2);
也可以用 unpipe()
手动的解绑可写流。
const fs = require('fs');
//创建一个可读流
let rs = fs.createReadStream('./1.txt', {
highWaterMark: 3
});
//创建一个可写流
let ws = fs.createWriteStream('./2.txt', {
highWaterMark: 3
});
let ws2 = fs.createWriteStream('./3.txt', {
highWaterMark: 3
});
rs.pipe(ws);
rs.pipe(ws2);
//解绑可写流,如果参数没写,则解绑所有管道
setTimeout(function () {
rs.unpipe(ws2);
}, 0);
评论区