返回顶部
首页 > 资讯 > 前端开发 > JavaScript >node中的stream有哪些类型
  • 691
分享到

node中的stream有哪些类型

2024-04-02 19:04:59 691人浏览 泡泡鱼
摘要

本篇内容介绍了“node中的stream有哪些类型”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

本篇内容介绍了“node中的stream有哪些类型”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

node stream有4种类型:1、Readable(可读流)。需要实现“_read”方法来返回内容;2、Writable(可写流),需要实现“_write”方法来接受内容;3、Duplex(可读可写流),需要实现“_read”和“_write”方法来接受和返回内容;4、TransfORM(转换流),需要实现“_transform”方法来把接受的内容转换之后返回内容。

node中的stream有哪些类型

教程操作环境:windows7系统、nodejs16版,DELL G3电脑。

流(Stream)在 Nodejs 中是个十分基础的概念,很多基础模块都是基于流实现的,扮演着十分重要的角色。同时流也是是一个十分难以理解的概念,这主要是相关的文档比较缺少,对于 NodeJs 初学者来说,理解流往往需要花很多时间理解,才能真正掌握这个概念,所幸的是,对于大部分 NodeJs 使用者来说,仅仅是用来开发 WEB 应用,对流的不充分认识并不影响使用。但是,理解流能够对 NodeJs 中的其他模块有更好的理解,同时在某些情况下,使用流来处理数据会有更好的效果。

Stream

Stream 是在 node.js 中处理流数据的抽象接口。Stream 并不是一个实际的接口,而是对所有流的一种统称。实际的接口有 ReadableStream、 WritableStream、ReadWriteStream 这几个。

interface ReadableStream extends EventEmitter {
    readable: boolean;
    read(size?: number): string | Buffer;
    setEncoding(encoding: BufferEncoding): this;
    pause(): this;
    resume(): this;
    isPaused(): boolean;
    pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
    unpipe(destination?: WritableStream): this;
    unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
    wrap(oldStream: ReadableStream): this;
    [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
    writable: boolean;
    write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
    write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
    end(cb?: () => void): this;
    end(data: string | Uint8Array, cb?: () => void): this;
    end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}

interface ReadWriteStream extends ReadableStream, WritableStream { }

可以看出 ReadableStream 和 WritableStream 都是继承 EventEmitter 类的接口(ts中接口是可以继承类的,因为他们只是在进行类型的合并)。

上面这些接口对应的实现类分别是 Readable、Writable 和 Duplex

NodeJs中的流有4种:

  • Readable 可读流(实现ReadableStream)

  • Writable 可写流(实现WritableStream)

  • Duplex 可读可写流(继承Readable后实现WritableStream)

  • Transform 转换流(继承Duplex)

它们都有要实现的方法:

  • Readable 需要实现 _read 方法来返回内容

  • Writable 需要实现 _write 方法来接受内容

  • Duplex 需要实现 _read 和 _write 方法来接受和返回内容

  • Transform 需要实现 _transform 方法来把接受的内容转换之后返回

Readable

可读流(Readable)是流的一种类型,他有两种模式三种状态

两种读取模式:

  • 流动模式:数据会从底层系统读取写入到缓冲区,当缓冲区被写满后自动通过 EventEmitter 尽快的将数据传递给所注册的事件处理程序中

  • 暂停模式:在这种模式下将不会主动触发 EventEmitter 传输数据,必须显示的调用 Readable.read() 方法来从缓冲区中读取数据,read 会触发响应到 EventEmitter 事件。

三种状态:

  • readableFlowing === null(初始状态)

  • readableFlowing === false(暂停模式)

