Node.js流编程:全面指南
立即解锁
发布时间: 2025-08-19 01:17:24 订阅数: 4 


Node.js设计模式与最佳实践
### Node.js 流编程:全面指南
#### 1. 随机流(RandomStream)
随机流以 5% 的概率随机终止流,通过向内部缓冲区推送 `null` 来表示流的结束(EOF 情况)。`_read()` 函数的 `size` 参数会被忽略,因为它只是一个建议性参数。我们可以直接推送所有可用数据,但如果在同一调用中有多次推送,就需要检查 `push()` 的返回值。若返回 `false`,则意味着内部缓冲区已达到 `highWaterMark` 限制,此时应停止添加更多数据。
以下是使用随机流的示例代码:
```javascript
const RandomStream = require('./randomStream');
const randomStream = new RandomStream();
randomStream.on('readable', () => {
let chunk;
while((chunk = randomStream.read()) !== null) {
console.log(`Chunk received: ${chunk.toString()}`);
}
});
```
操作步骤:
1. 创建 `generateRandom.js` 模块。
2. 引入 `RandomStream` 类并实例化。
3. 监听 `readable` 事件,在事件处理函数中读取数据并打印。
#### 2. 可写流(Writable streams)
可写流代表数据的目的地,在 Node.js 中,它使用 `stream` 模块中的 `Writable` 抽象类实现。
##### 2.1 写入流
向可写流写入数据可使用 `write()` 方法,其签名如下:
```javascript
writable.write(chunk, [encoding], [callback])
```
- `encoding` 参数可选,当 `chunk` 为字符串时可指定编码(默认为 `utf8`,若 `chunk` 为 `Buffer` 则忽略)。
- `callback` 函数在数据刷新到底层资源时调用,也是可选的。
要表示不再向流写入数据,可使用 `end()` 方法:
```javascript
writable.end([chunk], [encoding], [callback])
```
我们可以通过 `end()` 方法提供最后一块数据,此时 `callback` 函数相当于注册 `finish` 事件的监听器,该事件在流中写入的所有数据都刷新到底层资源时触发。
以下是一个创建 HTTP 服务器输出随机字符串序列的示例:
```javascript
const Chance = require('chance');
const chance = new Chance();
require('http').createServer((req, res) => {
res.writeHead(200, {'Content-Type': 'text/plain'});
while(chance.bool({likelihood: 95})) {
res.write(chance.string() + '\n');
}
res.end('\nThe end...\n');
res.on('finish', () => console.log('All data was sent'));
}).listen(8080, () => console.log('Listening on http://localhost:8080'));
```
操作步骤:
1. 创建 `entropyServer.js` 模块。
2. 引入 `chance` 模块和 `http` 模块。
3. 创建 HTTP 服务器,设置响应头。
4. 使用循环以 95% 的概率写入随机字符串。
5. 调用 `end()` 方法结束流,并注册 `finish` 事件监听器。
6. 启动服务器监听 8080 端口。
测试服务器:
- 打开浏览器访问 `http://localhost:8080`。
- 在终端使用 `curl localhost:8080`。
##### 2.2 背压(Back-pressure)
与实际管道系统中的液体流动类似,Node.js 流也可能出现瓶颈,即数据写入速度快于流的消费速度。为解决此问题,会对传入数据进行缓冲。但如果流不向写入者提供反馈,可能会导致内部缓冲区积累越来越多的数据,从而导致内存使用量过高。
为防止这种情况发生,当内部缓冲区超过 `highWaterMark` 限制时,`writable.write()` 会返回 `false`。可写流有一个 `highWaterMark` 属性,当内部缓冲区大小超过该限制时,`write()` 方法开始返回 `false`,表示应用程序应停止写入。当缓冲区清空时,会发出 `drain` 事件,表明可以安全地再次开始写入。这种机制称为背压。
以下是考虑背压的 `entropyServer` 修改示例:
```javascript
const Chance = require('chance');
const chance = new Chance();
require('http').createServer((req, res) => {
res.writeHead(200, {'Content-Type': 'text/plain'});
function generateMore() {
while(chance.bool({likelihood: 95})) {
let shouldContinue = res.write(
chance.string({length: (16 * 1024) - 1})
);
if(!shouldContinue) {
console.log('Backpressure');
return res.once('drain', generateMore);
}
}
res.end('\nThe end...\n',() => console.log('All data was sent'));
}
generateMore();
}).listen(8080, () => console.log('Listening on http://localhost:8080'));
```
操作步骤:
1. 将主要逻辑封装在 `generateMore()` 函数中。
2. 增加数据块大小到 16 KB - 1 字节,接近默认的 `highWaterMark` 限制。
3. 写入数据后检查 `write()` 方法的返回值,若为 `false`,则监听 `drain` 事件并在事件触发时再次调用 `generateMore()` 函数。
##### 2.3 实现可写流
我们可以通过继承 `stream.Writable` 的原型并实现 `_write()` 方法来创建新的可写流。
以下是一个将对象保存到文件的可写流示例:
```javascript
const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');
class ToFileStream extends stream.Writable {
constructor() {
super({objectMode: true});
}
_write (chunk, encoding, callback) {
mkdirp(path.dirname(chunk.path), err => {
if (err) {
return callback(err);
}
fs.writeFile(chunk.path, chunk.content, callback);
});
}
}
module.exports = ToFileStream;
```
操作步骤:
1. 创建
0
0
复制全文
相关推荐









