当前位置:首页 > 谈天说地 > 正文内容

Netty分布式ByteBuf使用SocketChannel读取数据过程剖析

34资源网2022年03月29日 11:51293

我们第三章分析过客户端接入的流程, 这一小节带大家剖析客户端发送数据, server读取数据的流程:

首先温馨提示, 这一小节高度耦合第三章的第1, 2节的内容, 很多知识这里并不会重复讲解, 如果对之前的知识印象不深刻建议恶补第三章的第1, 2节的内容之后再学习这一小节

传送门:

初始化niosockectchannelconfig

server读取数据的流程

我们首先看nioeventloop的processselectedkey方法

private void processselectedkey(selectionkey k, abstractniochannel ch) {
    //获取到channel中的unsafe
    final abstractniochannel.niounsafe unsafe = ch.unsafe();
    //如果这个key不是合法的, 说明这个channel可能有问题
    if (!k.isvalid()) {
        //代码省略
    }
    try {
        //如果是合法的, 拿到key的io事件
        int readyops = k.readyops();
        //链接事件
        if ((readyops & selectionkey.op_connect) != 0) {
            int ops = k.interestops();
            ops &= ~selectionkey.op_connect;
            k.interestops(ops);
            unsafe.finishconnect();
        }
        //写事件
        if ((readyops & selectionkey.op_write) != 0) {
            ch.unsafe().forceflush();
        }
        //读事件和接受链接事件
        //如果当前nioeventloop是work线程的话, 这里就是op_read事件
        //如果是当前nioeventloop是boss线程的话, 这里就是op_accept事件
        if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) {
            unsafe.read();
            if (!ch.isopen()) {
                return;
            }
        }
    } catch (cancelledkeyexception ignored) {
        unsafe.close(unsafe.voidpromise());
    }
}

 if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) 

这里的判断表示轮询到大事件是op_read或者op_accept事件

之前的章节分析过, 如果当前nioeventloop是work线程的话, 那么这里就是op_read事件, 也就是读事件, 表示客户端发来了数据流

这里会调用unsafe的redis()方法进行读取

如果是work线程, 那么这里的channel是nioserversocketchannel, 其绑定的unsafe是niobyteunsafe, 这里会走进niobyteunsafe的read()方法中:

public final void read() {
        final channelconfig config = config();
        final channelpipeline pipeline = pipeline();
        final bytebufallocator allocator = config.getallocator();
        final recvbytebufallocator.handle allochandle = recvbufallochandle();
        allochandle.reset(config);
        bytebuf bytebuf = null;
        boolean close = false;
        try {
            do {
                bytebuf = allochandle.allocate(allocator);
                allochandle.lastbytesread(doreadbytes(bytebuf));
                if (allochandle.lastbytesread() <= 0) {
                    bytebuf.release();
                    bytebuf = null;
                    close = allochandle.lastbytesread() < 0;
                    break;
                }
                allochandle.incmessagesread(1);
                readpending = false;
                pipeline.firechannelread(bytebuf);
                bytebuf = null;
            } while (allochandle.continuereading());

            allochandle.readcomplete();
            pipeline.firechannelreadcomplete();

            if (close) {
                closeonread(pipeline);
            }
        } catch (throwable t) {
            handlereadexception(pipeline, bytebuf, t, close, allochandle);
        } finally {
            if (!readpending && !config.isautoread()) {
                removereadop();
            }
        }
    }
}

首先获取socketchannel的config, pipeline等相关属性

 final bytebufallocator allocator = config.getallocator(); 这一步是获取一个bytebuf的内存分配器, 用于分配bytebuf

这里会走到defaultchannelconfig的getallocator方法中

public bytebufallocator getallocator() {
    return allocator;
}

这里返回的defualtchannelconfig的成员变量, 我们看这个成员变量:

private volatile bytebufallocator allocator = bytebufallocator.default;

这里调用bytebufallocator的属性default, 跟进去:

bytebufallocator default = bytebufutil.default_allocator;

我们看到这里又调用了bytebufutil的静态属性default_allocator, 再跟进去:

static final bytebufallocator default_allocator;

default_allocator这个属性是在static块中初始化的

我们跟到static块中

static {
    string alloctype = systempropertyutil.get(
            "io.netty.allocator.type", platformdependent.isandroid() ? "unpooled" : "pooled");
    alloctype = alloctype.tolowercase(locale.us).trim();

    bytebufallocator alloc;
    if ("unpooled".equals(alloctype)) {
        alloc = unpooledbytebufallocator.default;
        logger.debug("-dio.netty.allocator.type: {}", alloctype);
    } else if ("pooled".equals(alloctype)) {
        alloc = pooledbytebufallocator.default;
        logger.debug("-dio.netty.allocator.type: {}", alloctype);
    } else {
        alloc = pooledbytebufallocator.default;
        logger.debug("-dio.netty.allocator.type: pooled (unknown: {})", alloctype);
    }
    default_allocator = alloc;
    //代码省略
}