  • readableFlowing === true(流动模式)

初始时流的 readable.readableFlowingnull

添加data事件后变为 true 。调用 pause()unpipe()、或接收到背压或者添加 readable 事件,则 readableFlowing 会被设为 false ,在这个状态下,为 data 事件绑定监听器不会使 readableFlowing 切换到 true

调用 resume() 可以让可读流的 readableFlowing 切换到 true

移除所有的 readable 事件是使 readableFlowing 变为 null 的唯一方法。

事件名说明
readable当缓冲区有新的可读取数据时触发(每一个想缓存池插入节点都会触发)
data每一次消费数据后都会触发,参数是本次消费的数据
close流关闭时触发
error流发生错误时触发
方法名说明
read(size)消费长度为size的数据,返回null表示当前数据不足size,否则返回本次消费的数据。size不传递时表示消费缓存池中所有数据
const fs = require('fs');

const readStreams = fs.createReadStream('./EventEmitter.js', {
    highWaterMark: 100// 缓存池浮标值
})

readStreams.on('readable', () => {
    console.log('缓冲区满了')
    readStreams.read()// 消费缓存池的所有数据,返回结果并且触发data事件
})


readStreams.on('data', (data) => {
    console.log('data')
})

https://GitHub1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527

当 size 为 0 会触发 readable 事件。

当缓存池中的数据长度达到浮标值 highWaterMark 后,就不会在主动请求生产数据,而是等待数据被消费后在生产数据

暂停状态的流如果不调用 read 来消费数据时,后续也不会在触发 datareadable,当调用 read 消费时会先判断本次消费后剩余的数据长度是否低于 浮标值,如果低于 浮标值 就会在消费前请求生产数据。这样在 read 后的逻辑执行完成后新的数据大概率也已经生产完成,然后再次触发 readable,这种提前生产下一次消费的数据存放在缓存池的机制也是缓存流为什么快的原因

流动状态下的流有两种情况

  • 生产速度慢于消费速度时:这种情况下每一个生产数据后一般缓存池中都不会有剩余数据,直接将本次生产的数据传递给 data 事件即可(因为没有进入缓存池,所以也不用调用 read 来消费),然后立即开始生产新数据,待上一次数据消费完后新数据才生产好,再次触发 data ,一只到流结束。

  • 生产速度快于消费速度时:此时每一次生产完数据后一般缓存池都还存在未消费的数据,这种情况一般会在消费数据时开始生产下一次消费的数据,待旧数据消费完后新数据已经生产完并且放入缓存池

他们的区别仅仅在于数据生产后缓存池是否还存在数据,如果存在数据则将生产的数据 push 到缓存池等待消费,如果不存在则直接将数据交给 data 而不加入缓存池。

值得注意的是当一个缓存池中存在数据的流从暂停模式进入的流动模式时,会先循环调用 read 来消费数据只到返回 null

暂停模式

node中的stream有哪些类型

暂停模式下,一个可读流读创建时,模式是暂停模式,创建后会自动调用 _read 方法,把数据从数据源 push 到缓冲池中,直到缓冲池中的数据达到了浮标值。每当数据到达浮标值时,可读流会触发一个 " readable " 事件,告诉消费者有数据已经准备好了,可以继续消费。

一般来说, 'readable' 事件表明流有新的动态:要么有新的数据,要么到达流的尽头。所以,数据源的数据被读完前,也会触发一次 'readable' 事件;

消费者 " readable " 事件的处理函数中,通过 stream.read(size) 主动消费缓冲池中的数据。

const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    // 参数的 read 方法会作为流的 _read 方法,用于获取源数据
    read(size) {
        // 假设我们的源数据上 1000 个1
        let chunk = null
        // 读取数据的过程一般是异步的,例如IO操作
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = '1'.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})
// 每一次成功 push 数据到缓存池后都会触发 readable
myReadable.on('readable', () => {
    const chunk = myReadable.read()//消费当前缓存池中所有数据
    console.log(chunk.toString())
})

值得注意的是, 如果 read(size) 的 size 大于浮标值,会重新计算新的浮标值,新浮标值是size的下一个二次幂(size <= 2^n,n取最小值)

//  hwm 不会大于 1GB.
const MAX_HWM = 0x40000000;
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // 1GB限制
    n = MAX_HWM;
  } else {
    //取下一个2最高幂,以防止过度增加hwm
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}

