返回顶部
首页 > 资讯 > 后端开发 > Python >Netty分布式编码器写buffer队列逻辑剖析
  • 937
分享到

Netty分布式编码器写buffer队列逻辑剖析

2024-04-02 19:04:59 937人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

目录写buffer队列我们跟到AbstractUnsafe的write方法中回到write方法中我们跟到setUnwritable(invokeLater)方法中前文传送门:抽象编码

前文传送门:抽象编码器MessageToByteEncoder

写buffer队列

之前的小节我们介绍过, writeAndFlush方法其实最终会调用write和flush方法

write方法最终会传递到head节点, 调用HeadContext的write方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

这里通过unsafe对象的write方法, 将消息写入到缓存中, 具体的执行逻辑, 我们在这个小节进行剖析

我们跟到AbstractUnsafe的write方法中

public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    //负责缓冲写进来的byteBuf
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }
    int size;
    try {
        //非堆外内存转化为堆外内存
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    //插入写队列
    outboundBuffer.addMessage(msg, size, promise);
}

首先看 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer 

ChannelOutboundBuffer的功能就是缓存写入的ByteBuf

我们继续看try块中的 msg = filterOutboundMessage(msg) 

这步的意义就是将非对外内存转化为堆外内存

filterOutboundMessage方法方法最终会调用AbstractNIOByteChannel中的filterOutboundMessage方法:

protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        //是堆外内存, 直接返回
        if (buf.isDirect()) {
            return msg;
        }
        return newDirectBuffer(buf);
    }
    if (msg instanceof FileRegion) {
        return msg;
    }
    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

首先判断msg是否byteBuf对象, 如果是, 判断是否堆外内存, 如果是堆外内存, 则直接返回, 否则, 通过newDirectBuffer(buf)这种方式转化为堆外内存

回到write方法中

outboundBuffer.addMessage(msg, size, promise)将已经转化为堆外内存的msg插入到写队列

我们跟到addMessage方法当中, 这是ChannelOutboundBuffer中的方法:

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise); 
    if (tailEntry == null) { 
        flushedEntry = null;
        tailEntry = entry;
    } else { 
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    } 
    if (unflushedEntry == null) { 
        unflushedEntry = entry;
    }
    incrementPendinGoutboundBytes(size, false);
}

首先通过 Entry.newInstance(msg, size, total(msg), promise) 的方式将msg封装成entry

然后通过调整tailEntry, flushedEntry, unflushedEntry三个指针, 完成entry的添加

这三个指针均是ChannelOutboundBuffer的成员变量

flushedEntry指向第一个被flush的entry

unflushedEntry指向第一个未被flush的entry

也就是说, 从flushedEntry到unflushedEntry之间的entry, 都是被已经被flush的entry

tailEntry指向最后一个entry, 也就是从unflushedEntry到tailEntry之间的entry都是没flush的entry

我们回到代码中:

创建了entry之后首先判断尾指针是否为空, 在第一次添加的时候, 均是空, 所以会将flushedEntry设置为null, 并且将尾指针设置为当前创建的entry

最后判断unflushedEntry是否为空, 如果第一次添加这里也是空, 所以这里将unflushedEntry设置为新创建的entry

第一次添加如下图所示

7-3-1

如果不是第一次调用write方法, 则会进入 if (tailEntry == null) 中else块:

 Entry tail = tailEntry  这里tail就是当前尾节点

 tail.next = entry  代表尾节点的下一个节点指向新创建的entry

 tailEntry = entry  将尾节点也指向entry

这样就完成了添加操作, 其实就是将新创建的节点追加到原来尾节点之后

第二次添加 if (unflushedEntry == null) 会返回false, 所以不会进入if块

第二次添加之后指针的指向情况如下图所示:

7-3-4

以后每次调用write, 如果没有调用flush的话都会在尾节点之后进行追加

回到代码中, 看这一步incrementPendingOutboundBytes(size, false)

这步时统计当前有多少字节需要被写出, 我们跟到这个方法中:

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    //TOTAL_PENDING_SIZE_UPDATER当前缓冲区里面有多少待写的字节
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    //getWriteBufferHighWaterMark() 最高不能超过64k
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

看这一步:

long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size)

TOTAL_PENDING_SIZE_UPDATER表示当前缓冲区还有多少待写的字节, addAndGet就是将当前的ByteBuf的长度进行累加, 累加到newWriteBufferSize中

在继续看判断 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) 

 channel.config().getWriteBufferHighWaterMark() 表示写buffer的高水位值, 默认是64k, 也就是说写buffer的最大长度不能超过64k

如果超过了64k, 则会调用setUnwritable(invokeLater)方法设置写状态

我们跟到setUnwritable(invokeLater)方法中

