返回顶部
首页 > 资讯 > 后端开发 > Python >Netty分布式flush方法刷新buffer队列源码剖析
  • 565
分享到

Netty分布式flush方法刷新buffer队列源码剖析

2024-04-02 19:04:59 565人浏览 安东尼

Python 官方文档:入门教程 => 点击学习

摘要

目录flush方法这里最终会调用AbstractUnsafe的flush方法跟进addFlush方法回到addFlush方法回到AbstractUnsafe的flush方法我们重点关

flush方法

上一小节学习了writeAndFlush的write方法, 这一小节我们剖析flush方法

通过前面的学习我们知道, flush方法通过事件传递, 最终会传递到HeadContext的flush方法:

public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

这里最终会调用AbstractUnsafe的flush方法

public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    outboundBuffer.addFlush();
    flush0();
}

这里首先也是拿到ChannelOutboundBuffer对象

然后我们看这一步:

outboundBuffer.addFlush();

这一步同样也是调整ChannelOutboundBuffer的指针

跟进addFlush方法

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) { 
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                int pending = entry.cancel();
                decrementPendinGoutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }
}

首先声明一个entry指向unflushedEntry, 也就是第一个未flush的entry

通常情况下unflushedEntry是不为空的, 所以进入if

再未刷新前flushedEntry通常为空, 所以会执行到flushedEntry = entry

也就是flushedEntry指向entry

经过上述操作, 缓冲区的指针情况如图所示:

7-4-1

然后通过do-while将, 不断寻找unflushedEntry后面的节点, 直到没有节点为止

flushed自增代表需要刷新多少个节点

循环中我们关注这一步

decrementPendingOutboundBytes(pending, false, true);

这一步也是统计缓冲区中的字节数, 但是是和上一小节的incrementPendingOutboundBytes正好是相反, 因为这里是刷新, 所以这里要减掉刷新后的字节数,

我们跟到方法中:

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    //从总的大小减去
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    //直到减到小于某一个阈值32个字节
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        //设置写状态
        setWritable(invokeLater);
    }
}

同样TOTAL_PENDING_SIZE_UPDATER代表缓冲区的字节数, 这里的addAndGet中参数是-size, 也就是减掉size的长度

再看 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) 

getWriteBufferLowWaterMark()代表写buffer的第水位值, 也就是32k, 如果写buffer的长度小于这个数, 就通过setWritable方法设置写状态

也就是通道由原来的不可写改成可写

回到addFlush方法

遍历do-while循环结束之后, 将unflushedEntry指为空, 代表所有的entry都是可写的

经过上述操作, 缓冲区的指针情况如下图所示:

7-4-2

回到AbstractUnsafe的flush方法

指针调整完之后, 我们跟到flush0()方法中:

protected void flush0() {
    if (inFlush0) {
        return;
    }
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }
    inFlush0 = true;
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }
    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
        } else {
            outboundBuffer.failFlushed(t, true);
        }
    } finally {
        inFlush0 = false;
    }
}

 if (inFlush0) 表示判断当前flush是否在进行中, 如果在进行中, 则返回, 避免重复进入

我们重点关注doWrite方法

跟到AbstractNIOByteChannel的doWrite方法中去:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;
    boolean setOpWrite = false;
    for (;;) {
        //每次拿到当前节点
        Object msg = in.current();
        if (msg == null) {
            clearOpWrite();
            return;
        }
        if (msg instanceof ByteBuf) {
            //转化成ByteBuf
            ByteBuf buf = (ByteBuf) msg;
            //如果没有可写的值
            int readableBytes = buf.readableBytes();
            if (readableBytes == 0) {
                //移除
                in.remove();
                continue;
            } 
            boolean done = false;
            long flushedAmount = 0;
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                //将buf写入到Socket里面
                //localFlushedAmount代表向jdk底层写了多少字节
                int localFlushedAmount = doWriteBytes(buf);
                //如果一个字节没写, 直接break
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }
                //统计总共写了多少字节
                flushedAmount += localFlushedAmount;
                //如果buffer全部写到jdk底层
                if (!buf.isReadable()) {
                    //标记全写道
                    done = true;
                    break;
                }
            }
            in.progress(flushedAmount);
            if (done) {
                //移除当前对象
                in.remove();
            } else {
                break;
            }
        } else if (msg instanceof FileRegion) {
            //代码省略
        } else {
            throw new Error();
        }
    }
    incompleteWrite(setOpWrite);
}

首先是一个无限for循环

 Object msg = in.current() 这一步是拿到flushedEntry指向的entry中的msg

跟到current()方法中

public Object current() { 
    Entry entry = flushedEntry;
    if (entry == null) {
        return null;
    }
    return entry.msg;
}

这里直接拿到flushedEntry指向的entry中关联的msg, 也就是一个ByteBuf

回到doWrite方法:

如果msg为null, 说明没有可以刷新的entry, 则调用clearOpWrite()方法清除写标识

如果msg不为null, 则会判断是否是ByteBuf类型, 如果是ByteBuf, 就进入if块中的逻辑

if块中首先将msg转化为ByteBuf, 然后判断ByteBuf是否可读, 如果不可读, 则通过in.remove()将当前的byteBuf所关联的entry移除, 然后跳过这次循环进入下次循环

remove方法稍后分析, 这里我们先继续往下看

 boolean done = false 这里设置一个标识, 标识刷新操作是否执行完成, 这里默认值为false代表走到这里没有执行完成

 writeSpinCount = config().getWriteSpinCount() 这里是获得一个写操作的循环次数, 默认是16