流动模式

node中的stream有哪些类型

所有可读流开始的时候都是暂停模式,可以通过以下方法可以切换至流动模式:

  • 添加 " data " 事件句柄;

  • 调用 “ resume ”方法;

  • 使用 " pipe " 方法把数据发送到可写流

流动模式下,缓冲池里面的数据会自动输出到消费端进行消费,同时,每次输出数据后,会自动回调 _read 方法,把数据源的数据放到缓冲池中,如果此时缓存池中不存在数据则会直接吧数据传递给 data 事件,不会经过缓存池;直到流动模式切换至其他暂停模式,或者数据源的数据被读取完了( push(null) );

可读流可以通过以下方式切换回暂停模式:

  • 如果没有管道目标,则调用 stream.pause()

  • 如果有管道目标,则移除所有管道目标。调用 stream.unpipe() 可以移除多个管道目标。

const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    read(size) {
        let chunk = null
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = '1'.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})

myReadable.on('data', data => {
    console.log(data.toString())
})

Writable

相对可读流来说,可写流要简单一些。

node中的stream有哪些类型

当生产者调用 write(chunk) 时,内部会根据一些状态(corked,writing等)选择是否缓存到缓冲队列中或者调用 _write,每次写完数据后,会尝试清空缓存队列中的数据。如果缓冲队列中的数据大小超出了浮标值(highWaterMark),消费者调用 write(chunk) 后会返回 false,这时候生产者应该停止继续写入。

那么什么时候可以继续写入呢?当缓冲中的数据都被成功 _write 之后,清空了缓冲队列后会触发 drain 事件,这时候生产者可以继续写入数据。

当生产者需要结束写入数据时,需要调用 stream.end 方法通知可写流结束。

const { Writable, Duplex } = require('stream')
let fileContent = ''
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) {// 会作为_write方法
        setTimeout(()=>{
            fileContent += chunk
            callback()// 写入结束后调用
        }, 500)
    }
})

myWritable.on('close', ()=>{
    console.log('close', fileContent)
})
myWritable.write('123123')// true
myWritable.write('123123')// false
myWritable.end()

注意,在缓存池中数据到达浮标值后,此时缓存池中可能存在多个节点,在清空缓存池的过程中(循环调用_read),并不会向可读流一样尽量一次消费长度为浮标值的数据,而是每次消费一个缓冲区节点,即使这个缓冲区长度于浮标值不一致也是如此

const { Writable } = require('stream')


let fileContent = ''
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) {
        setTimeout(()=>{
            fileContent += chunk
            console.log('消费', chunk.toString())
            callback()// 写入结束后调用
        }, 100)
    }
})

myWritable.on('close', ()=>{
    console.log('close', fileContent)
})

let count = 0
function productionData(){
    let flag = true
    while (count <= 20 && flag){
        flag = myWritable.write(count.toString())
        count++
    }
    if(count > 20){
        myWritable.end()
    }
}
productionData()
myWritable.on('drain', productionData)

上述是一个浮标值为 10 的可写流,现在数据源是一个 0——20 到连续的数字字符串productionData 用于写入数据。

  • 首先第一次调用 myWritable.write("0") 时,因为缓存池不存在数据,所以 "0" 不进入缓存池,而是直接交给 _wirtemyWritable.write("0") 返回值为 true

  • 当执行 myWritable.write("1") 时,因为 _wirtecallback 还未调用,表明上一次数据还未写入完,位置保证数据写入的有序性,只能创建一个缓冲区将 "1" 加入缓存池中。后面 2-9 都是如此

  • 当执行 myWritable.write("10") 时,此时缓冲区长度为 9(1-9),还未到达浮标值, "10" 继续作为一个缓冲区加入缓存池中,此时缓存池长度变为 11,所以 myWritable.write("1") 返回 false,这意味着缓冲区的数据已经足够,我们需要等待 drain 事件通知时再生产数据。

  • 100ms过后,_write("0", encoding, callback)callback 被调用,表明 "0" 已经写入完成。然后会检查缓存池中是否存在数据,如果存在则会先调用 _read 消费缓存池的头节点("1"),然后继续重复这个过程直到缓存池为空后触发 drain 事件,再次执行 productionData

  • 调用 myWritable.write("11"),触发第1步开始的过程,直到流结束。