private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1; 
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0 && newValue != 0) { 
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

这里通过自旋和cas操作, 传播一个ChannelWritabilityChanged事件, 最终会调用handler的channelWritabilityChanged方法进行处理

以上就是写buffer的相关逻辑,更多关于Netty分布式编码器写buffer队列的资料请关注编程网其它相关文章!

--结束END--

本文标题: Netty分布式编码器写buffer队列逻辑剖析

本文链接: https://lsjlt.com/news/144285.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Netty分布式编码器写buffer队列逻辑剖析
    目录写buffer队列我们跟到AbstractUnsafe的write方法中回到write方法中我们跟到setUnwritable(invokeLater)方法中前文传送门:抽象编码...
    99+
    2024-04-02
  • Netty分布式flush方法刷新buffer队列源码剖析
    目录flush方法这里最终会调用AbstractUnsafe的flush方法跟进addFlush方法回到addFlush方法回到AbstractUnsafe的flush方法我们重点关...
    99+
    2024-04-02
  • Netty分布式抽象编码器MessageToByteEncoder逻辑分析
    目录MessageToByteEncoder首先看MessageToByteEncoder的类声明跟到allocateBuffer方法中前文回顾:Netty分布式编码器及写数据事件处...
    99+
    2024-04-02
  • Netty分布式flush方法刷新buffer队列源码分析
    本文小编为大家详细介绍“Netty分布式flush方法刷新buffer队列源码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“Netty分布式flush方法刷新buffer队列源码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入...
    99+
    2023-06-29
  • Netty分布式ByteBuf使用的回收逻辑剖析
    目录ByteBuf回收这里调用了release0, 跟进去我们首先分析free方法我们跟到cache中回到add方法中我们回到free方法中前文传送门:ByteBuf使用subPag...
    99+
    2024-04-02
  • Netty分布式抽象编码器MessageToByteEncoder逻辑的示例分析
    小编给大家分享一下Netty分布式抽象编码器MessageToByteEncoder逻辑的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!MessageTo...
    99+
    2023-06-29
  • Netty分布式行解码器逻辑源码解析
    目录行解码器LineBasedFrameDecoder首先看其参数我们跟到重载的decode方法中我们看findEndOfLine(buffer)方法前文传送门:Netty分布式固定...
    99+
    2024-04-02
  • Netty分布式FastThreadLocal的set方法实现逻辑剖析
    目录FastThreadLocal的set方法实现线程set对象我们跟到setIndexedVariable中我们跟进removeIndexedVariable方法上一小节我们学习了...
    99+
    2024-04-02
  • Netty分布式解码器读取数据不完整的逻辑剖析
    目录概述第一节: ByteToMessageDecoder我们看他的定义我们看其channelRead方法我们看cumulator属性我们回到channRead方法中概述 在我们上一...
    99+
    2024-04-02
  • Nett分布式分隔符解码器逻辑源码剖析
    目录分隔符解码器我们看其中的一个构造方法我们跟到重载decode方法中我们看初始化该属性的构造方法章节总结前文传送门:Netty分布式行解码器逻辑源码解析 分隔符解码器 基于分隔符解...
    99+
    2024-04-02
  • Netty分布式Future与Promise执行回调相关逻辑剖析
    目录Future和Promise执行回调首先我们看一段写在handler中的业务代码这里关注newPromise()方法, 跟进去我们继续跟write方法跟进tryFailure方法...
    99+
    2024-04-02
  • Netty分布式NioEventLoop任务队列执行源码分析
    目录执行任务队列跟进runAllTasks方法:我们跟进fetchFromScheduledTaskQueue()方法回到runAllTasks(long timeoutNanos)...
    99+
    2024-04-02
  • Netty分布式pipeline管道传播事件的逻辑总结分析
    目录问题分析首先看第一个问题我们看一下ChannelInitializer这个类的继承关系回到callHandlerCallbackLater方法中紧接着我们看第二个问题章节总结我们...
    99+
    2024-04-02
  • Netty分布式固定长度解码器实现原理剖析
    固定长度解码器 上一小节:解码器读取数据不完整的逻辑剖析 我们了解到, 解码器需要继承ByteToMessageDecoder, 并重写decode方法, 将解析出来的对象放入集合中...
    99+
    2024-04-02
  • Netty分布式解码器读取数据不完整的逻辑是什么
    这篇文章将为大家详细讲解有关Netty分布式解码器读取数据不完整的逻辑是什么,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。概述如果Server在读取客户端的数据的时候, 如果一次读取不完整, 就触发cha...
    99+
    2023-06-29
  • Netty分布式获取异线程释放对象源码剖析
    目录获取异线程释放对象在介绍之前我们首先看Stack类中的两个属性我们跟到pop方法中继续跟到scavengeSome方法中我们继续分析transfer方法接着我们我们关注一个细节我...
    99+
    2024-04-02
  • Netty分布式编码器及写数据事件处理使用场景的示例分析
    这篇文章主要介绍Netty分布式编码器及写数据事件处理使用场景的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!编码器第一节: writeAndFlush的事件传播我们之前在学习pipeline的时候...
    99+
    2023-06-29
  • Netty分布式编码器及写数据事件处理使用场景
    目录概述编码器第一节: writeAndFlush的事件传播我们看一个最简单的使用的场景我们跟到writeAndFlush方法中我们跟到invokeWriteAndFlus...
    99+
    2024-04-02
  • Netty分布式ByteBuf缓冲区分配器源码解析
    目录缓冲区分配器以其中的分配ByteBuf的方法为例, 对其做简单的介绍跟到directBuffer()方法中我们回到缓冲区分配的方法然后通过validate方法进行参数验...
    99+
    2024-04-02
  • Netty分布式固定长度解码器的示例分析
    这篇文章主要为大家展示了“Netty分布式固定长度解码器的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Netty分布式固定长度解码器的示例分析”这篇文章吧。固定长度解码器我们了解到, ...
    99+
    2023-06-29
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作