首先判断运行环境是不是安卓, 如果不是安卓, 在返回"pooled"字符串保存在alloctype中

然后通过if判断, 最后局部变量alloc = pooledbytebufallocator.default, 最后将alloc赋值到成员变量default_allocator

我们跟到pooledbytebufallocator的default属性中:

public static final pooledbytebufallocator default =
        new pooledbytebufallocator(platformdependent.directbufferpreferred());

我们看到这里直接通过new的方式, 创建了一个pooledbytebufallocator对象, 也就是基于申请一块连续内存进行缓冲区分配的缓冲区分配器

缓冲区分配器的知识, 我们之前小节进行了详细的剖析, 这里就不再赘述

回到niobyteunsafe的read()方法中

public final void read() {
        final channelconfig config = config();
        final channelpipeline pipeline = pipeline();
        final bytebufallocator allocator = config.getallocator();
        final recvbytebufallocator.handle allochandle = recvbufallochandle();
        allochandle.reset(config);

        bytebuf bytebuf = null;
        boolean close = false;
        try {
            do {
                bytebuf = allochandle.allocate(allocator);
                allochandle.lastbytesread(doreadbytes(bytebuf));
                if (allochandle.lastbytesread() <= 0) {
                    bytebuf.release();
                    bytebuf = null;
                    close = allochandle.lastbytesread() < 0;
                    break;
                }

                allochandle.incmessagesread(1);
                readpending = false;
                pipeline.firechannelread(bytebuf);
                bytebuf = null;
            } while (allochandle.continuereading());

            allochandle.readcomplete();
            pipeline.firechannelreadcomplete();

            if (close) {
                closeonread(pipeline);
            }
        } catch (throwable t) {
            handlereadexception(pipeline, bytebuf, t, close, allochandle);
        } finally {
            if (!readpending && !config.isautoread()) {
                removereadop();
            }
        }
    }
}

这里 bytebufallocator allocator = config.getallocator()中的allocator , 就是pooledbytebufallocator

 final recvbytebufallocator.handle allochandle = recvbufallochandle()  是创建一个handle, 我们之前的章节讲过, handle是对recvbytebufallocator进行实际操作的对象

我们跟进recvbufallochandle

public recvbytebufallocator.handle recvbufallochandle() {
    //如果不存在, 则创建一个handle的实例
    if (recvhandle == null) {
        recvhandle = config().getrecvbytebufallocator().newhandle();
    }
    return recvhandle;
}

这里是我们之前剖析过的逻辑, 如果不存在, 则创建handle的实例, 具体创建过程我们可以回顾第三章的第二小节, 这里就不再赘述

同样allochandle.reset(config)是将配置重置, 第三章的第二小节也对其进行过剖析

重置完配置之后, 进行do-while循环, 有关循环终止条件allochandle.continuereading(), 之前小节也有过详细剖析, 这里也不再赘述

在do-while循环中, 首先看 bytebuf = allochandle.allocate(allocator) 这一步, 这里传入了刚才创建的allocate对象, 也就是pooledbytebufallocator:

这里会跑到defaultmaxmessagesrecvbytebufallocator类的allocate方法中:

public bytebuf allocate(bytebufallocator alloc) {
    return alloc.iobuffer(guess());
}

这里的guess方法, 会调用adaptiverecvbytebufallocator的guess方法:

public int guess() {
    return nextreceivebuffersize;
}

这里会返回adaptiverecvbytebufallocator的成员变量nextreceivebuffersize, 也就是下次所分配缓冲区的大小, 根据我们之前学习的内容, 第一次分配的时候会分配初始大小, 也就是1024字节

回到defaultmaxmessagesrecvbytebufallocator类的allocate方法中:

这样, alloc.iobuffer(guess())就会分配一个pooledbytebuf

我们跟到abstractbytebufallocator的iobuffer方法中:

public bytebuf iobuffer(int initialcapacity) {
    if (platformdependent.hasunsafe()) {
        return directbuffer(initialcapacity);
    }
    return heapbuffer(initialcapacity);
}

这里首先判断是否能获取jdk的unsafe对象, 默认为true, 所以会走到directbuffer(initialcapacity)中, 这里最终会分配一个pooledunsafedirectbytebuf对象, 具体分配流程我们再之前小节做过详细剖析

回到niobyteunsafe的read()方法中:

分配完了bytebuf之后, 再看这一步allochandle.lastbytesread(doreadbytes(bytebuf)):