Duplex

在理解了可读流与可写流后,双工流就好理解了,双工流事实上是继承了可读流然后实现了可写流(源码是这么写的,但是应该说是同时实现了可读流和可写流更加好)。

node中的stream有哪些类型

Duplex 流需要同时实现下面两个方法

  • 实现 _read() 方法,为可读流生产数据

  • 实现 _write() 方法,为可写流消费数据

上面两个方法如何实现在上面可写流可读流的部分已经介绍过了,这里需要注意的是,双工流是存在两个独立的缓存池分别提供给两个流,他们的数据源也不一样

以 NodeJs 的标准输入输出流为例:

  • 当我们在控制台输入数据时会触发其 data 事件,这证明他有可读流的功能,每一次用户键入回车相当于调用可读的 push 方法推送生产的数据。

  • 当我们调用其 write 方法时也可以向控制台输出内容,但是不会触发 data 事件,这说明他有可写流的功能,而且有独立的缓冲区,_write 方法的实现内容就是让控制台展示文字。

// 每当用户在控制台输入数据(_read),就会触发data事件,这是可读流的特性
process.stdin.on('data', data=>{
    process.stdin.write(data);
})

// 每隔一秒向标准输入流生产数据(这是可写流的特性,会直接输出到控制台上),不会触发data
setInterval(()=>{
    process.stdin.write('不是用户控制台输入的数据')
}, 1000)

Transform

node中的stream有哪些类型

可以将 Duplex 流视为具有可写流的可读流。两者都是独立的,每个都有独立的内部缓冲区。读写事件独立发生。

                             Duplex Stream
                          ------------------|
                    Read  <-----               External Source
            You           ------------------|  
                    Write ----->               External Sink
                          ------------------|

Transform 流是双工的,其中读写以因果关系进行。双工流的端点通过某种转换链接。读取要求发生写入。

                                 Transform Stream
                           --------------|--------------
            You     Write  ---->                   ---->  Read  You
                           --------------|--------------

对于创建 Transform 流,最重要的是要实现 _transform 方法而不是 _write 或者 _read_transform 中对可写流写入的数据做处理(消费)然后为可读流生产数据。

转换流还经常会实现一个 `_flush` 方法,他会在流结束前被调用,一般用于对流的末尾追加一些东西,例如压缩文件时的一些压缩信息就是在这里加上的
const { write } = require('fs')
const { Transform, PassThrough } = require('stream')

const reurce = '1312123213124341234213423428354816273513461891468186499126412'

const transform = new Transform({
    highWaterMark: 10,
    transform(chunk ,encoding, callback){// 转换数据,调用push将转换结果加入缓存池
        this.push(chunk.toString().replace('1', '@'))
        callback()
    },
    flush(callback){// end触发前执行
        this.push('<<<')
        callback()
    }
})


// write 不断写入数据
let count = 0
transform.write('>>>')
function productionData() {
    let flag = true
    while (count <= 20 && flag) {
        flag = transform.write(count.toString())
        count++
    }
    if (count > 20) {
        transform.end()
    }
}
productionData()
transform.on('drain', productionData)


let result = ''
transform.on('data', data=>{
    result += data.toString()
})
transform.on('end', ()=>{
    console.log(result)
    // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<<
})

“node中的stream有哪些类型”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

--结束END--

本文标题: node中的stream有哪些类型

