Netty分布式ByteBuf使用SocketChannel读取数据过程剖析
我们第三章分析过客户端接入的流程, 这一小节带大家剖析客户端发送数据, server读取数据的流程:
首先温馨提示, 这一小节高度耦合第三章的第1, 2节的内容, 很多知识这里并不会重复讲解, 如果对之前的知识印象不深刻建议恶补第三章的第1, 2节的内容之后再学习这一小节
传送门:
初始化niosockectchannelconfig
server读取数据的流程
我们首先看nioeventloop的processselectedkey方法
private void processselectedkey(selectionkey k, abstractniochannel ch) { //获取到channel中的unsafe final abstractniochannel.niounsafe unsafe = ch.unsafe(); //如果这个key不是合法的, 说明这个channel可能有问题 if (!k.isvalid()) { //代码省略 } try { //如果是合法的, 拿到key的io事件 int readyops = k.readyops(); //链接事件 if ((readyops & selectionkey.op_connect) != 0) { int ops = k.interestops(); ops &= ~selectionkey.op_connect; k.interestops(ops); unsafe.finishconnect(); } //写事件 if ((readyops & selectionkey.op_write) != 0) { ch.unsafe().forceflush(); } //读事件和接受链接事件 //如果当前nioeventloop是work线程的话, 这里就是op_read事件 //如果是当前nioeventloop是boss线程的话, 这里就是op_accept事件 if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0) { unsafe.read(); if (!ch.isopen()) { return; } } } catch (cancelledkeyexception ignored) { unsafe.close(unsafe.voidpromise()); } }
if ((readyops & (selectionkey.op_read | selectionkey.op_accept)) != 0 || readyops == 0)
这里的判断表示轮询到大事件是op_read或者op_accept事件
之前的章节分析过, 如果当前nioeventloop是work线程的话, 那么这里就是op_read事件, 也就是读事件, 表示客户端发来了数据流
这里会调用unsafe的redis()方法进行读取
如果是work线程, 那么这里的channel是nioserversocketchannel, 其绑定的unsafe是niobyteunsafe, 这里会走进niobyteunsafe的read()方法中:
public final void read() { final channelconfig config = config(); final channelpipeline pipeline = pipeline(); final bytebufallocator allocator = config.getallocator(); final recvbytebufallocator.handle allochandle = recvbufallochandle(); allochandle.reset(config); bytebuf bytebuf = null; boolean close = false; try { do { bytebuf = allochandle.allocate(allocator); allochandle.lastbytesread(doreadbytes(bytebuf)); if (allochandle.lastbytesread() <= 0) { bytebuf.release(); bytebuf = null; close = allochandle.lastbytesread() < 0; break; } allochandle.incmessagesread(1); readpending = false; pipeline.firechannelread(bytebuf); bytebuf = null; } while (allochandle.continuereading()); allochandle.readcomplete(); pipeline.firechannelreadcomplete(); if (close) { closeonread(pipeline); } } catch (throwable t) { handlereadexception(pipeline, bytebuf, t, close, allochandle); } finally { if (!readpending && !config.isautoread()) { removereadop(); } } } }
首先获取socketchannel的config, pipeline等相关属性
final bytebufallocator allocator = config.getallocator(); 这一步是获取一个bytebuf的内存分配器, 用于分配bytebuf
这里会走到defaultchannelconfig的getallocator方法中
public bytebufallocator getallocator() { return allocator; }
这里返回的defualtchannelconfig的成员变量, 我们看这个成员变量:
private volatile bytebufallocator allocator = bytebufallocator.default;
这里调用bytebufallocator的属性default, 跟进去:
bytebufallocator default = bytebufutil.default_allocator;
我们看到这里又调用了bytebufutil的静态属性default_allocator, 再跟进去:
static final bytebufallocator default_allocator;
default_allocator这个属性是在static块中初始化的
我们跟到static块中
static { string alloctype = systempropertyutil.get( "io.netty.allocator.type", platformdependent.isandroid() ? "unpooled" : "pooled"); alloctype = alloctype.tolowercase(locale.us).trim(); bytebufallocator alloc; if ("unpooled".equals(alloctype)) { alloc = unpooledbytebufallocator.default; logger.debug("-dio.netty.allocator.type: {}", alloctype); } else if ("pooled".equals(alloctype)) { alloc = pooledbytebufallocator.default; logger.debug("-dio.netty.allocator.type: {}", alloctype); } else { alloc = pooledbytebufallocator.default; logger.debug("-dio.netty.allocator.type: pooled (unknown: {})", alloctype); } default_allocator = alloc; //代码省略 }
首先判断运行环境是不是安卓, 如果不是安卓, 在返回"pooled"字符串保存在alloctype中
然后通过if判断, 最后局部变量alloc = pooledbytebufallocator.default, 最后将alloc赋值到成员变量default_allocator
我们跟到pooledbytebufallocator的default属性中:
public static final pooledbytebufallocator default = new pooledbytebufallocator(platformdependent.directbufferpreferred());
我们看到这里直接通过new的方式, 创建了一个pooledbytebufallocator对象, 也就是基于申请一块连续内存进行缓冲区分配的缓冲区分配器
缓冲区分配器的知识, 我们之前小节进行了详细的剖析, 这里就不再赘述
回到niobyteunsafe的read()方法中
public final void read() { final channelconfig config = config(); final channelpipeline pipeline = pipeline(); final bytebufallocator allocator = config.getallocator(); final recvbytebufallocator.handle allochandle = recvbufallochandle(); allochandle.reset(config); bytebuf bytebuf = null; boolean close = false; try { do { bytebuf = allochandle.allocate(allocator); allochandle.lastbytesread(doreadbytes(bytebuf)); if (allochandle.lastbytesread() <= 0) { bytebuf.release(); bytebuf = null; close = allochandle.lastbytesread() < 0; break; } allochandle.incmessagesread(1); readpending = false; pipeline.firechannelread(bytebuf); bytebuf = null; } while (allochandle.continuereading()); allochandle.readcomplete(); pipeline.firechannelreadcomplete(); if (close) { closeonread(pipeline); } } catch (throwable t) { handlereadexception(pipeline, bytebuf, t, close, allochandle); } finally { if (!readpending && !config.isautoread()) { removereadop(); } } } }
这里 bytebufallocator allocator = config.getallocator()中的allocator , 就是pooledbytebufallocator
final recvbytebufallocator.handle allochandle = recvbufallochandle() 是创建一个handle, 我们之前的章节讲过, handle是对recvbytebufallocator进行实际操作的对象
我们跟进recvbufallochandle
public recvbytebufallocator.handle recvbufallochandle() { //如果不存在, 则创建一个handle的实例 if (recvhandle == null) { recvhandle = config().getrecvbytebufallocator().newhandle(); } return recvhandle; }
这里是我们之前剖析过的逻辑, 如果不存在, 则创建handle的实例, 具体创建过程我们可以回顾第三章的第二小节, 这里就不再赘述
同样allochandle.reset(config)是将配置重置, 第三章的第二小节也对其进行过剖析
重置完配置之后, 进行do-while循环, 有关循环终止条件allochandle.continuereading(), 之前小节也有过详细剖析, 这里也不再赘述
在do-while循环中, 首先看 bytebuf = allochandle.allocate(allocator) 这一步, 这里传入了刚才创建的allocate对象, 也就是pooledbytebufallocator:
这里会跑到defaultmaxmessagesrecvbytebufallocator类的allocate方法中:
public bytebuf allocate(bytebufallocator alloc) { return alloc.iobuffer(guess()); }
这里的guess方法, 会调用adaptiverecvbytebufallocator的guess方法:
public int guess() { return nextreceivebuffersize; }
这里会返回adaptiverecvbytebufallocator的成员变量nextreceivebuffersize, 也就是下次所分配缓冲区的大小, 根据我们之前学习的内容, 第一次分配的时候会分配初始大小, 也就是1024字节
回到defaultmaxmessagesrecvbytebufallocator类的allocate方法中:
这样, alloc.iobuffer(guess())就会分配一个pooledbytebuf
我们跟到abstractbytebufallocator的iobuffer方法中:
public bytebuf iobuffer(int initialcapacity) { if (platformdependent.hasunsafe()) { return directbuffer(initialcapacity); } return heapbuffer(initialcapacity); }
这里首先判断是否能获取jdk的unsafe对象, 默认为true, 所以会走到directbuffer(initialcapacity)中, 这里最终会分配一个pooledunsafedirectbytebuf对象, 具体分配流程我们再之前小节做过详细剖析
回到niobyteunsafe的read()方法中:
分配完了bytebuf之后, 再看这一步allochandle.lastbytesread(doreadbytes(bytebuf)):
首先看参数doreadbytes(bytebuf)方法, 这步是将channel中的数据读取到我们刚分配的bytebuf中, 并返回读取到的字节数
这里会调用到niosocketchannel的doreadbytes方法:
protected int doreadbytes(bytebuf bytebuf) throws exception { final recvbytebufallocator.handle allochandle = unsafe().recvbufallochandle(); allochandle.attemptedbytesread(bytebuf.writablebytes()); return bytebuf.writebytes(javachannel(), allochandle.attemptedbytesread()); }
首先拿到绑定在channel中的handler, 因为我们已经创建了handle, 所以这里会直接拿到
再看allochandle.attemptedbytesread(bytebuf.writablebytes())这步, bytebuf.writablebytes()返回bytebuf的可写字节数, 也就是最多能从channel中读取多少字节写到bytebuf, allocate的attemptedbytesread会把可写字节数设置到defaultmaxmessagesrecvbytebufallocator 类的attemptedbytesread属性中
跟到defaultmaxmessagesrecvbytebufallocator中的attemptedbytesread我们会看到:
public void attemptedbytesread(int bytes) { attemptedbytesread = bytes; }
继续看doreadbytes方法
最后, 通过bytebuf.writebytes(javachannel(), allochandle.attemptedbytesread())将jdk底层的channel中的数据写入到我们创建的bytebuf中, 并返回实际写入的字节数
回到niobyteunsafe的read()方法中:
继续看allochandle.lastbytesread(doreadbytes(bytebuf))这步
刚才我们剖析过doreadbytes(bytebuf)返回的是世界写入bytebuf的字节数
再看lastbytesread方法, 跟到defaultmaxmessagesrecvbytebufallocator的lastbytesread方法中:
public final void lastbytesread(int bytes) { lastbytesread = bytes; totalbytesread += bytes; if (totalbytesread < 0) { totalbytesread = integer.max_value; } }
这里会赋值两个属性, lastbytesread代表最后读取的字节数, 这里赋值为我们刚才写入bytebuf的字节数, totalbytesread表示总共读取的字节数, 这里将写入的字节数追加
继续看niobyteunsafe的read()方法:
如果最后一次读取数据为0, 说明已经将channel中的数据全部读取完毕, 将新创建的bytebuf释放循环利用, 并跳出循环
allochandle.incmessagesread(1)这步是增加消息的读取次数, 因为我们循环最多16次, 所以当增加消息次数增加到16会结束循环
读取完毕之后, 会通过pipeline.firechannelread(bytebuf)将传递channelread事件, 有关channelread事件, 我们在第四章也进行了详细的剖析
这里读者会有疑问, 如果一次读取不完, 就传递channelread事件, 那么server接收到的数据有可能就是不完整的, 其实关于这点, netty也做了相应的处理, 我们会在之后的章节详细剖析netty的半包处理机制
循环结束后, 会执行到allochandle.readcomplete()这一步
我们知道第一次分配bytebuf的初始容量是1024, 但是初始容量不一定一定满足所有的业务场景, netty中, 将每次读取数据的字节数进行记录, 然后之后次分配bytebuf的时候, 容量会尽可能的符合业务场景所需要大小, 具体实现方式, 就是在readcomplete()这一步体现的
我们跟到adaptiverecvbytebufallocator的readcomplete()方法中:
public void readcomplete() { record(totalbytesread()); }
这里调用了record方法, 并且传入了这一次所读取的字节总数
跟到record方法中
private void record(int actualreadbytes) { if (actualreadbytes <= size_table[math.max(0, index - index_decrement - 1)]) { if (decreasenow) { index = math.max(index - index_decrement, minindex); nextreceivebuffersize = size_table[index]; decreasenow = false; } else { decreasenow = true; } } else if (actualreadbytes >= nextreceivebuffersize) { index = math.min(index + index_increment, maxindex); nextreceivebuffersize = size_table[index]; decreasenow = false; } }
首先看判断条件 if (actualreadbytes <= size_table[math.max(0, index - index_decrement - 1)])
这里index是当前分配的缓冲区大小所在的size_table中的索引, 将这个索引进行缩进, 然后根据缩进后的所以找出size_table中所存储的内存值, 再判断是否大于等于这次读取的最大字节数, 如果条件成立, 说明分配的内存过大, 需要缩容操作, 我们看if块中缩容相关的逻辑
首先 if (decreasenow) 会判断是否立刻进行收缩操作, 通常第一次不会进行收缩操作, 然后会将decreasenow设置为true, 代表下一次直接进行收缩操作
假设需要立刻进行收缩操作, 我们看收缩操作的相关逻辑:
index = math.max(index - index_decrement, minindex) 这一步将索引缩进一步, 但不能小于最小索引值
然后通过 nextreceivebuffersize = size_table[index] 获取设置索引之后的内存, 赋值在nextreceivebuffersize, 也就是下次需要分配的大小, 下次就会根据这个大小分配bytebuf了, 这样就实现了缩容操作
再看 else if (actualreadbytes >= nextreceivebuffersize)
这里判断这次读取字节的总量比上次分配的大小还要大, 则进行扩容操作
扩容操作也很简单, 索引步进, 然后拿到步进后的索引所对应的内存值, 作为下次所需要分配的大小
再niobyteunsafe的read()方法中:
经过了缩容或者扩容操作之后, 通过pipeline.firechannelreadcomplete()传播channelreadcomplete()事件
以上就是读取客户端消息的相关流程
章节总结
本章主要剖析了bytebuf的基本操作以及缓冲区分配等相关知识.
缓冲区分配, 分为通过调用jdk的api的方式和分配一块连续内存的方式
其中, 通过分配连续内存的方式分配缓冲区中, 又介绍了在page级别分配的逻辑和在subpage级别分配的逻辑
page级别分配时通过操作内存二叉树的方式记录分配情况
subpage级别分配是通过位图的方式记录分配情况
最后介绍了niosocketchannel处理读事件的相关逻辑
总体来说, 这一章的内容难度是比较大的, 希望同学课后通过多调试的方式进行熟练掌握
看完文章,还可以用支付宝扫描下面的二维码领取一个支付宝红包,目前可领1-88元不等
除了扫码可以领取之外,大家还可以(复制 720087999 打开✔支付宝✔去搜索, h`o`n.g.包哪里来,动动手指就能领)。
看下图所示是好多参与这次活动领取红包的朋友: