nodejs可读流源码分析
可读流是对数据消费的抽象,nodejs中可读流有两种工作模式:流式和暂停式,流式就是有数据的时候就会触发回调,并且把数据传给回调,暂停式就是需要用户自己手动执行读取的操作。我们通过源码去了解一下可读流实现的一些逻辑。因为实现的代码比较多,逻辑也比较绕,本文只分析一些主要的逻辑,有兴趣的可以参考文档或者自行深入看源码了解细节。我们先看一下ReadableState,这个对象是表示可读流的一些状态和属性的。
function ReadableState(options, stream) {
options = options || {};
// 是否是双向流
var isDuplex = stream instanceof Stream.Duplex;
// 数据模式
this.objectMode = !!options.objectMode;
// 双向流的时候,设置读端的模式
if (isDuplex)
this.objectMode = this.objectMode || !!options.readableObjectMode;
// 读到highWaterMark个字节则停止,对象模式的话则是16个对象
this.highWaterMark = getHighWaterMark(this,
options,
'readableHighWaterMark',
isDuplex);
// 存储数据的缓冲区
this.buffer = new BufferList();
// 可读数据的长度
this.length = 0;
// 管道的目的源和个数
this.pipes = null;
this.pipesCount = 0;
// 工作模式
this.flowing = null;
// 流是否已经结束
this.ended = false;
// 是否触发过end事件了
this.endEmitted = false;
// 是否正在读取数据
this.reading = false;
// 是否同步执行事件
this.sync = true;
// 是否需要触发readable事件
this.needReadable = false;
// 是否触发了readable事件
this.emittedReadable = false;
// 是否监听了readable事件
this.readableListening = false;
// 是否正在执行resume的过程
this.resumeScheduled = false;
// has it been destroyed
// 流是否已销毁
this.destroyed = false;
// 数据编码格式
this.defaultEncoding = options.defaultEncoding || 'utf8';
// 在管道化中,有多少个写者已经达到阈值,需要等待触发drain事件,awaitDrain记录达到阈值的写者个数
this.awaitDrain = 0;
// 执行maybeReadMore函数的时候,设置为true
this.readingMore = false;
this.decoder = null;
this.encoding = null;
// 编码解码器
if (options.encoding) {
if (!StringDecoder)
StringDecoder = require('string_decoder').StringDecoder;
this.decoder = new StringDecoder(options.encoding);
this.encoding = options.encoding;
}
}
ReadableState里包含了一大堆字段,我们可以先不管他,等待用到的时候,再回头看。接着我们开始看可读流的实现。
function Readable(options) {
if (!(this instanceof Readable))
return new Readable(options);
this._readableState = new ReadableState(options, this);
// 可读
this.readable = true;
// 用户实现的两个函数
if (options) {
if (typeof options.read === 'function')
this._read = options.read;
if (typeof options.destroy === 'function')
this._destroy = options.destroy;
}
// 初始化父类
Stream.call(this);
}
上面的逻辑不多,需要关注的是read和destroy这两个函数,如果我们是直接使用Readable使用可读流,那再options里是必须传read函数的,destroy是可选的。如果我们是以继承的方式使用Readable,那必须实现_read函数。nodejs只是抽象了流的逻辑,具体的操作(比如可读流就是读取数据)是由用户自己实现的,因为读取操作是业务相关的。下面我们分析一下可读流的操作。
1 可读流从底层资源获取数据
对用户来说,可读流是用户获取数据的地方,但是对可读流来说,他提供数据给用户的前提是他自己得有数据,所以可读流首先需要生产数据。生产数据的逻辑由_read函数实现。_read函数的逻辑大概是
const data = getSomeData();
readableStream.push(data);
通过push函数,往可读流里写入数据,然后就可以为用户提供数据,我们看看push的实现,只列出主要逻辑。
Readable.prototype.push = function(chunk, encoding) {
// 省略了编码处理的代码
return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
var state = stream._readableState;
// push null代表流结束
if (chunk === null) {
state.reading = false;
onEofChunk(stream, state);
} else {
addChunk(stream, state, chunk, false);
}
// 返回是否还可以读取更多数据
return needMoreData(state);
}
function addChunk(stream, state, chunk, addToFront) {
// 是流模式并且缓存没有数据,则直接触发data事件
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
} else {
// 否则先把数据缓存起来
state.length += state.objectMode ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
else
state.buffer.push(chunk);
// 监听了readable事件,则触发readable事件
if (state.needReadable)
emitReadable(stream);
}
// 继续读取数据,如果可以的话
maybeReadMore(stream, state);
}
总的来说,可读流首先要从某个地方获取数据,根据当前的工作模式,直接交付给用户,或者先缓存起来。并可以的情况下,继续获取数据。
2 用户从可读流获取数据
用户可以通过read函数或者监听data事件来从可读流中获取数据
Readable.prototype.read = function(n) {
n = parseInt(n, 10);
var state = this._readableState;
// 计算可读的大小
n = howMuchToRead(n, state);
var ret;
// 需要读取的大于0,则取读取数据到ret返回
if (n > 0)
ret = fromList(n, state);
else
ret = null;
// 减去刚读取的长度
state.length -= n;
// 如果缓存里没有数据或者读完后小于阈值了,则可读流可以继续从底层资源里获取数据
if (state.length === 0 || state.length - n < state.highWaterMark) {
this._read(state.highWaterMark);
}
// 触发data事件
if (ret !== null)
this.emit('data', ret);
return ret;
};
读取数据的操作就是计算缓存里有多少数据可以读,和用户需要的数据大小,取小的,然后返回给用户,并触发data事件。如果数据还没有达到阈值,则触发可读流从底层资源中获取数据。
3销毁流
function destroy(err, cb) {
// 设置已销毁标记
if (this._readableState) {
this._readableState.destroyed = true;
}
// 执行_destroy钩子函数,用户可以重写这个函数
this._destroy(err || null, (err) => {
// 出错,但是没有设置回调,则执行触发error事件
if (!cb && err) {
process.nextTick(() => {
this.emit('error', err);
}, this, err);
} else if (cb) {
// 有回调则执行
cb(err);
}
});
return this;
}
我们看一下Readable提供的默认_destroy函数。
Readable.prototype._destroy = function(err, cb) {
this.push(null);
cb(err);
};
刚才分析push函数时已经看到this.push(null)表示流结束了。销毁流意味着关闭流对应的底层资源,不再提供数据服务。 总结:本文就分析到这里,流的实现代码不算很难,但是非常绕,有兴趣的可以详细看源码,最后分享很久之前画的一个图(链接https://www.processon.com/view/link/5cc7e9e5e4b09eb4ac2e0688)。
- android 仿音悦台页面交互效果
- Webpack+Babel+React开发环境搭建
- Android Studio中 .gitignore配置
- Android系统服务之WindowManager整理
- gulp+webpack工具整合简介
- React Native之常用第三方库
- React Native和Android整合详解
- 强大的API测试工具Hitchhiker v0.9 基于UI的断言测试,回顾2017
- WEB前端架构(二)
- WEB前端架构(三)
- node.js基本工作原理及流程
- Github+Jekyll搭建个人博文网站
- Ecmascript语法之Symbol
- 区块链的java实现
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 实现简单登陆注册功能流程分析
- centos系统中yum安装应用出现doesn't have enough cached
- Mac os上显示与隐藏文件
- windows启动tomcat闪退,乱码问题解决
- Homebrew的安装
- 小程序轮播中嵌入视频-关于swiper、video组件与block标签
- Pocket重建您的专注力
- redux-thunk
- 使用vuepress-6小时搭建一个完全免费的个人网站
- 使用item2+oh my zsh优化终端体验
- Svelte中文文档 1基础介绍
- 分布式事物TCC
- docker环境搭建nexus私有maven私服
- mysql每天定时自动全库备份、灾备、docker
- wails Go+vue/angular/react编写桌面GUI客户端