首先看参数doreadbytes(bytebuf)方法, 这步是将channel中的数据读取到我们刚分配的bytebuf中, 并返回读取到的字节数

这里会调用到niosocketchannel的doreadbytes方法:

protected int doreadbytes(bytebuf bytebuf) throws exception {
    final recvbytebufallocator.handle allochandle = unsafe().recvbufallochandle();
    allochandle.attemptedbytesread(bytebuf.writablebytes());
    return bytebuf.writebytes(javachannel(), allochandle.attemptedbytesread());
}

首先拿到绑定在channel中的handler, 因为我们已经创建了handle, 所以这里会直接拿到

再看allochandle.attemptedbytesread(bytebuf.writablebytes())这步, bytebuf.writablebytes()返回bytebuf的可写字节数, 也就是最多能从channel中读取多少字节写到bytebuf, allocate的attemptedbytesread会把可写字节数设置到defaultmaxmessagesrecvbytebufallocator 类的attemptedbytesread属性中

跟到defaultmaxmessagesrecvbytebufallocator中的attemptedbytesread我们会看到:

public void attemptedbytesread(int bytes) {
    attemptedbytesread = bytes;
}

继续看doreadbytes方法

最后, 通过bytebuf.writebytes(javachannel(), allochandle.attemptedbytesread())将jdk底层的channel中的数据写入到我们创建的bytebuf中, 并返回实际写入的字节数

回到niobyteunsafe的read()方法中:

继续看allochandle.lastbytesread(doreadbytes(bytebuf))这步

刚才我们剖析过doreadbytes(bytebuf)返回的是世界写入bytebuf的字节数

再看lastbytesread方法, 跟到defaultmaxmessagesrecvbytebufallocator的lastbytesread方法中:

public final void lastbytesread(int bytes) {
    lastbytesread = bytes;
    totalbytesread += bytes;
    if (totalbytesread < 0) {
        totalbytesread = integer.max_value;
    }
}

这里会赋值两个属性, lastbytesread代表最后读取的字节数, 这里赋值为我们刚才写入bytebuf的字节数, totalbytesread表示总共读取的字节数, 这里将写入的字节数追加

继续看niobyteunsafe的read()方法:

如果最后一次读取数据为0, 说明已经将channel中的数据全部读取完毕, 将新创建的bytebuf释放循环利用, 并跳出循环

allochandle.incmessagesread(1)这步是增加消息的读取次数, 因为我们循环最多16次, 所以当增加消息次数增加到16会结束循环

读取完毕之后, 会通过pipeline.firechannelread(bytebuf)将传递channelread事件, 有关channelread事件, 我们在第四章也进行了详细的剖析

这里读者会有疑问, 如果一次读取不完, 就传递channelread事件, 那么server接收到的数据有可能就是不完整的, 其实关于这点, netty也做了相应的处理, 我们会在之后的章节详细剖析netty的半包处理机制

循环结束后, 会执行到allochandle.readcomplete()这一步

我们知道第一次分配bytebuf的初始容量是1024, 但是初始容量不一定一定满足所有的业务场景, netty中, 将每次读取数据的字节数进行记录, 然后之后次分配bytebuf的时候, 容量会尽可能的符合业务场景所需要大小, 具体实现方式, 就是在readcomplete()这一步体现的

我们跟到adaptiverecvbytebufallocator的readcomplete()方法中:

public void readcomplete() {
    record(totalbytesread());
}

这里调用了record方法, 并且传入了这一次所读取的字节总数

跟到record方法中

private void record(int actualreadbytes) { 
    if (actualreadbytes <= size_table[math.max(0, index - index_decrement - 1)]) { 
        if (decreasenow) { 
            index = math.max(index - index_decrement, minindex); 
            nextreceivebuffersize = size_table[index];
            decreasenow = false;
        } else {
            decreasenow = true;
        }
    } else if (actualreadbytes >= nextreceivebuffersize) { 
        index = math.min(index + index_increment, maxindex); 
        nextreceivebuffersize = size_table[index];
        decreasenow = false;
    }
}

首先看判断条件 if (actualreadbytes <= size_table[math.max(0, index - index_decrement - 1)]) 

这里index是当前分配的缓冲区大小所在的size_table中的索引, 将这个索引进行缩进, 然后根据缩进后的所以找出size_table中所存储的内存值, 再判断是否大于等于这次读取的最大字节数, 如果条件成立, 说明分配的内存过大, 需要缩容操作, 我们看if块中缩容相关的逻辑

首先 if (decreasenow) 会判断是否立刻进行收缩操作, 通常第一次不会进行收缩操作, 然后会将decreasenow设置为true, 代表下一次直接进行收缩操作

假设需要立刻进行收缩操作, 我们看收缩操作的相关逻辑:

 index = math.max(index - index_decrement, minindex) 这一步将索引缩进一步, 但不能小于最小索引值

