nodeJS 中的stream对象

前言

如果你是一名当今世界的前端开发者,那么流一定是一个你需要掌握的概念。如果你想成为一个前端开发高手,那么流一定是武功秘籍中不可缺少的一个部分。本文中的部分内容来自GitHub上的开源社区。

为什么要使用流,流到底是什么

“流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例。” 这是 nodeJS 官网上对流的一段解释,估计很多新人看到这个定义是一脸懵逼,下面我们通过一个简单的例子来慢慢解释。
定义中我们知道http请求是流的实例,是一个可读的、可写的流,我们以http服务器的例子来说明,先看下面的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const http = require('http');
const fs = require('fs');

const server = http.createServer((req,res)=>{
fs.readFile(__dirname+"/data.txt",(err,data)=>{
if(err) throw err;
res.end(data);
});
});
server.on("connection",()=>{
console.log("Someone connected.");
});

server.listen(5566);

上述例子创建了一个简单的服务器,当接收到请求是,响应一个data.txt文件,文件的内容有一百万行,在没有使用流的情况下,需要将文件中的所有内容读取完成之后才可以进行发送数据,这大大的损耗了内存资源,在客户端的体验也是非常糟糕的,需要等待服务器端完成所有data.txt文件的读取之后才能响应。下面引入流的实例

1
2
3
4
5
6
7
8
9
10
11
12
const http = require('http');
const fs = require('fs');

const server = http.createServer((req,res)=>{
const reader = fs.createReadStream(__dirname+"/data.txt");
reader.pipe(res);
});
server.on("connection",()=>{
console.log("Someone connected.");
});

server.listen(5588);

通过fs模块创建一个可读的流,用来读取data.txt中的一百万行数据,在将读取到的数据流 pipe 到 响应结果的 res 中,这样客户端发起请求的时候就大大减少了等待的时间,服务器端的数据源源不断的 pipe 到响应中去。在这里,.pipe()方法会自动帮助我们监听data和end事件。上面的这段代码不仅简洁,而且data.txt文件中每一小段数据都将源源不断的发送到客户端。除此之外,使用.pipe()方法还有别的好处,比如说它可以自动控制后端压力,以便在客户端连接缓慢的时候node可以将尽可能少的缓存放到内存中。

.pipe()的使用

无论哪一种流,都会使用.pipe()方法来实现输入和输出。.pipe()函数很简单,它仅仅是接受一个源头src并将数据输出到一个可写的流dst中:

1
src.pipe(dst)

.pipe(dst) 将会返回 dst ,因此你可以链式调用多个流:

1
a.pipe(b).pipe(c).pipe(d)

上述的代码等同于以下:

1
2
3
a.pipe(b)
b.pipe(c)
c.pipe(d)

介绍了 src.pipe(dst) 的用法之后,我们需要知道不是所有的属性的流都可以有 .pipe() 方法的,只有可读的流才可以调用该方法,举个例子就如上述的链式调用可以实现就必须要求 b,c,d 本身即是一个可读的流也是一个可写的流,一句话就是,可读的流可以产生数据,再 pipe 到一个可写的流中去。就相当于源源不断的从可读的流文件中读取数据,再源源不断的写入一个可写的流文件中去,实现流最强大的功能。

创建并使用流

说完了流的用法之后,如何创建流是接下来讨论的问题。nodeJS中的 stream 模块可以通过以下方式引入:

1
const stream = require("stream")

尽管所有的 Node.js 用户都应该理解流的工作方式,这点很重要, 但是 stream 模块本身只对于那些需要创建新的流的实例的开发者最有用处。 对于主要是 消费流 的开发者来说,他们很少(如果有的话)需要直接使用 stream 模块。在nodeJS中的fs模块中就封装了很好用的模块,下面的例子就是用来创建一个的可写的流和一个可读的流,并 pipe 到可写的流中去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const fs = require('fs');
const path = require('path');

const file_r = path.join(__dirname,"/test-data.txt");
const file_w = path.join(__dirname,"/test.txt");

const reader = fs.createReadStream(file_r,(err)=>{if(err) throw err;});
const writer = fs.createWriteStream(file_w,(err)=>{if(err) throw err;});

//直接向reader的buffer中写入数据可以采用,当buffer压入null是触发 end 事件
reader.push("Arrow")
reader.push("Flash")
reader.push("Superman")
reader.push(null)

//将reader缓存中的数据和实际文件中的数据 pipe 到可写的流中去
reader.pipe(writer)

流的执行原理

Node.js 中有四种基本的流类型:
1、Readable - 可读的流 (例如fs.createReadStream())
2、Writable - 可写的流 (例如fs.createWriteStream())
3、Duplex - 可读写的流 (例如net.Socket)
4、Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如zlib.createDeflate()).

  • Readable 和 Writable流都会将数据存储到内部的缓存中。这些缓存可以通过相应writable._writableState.getBuffer()readable._readableState.buffer来获取缓存中的内容。
  • 当可读流的实现调用 stream.push(chunk) 方法时,数据被放到缓存中。如果流的 消费者 没有调用 stream.read() 方法, 这些数据会始终存在于内部队列中,直到被消费。
  • 当内部可读缓存的大小达到 highWaterMark 指定的阈值时,流会暂停从底层资源读取数据,直到当前 缓存的数据被消费 (也就是说, 流会在内部停止调用 readable._read() 来填充可读缓存)。
  • 可写流通过反复调用 writable.write(chunk) 方法将数据放到缓存。当内部可写缓存的总大小小于 highWaterMark 指定的阈值时, 调用 writable.write() 将返回true。 一旦内部缓存的大小达到或超过 highWaterMark ,调用 writable.write() 将返回 false 。
  • stream API 的关键目标, 尤其对于 stream.pipe() 方法, 就是限制缓存数据大小,以达到可接受的程度。这样,对于读写速度不匹配的源头和目标,就不会超出可用的内存大小。