然后根据这个循环次数, 进行循环的写操作

在循环中, 关注这一步:

int localFlushedAmount = doWriteBytes(buf);

这一步就是将buf的内容写到channel中, 并返回写的字节数, 这里会调用NiOSocketChannel的doWriteBytes

我们跟到doWriteBytes方法中:

protected int doWriteBytes(ByteBuf buf) throws Exception { 
    final int expectedWrittenBytes = buf.readableBytes();
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

这里首先拿到buf的可读字节数, 然后通过readBytes将可读字节写入到jdk底层的channel中

回到doWrite方法:

将内容写的jdk底层的channel之后, 如果一个字节都没写, 说明现在channel可能不可写, 将setOpWrite设置为true, 用于标识写操作位, 并退出循环

如果已经写出字节, 则通过 flushedAmount += localFlushedAmount 累加写出的字节数

然后根据是buf是否没有可读字节数判断是否buf的数据已经写完, 如果写完, 将done设置为true, 说明写操作完成, 并退出循环

因为有时候不一定一次就能将byteBuf所有的字节写完, 所以这里会继续通过循环进行写出, 直到循环到16次

如果ByteBuf内容完全写完, 会通过in.remove()将当前entry移除掉

我们跟到remove方法中:

public boolean remove() {
    //拿到当前第一个flush的entry
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;
    ChannelPromise promise = e.promise;
    int size = e.pendingSize;
    removeEntry(e);
    if (!e.cancelled) {
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }
    e.recycle();
    return true;
}

首先拿到当前的flushedEntry

我们重点关注removeEntry这步, 跟进去:

private void removeEntry(Entry e) { 
    if (-- flushed == 0) {
        //位置为空
        flushedEntry = null;
        //如果是最后一个节点
        if (e == tailEntry) {
            //全部设置为空
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        //移动到下一个节点
        flushedEntry = e.next;
    }
}

 if (-- flushed == 0) 表示当前节点是否为需要刷新的最后一个节点, 如果是, 则flushedEntry指针设置为空

如果当前节点是tailEntry节点, 说明当前节点是最后一个节点, 将tailEntry和unflushedEntry两个指针全部设置为空

如果当前节点不是需要刷新的最后的一个节点, 则通过 flushedEntry = e.nex t这步将flushedEntry指针移动到下一个节点

以上就是flush操作的相关逻辑,更多关于Netty分布式flush方法刷新buffer队列的资料请关注编程网其它相关文章!

--结束END--

本文标题: Netty分布式flush方法刷新buffer队列源码剖析

本文链接: https://lsjlt.com/news/144296.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Netty分布式flush方法刷新buffer队列源码剖析
    目录flush方法这里最终会调用AbstractUnsafe的flush方法跟进addFlush方法回到addFlush方法回到AbstractUnsafe的flush方法我们重点关...
    99+
    2024-04-02
  • Netty分布式flush方法刷新buffer队列源码分析
    本文小编为大家详细介绍“Netty分布式flush方法刷新buffer队列源码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“Netty分布式flush方法刷新buffer队列源码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入...
    99+
    2023-06-29
  • Netty分布式编码器写buffer队列逻辑剖析
    目录写buffer队列我们跟到AbstractUnsafe的write方法中回到write方法中我们跟到setUnwritable(invokeLater)方法中前文传送门:抽象编码...
    99+
    2024-04-02
  • Netty分布式NioEventLoop任务队列执行源码分析
    目录执行任务队列跟进runAllTasks方法:我们跟进fetchFromScheduledTaskQueue()方法回到runAllTasks(long timeoutNanos)...
    99+
    2024-04-02
  • Netty分布式ByteBuf的分类方式源码解析
    目录ByteBuf根据不同的分类方式 会有不同的分类结果1.Pooled和Unpooled2.基于直接内存的ByteBuf和基于堆内存的ByteBuf3.safe和unsafe上一小...
    99+
    2024-04-02
  • Netty分布式NioEventLoop任务队列执行的方法
    这篇文章主要介绍“Netty分布式NioEventLoop任务队列执行的方法”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Netty分布式NioEventLoop任务队列执行的方法”文章能帮助大家解...
    99+
    2023-06-29
  • Netty分布式获取异线程释放对象源码剖析
    目录获取异线程释放对象在介绍之前我们首先看Stack类中的两个属性我们跟到pop方法中继续跟到scavengeSome方法中我们继续分析transfer方法接着我们我们关注一个细节我...
    99+
    2024-04-02
  • Netty分布式FastThreadLocal的set方法实现逻辑剖析
    目录FastThreadLocal的set方法实现线程set对象我们跟到setIndexedVariable中我们跟进removeIndexedVariable方法上一小节我们学习了...
    99+
    2024-04-02
  • laravel源码分析队列Queue方法示例
    目录前言队列任务的创建队列任务的分发前言 队列 (Queue) 是 laravel 中比较常用的一个功能,队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求...
    99+
    2024-04-02
  • laravel源码分析队列Queue方法怎么用
    本篇内容介绍了“laravel源码分析队列Queue方法怎么用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!队列任务的创建先通过命令创建一个...
    99+
    2023-06-29
  • Netty分布式ByteBuf使用的底层实现方式源码解析
    目录概述AbstractByteBuf属性和构造方法首先看这个类的属性和构造方法我们看几个最简单的方法我们重点关注第二个校验方法ensureWritable(length)我们跟到扩...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作