然后通过 nextreceivebuffersize = size_table[index] 获取设置索引之后的内存, 赋值在nextreceivebuffersize, 也就是下次需要分配的大小, 下次就会根据这个大小分配bytebuf了, 这样就实现了缩容操作

再看 else&nbsp;if (actualreadbytes >= nextreceivebuffersize) 

这里判断这次读取字节的总量比上次分配的大小还要大, 则进行扩容操作

扩容操作也很简单, 索引步进, 然后拿到步进后的索引所对应的内存值, 作为下次所需要分配的大小

再niobyteunsafe的read()方法中:

经过了缩容或者扩容操作之后, 通过pipeline.firechannelreadcomplete()传播channelreadcomplete()事件

以上就是读取客户端消息的相关流程

章节总结

        本章主要剖析了bytebuf的基本操作以及缓冲区分配等相关知识.

        缓冲区分配, 分为通过调用jdk的api的方式和分配一块连续内存的方式

        其中, 通过分配连续内存的方式分配缓冲区中, 又介绍了在page级别分配的逻辑和在subpage级别分配的逻辑

        page级别分配时通过操作内存二叉树的方式记录分配情况

        subpage级别分配是通过位图的方式记录分配情况

        最后介绍了niosocketchannel处理读事件的相关逻辑

        总体来说, 这一章的内容难度是比较大的, 希望同学课后通过多调试的方式进行熟练掌握

看完文章,还可以用支付宝扫描下面的二维码领取一个支付宝红包,目前可领1-88元不等

支付宝红包二维码

除了扫码可以领取之外,大家还可以(复制 720087999 打开✔支付宝✔去搜索, h`o`n.g.包哪里来,动动手指就能领)。

看下图所示是好多参与这次活动领取红包的朋友:

支付宝红包

扫描二维码推送至手机访问。

版权声明:本文由34楼发布,如需转载请注明出处。

本文链接:https://www.34l.com/post/11242.html

分享给朋友:

相关文章

什么是开关电源?什么是线性电源?两者有什么不同?
什么是开关电源?什么是线性电源?两者有什么不同?

本文将和大家介绍下什么是开关电源?什么是线性电源?对于两者有哪些方面不同不是很了解的朋友,可以好好看下,希望对大家有所帮助。1、开关电源开关电源是相对线性电源说的。他输入端直接将交流电整流变成直流电,再在高频震荡电路的作用下,用开关管控制电...

调整心态温暖哲理经典语录,看这些语录能够调整心态
调整心态温暖哲理经典语录,看这些语录能够调整心态

取得成功是他人不成功时仍在坚持不懈。若自身不作出一点外貌,别人想拉你一把都不知道你的手在哪儿。直至你不会再找我聊,直至你找不着我,直至最终,你一直在某一瞬间猛地想起我。但是,那个时候,被你弄丢的我就确实早已没有了,也再不想要你再找回家了。理...

工具人是什么意思,工具人是什么梗?
工具人是什么意思,工具人是什么梗?

最近网络上面出现比较多的一个词语就是“工具人”,那么,什么样的人会将他形容成是工具呢?这个是让网友们觉得比较的好奇的,主要是指的什么?在微博上面经常看到有网络语工具人这样的表达,所以引起了很多网友的关注,想要了解这个词语,那么工具人是什么意...

引流文案微信推广(微商引流推广文案模板)
引流文案微信推广(微商引流推广文案模板)

大家好啊!今天又跟大家分享小技巧啦~往下看↓↓↓ 首先说一下什么样的文案是引流型的?实际上,一句话是将公共域流量定向到您的私有域流量池。其目的是先引流然后慢慢进行信任激活变现。 在标题方面,通常有以下几种类型,今天为大家详细描述一下。...

lenovo手机网上哪里买(联想旗舰店官网商城)
lenovo手机网上哪里买(联想旗舰店官网商城)

昨晚联想拯救者电竞手机 2 Pro 正式发布,搭载骁龙 888 旗舰芯片、八指操控体系,配备 6.92 英寸 AMOLED 144Hz 三星定制电竞无孔屏幕,5500mAh 容量电池,堪称 “堆料狂魔”,这款手机于今日 10:00 正式开售...

闪客快跑2背景音乐(闪客快打andylaw的微博)
闪客快跑2背景音乐(闪客快打andylaw的微博)

《疯狂跑酷》是一款LowPoly(低多边形)画风的跑酷游戏,一场突如其来的大水淹没了城市,而你在游戏中扮演一名刚下班的男子,需要从被水淹没的城市中逃出生天。和常见的跑酷游戏不同,在本作中,跑酷的场景完全立体化,不再是固定一条直达终点的路线。...