本文链接: https://lsjlt.com/news/98312.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • node中的stream有哪些类型
    本篇内容介绍了“node中的stream有哪些类型”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成! ...
    99+
    2024-04-02
  • Node中的I/O模型有哪些
    这篇文章主要为大家展示了“Node中的I/O模型有哪些”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Node中的I/O模型有哪些”这篇文章吧。我们以网络请求IO...
    99+
    2024-04-02
  • javascript中有哪些类型
    这篇“javascript中有哪些类型”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“jav...
    99+
    2024-04-02
  • Hibernate中有哪些类型分类
    本篇文章给大家分享的是有关Hibernate中有哪些类型分类,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。1. 实体(Entities)和值(values)为了理解很多与持久化...
    99+
    2023-06-17
  • redis中的key有哪些类型
    redis中的key有哪些类型?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。Redis 是速度非常快的非关系型(NoSQL)内存键值数据库...
    99+
    2024-04-02
  • Flink中的Time类型有哪些
    这篇文章主要讲解了“Flink中的Time类型有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Flink中的Time类型有哪些”吧!**Processing Time**Processi...
    99+
    2023-06-19
  • Android中的Call类型有哪些
    这篇文章将为大家详细讲解有关Android中的Call类型有哪些,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。call就是平时我们常用的通话功能,最基本的就是来电incoming call,...
    99+
    2023-05-31
    android call roi
  • java中内部类的类型有哪些
    java中内部类的类型有哪些?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。Java是什么Java是一门面向对象编程语言,可以编写桌面应用程序、Web应用程序、分布式系统和嵌入式...
    99+
    2023-06-14
  • Java8中Stream的特性有哪些
    这期内容当中小编将会给大家带来有关Java8中Stream的特性有哪些,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1.初识stream先来一个总纲:东西就是这么多啦,stream是java8中加入的一个...
    99+
    2023-05-31
    java8 stream ava
  • Sqlserver 中有哪些表类型
    这篇文章将为大家详细讲解有关Sqlserver 中有哪些表类型,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。表类型可以用在存储过程中,用于批量增加表类型定义...
    99+
    2024-04-02
  • MySQL中有哪些列类型
    MySQL中有哪些列类型,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。  MySQL列类型选择和查询效率有怎样的联系  ■使用定长列,不使用可...
    99+
    2024-04-02
  • oracle中有哪些表类型
    oracle中有哪些表类型,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。oracle中有99%以上的表都是堆组织表(heap ...
    99+
    2024-04-02
  • html中MIME类型有哪些
    本篇内容主要讲解“html中MIME类型有哪些”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“html中MIME类型有哪些”吧! 在h...
    99+
    2024-04-02
  • TypeScript的类型有哪些
    这篇文章主要介绍“TypeScript的类型有哪些”,在日常操作中,相信很多人在TypeScript的类型有哪些问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”TypeScri...
    99+
    2024-04-02
  • linux的类型有哪些
    这篇文章主要介绍了linux的类型有哪些的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇linux的类型有哪些文章都会有所收获,下面我们一起来看看吧。 ...
    99+
    2023-02-28
    linux
  • RDD的类型有哪些
    这篇文章主要讲解了“RDD的类型有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“RDD的类型有哪些”吧!一、RDD定义  RDD(Resilient Distributed Datase...
    99+
    2023-06-02
  • Node中的Stream是什么
    本篇内容主要讲解“Node中的Stream是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Node中的Stream是什么”吧!stream 是一个抽象的数据接口,它继承了 EventEmit...
    99+
    2023-07-05
  • java中的数据类型有哪些
    数据类型:注意事项:java开发过程中整型用int、小数用double、布尔用boolean;类型转换都是小范围向大范围转换,大范围往小范围转化需要用到强制转换;例如:(1)int a=12;double b=a;(2)double a=3...
    99+
    2021-03-27
    java基础 java 数据类型
  • HTML5中新的Input类型有哪些
    这篇文章主要为大家展示了“HTML5中新的Input类型有哪些”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“HTML5中新的Input类型有哪些”这篇文章吧。I...
    99+
    2024-04-02
  • JavaScript中的错误类型有哪些
    小编给大家分享一下JavaScript中的错误类型有哪些,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧! ...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作