抓到 Netty 一个 Bug,顺带来透彻地聊一下 Netty 是如何高效接收网络连接的()

  本篇文章为你整理了抓到 Netty 一个 Bug,顺带来透彻地聊一下 Netty 是如何高效接收网络连接的()的详细内容,包含有 抓到 Netty 一个 Bug,顺带来透彻地聊一下 Netty 是如何高效接收网络连接的,希望能帮助你了解 抓到 Netty 一个 Bug,顺带来透彻地聊一下 Netty 是如何高效接收网络连接的。

   本文介绍了NioServerSocketChannel处理客户端连接事件的整个过程。接收连接的整个处理框架。影响Netty接收连接吞吐的Bug产生的原因,以及修复的方案。创建并初始化客户端NioSocketChannel。初始化NioSocketChannel中的pipeline。客户端NioSocketChannel向Sub Reactor注册的过程

  
本系列Netty源码解析文章基于 4.1.56.Final版本,大家如果看到图片显示不了的话,可以查看公众号原文

  对于一个高性能网络通讯框架来说,最最重要也是最核心的工作就是如何高效的接收客户端连接,这就好比我们开了一个饭店,那么迎接客人就是饭店最重要的工作,我们要先把客人迎接进来,不能让客人一看人多就走掉,只要客人进来了,哪怕菜做的慢一点也没关系。

  本文笔者就来为大家介绍下netty这块最核心的内容,看看netty是如何高效的接收客户端连接的。

  下图为笔者在一个月黑风高天空显得那么深邃遥远的夜晚,闲来无事,于是捧起Netty关于如何接收连接这部分源码细细品读的时候,意外的发现了一个影响Netty接收连接吞吐的一个Bug。

  于是笔者就在Github提了一个Issue#11708,阐述了下这个Bug产生的原因以及导致的结果并和Netty的作者一起讨论了下修复措施。如上图所示。

  Issue#11708:https://github.com/netty/netty/issues/11708

  这里先不详细解释这个Issue,也不建议大家现在就打开这个Issue查看,笔者会在本文的介绍中随着源码深入的解读慢慢的为大家一层一层地拨开迷雾。

  之所以在文章的开头把这个拎出来,笔者是想让大家带着怀疑,审视,欣赏,崇敬,敬畏的态度来一起品读世界顶级程序员编写的代码。由衷的感谢他们在这一领域做出的贡献。

  好了,问题抛出来后,我们就带着这个疑问来开始本文的内容吧~~~

  按照老规矩,再开始本文的内容之前,我们先来回顾下前边几篇文章的概要内容帮助大家梳理一个框架全貌出来。

  笔者这里再次想和读者朋友们强调的是本文可以独立观看,并不依赖前边系列文章的内容,只是大家如果对相关细节部分感兴趣的话,可以在阅读完本文之后在去回看相关文章。

  在前边的系列文章中,笔者为大家介绍了驱动Netty整个框架运转的核心引擎Reactor的创建,启动,运行的全流程。从现在开始Netty的整个核心框架就开始运转起来开始工作了,本文要介绍的主要内容就是Netty在启动之后要做的第一件事件:监听端口地址,高效接收客户端连接。

  在《聊聊Netty那些事儿之从内核角度看IO模型》一文中,我们是从整个网络框架的基石IO模型的角度整体阐述了下Netty的IO线程模型。

  而Netty中的Reactor正是IO线程在Netty中的模型定义。Reactor在Netty中是以Group的形式出现的,分为:

  
主Reactor线程组也就是我们在启动代码中配置的EventLoopGroup bossGroup,main reactor group中的reactor主要负责监听客户端连接事件,高效的处理客户端连接。也是本文我们要介绍的重点。

  
从Reactor线程组也就是我们在启动代码中配置的EventLoopGroup workerGroup,sub reactor group中的reactor主要负责处理客户端连接上的IO事件,以及异步任务的执行。

  
taskQueue用于保存Reactor需要执行的异步任务,这些异步任务可以由用户在业务线程中向Reactor提交,也可以是Netty框架提交的一些自身核心的任务。

  
scheduledTaskQueue则是保存Reactor中执行的定时任务。代替了原有的时间轮来执行延时任务。

  
tailQueue保存了在Reactor需要执行的一些尾部收尾任务,在普通任务执行完后 Reactor线程会执行尾部任务,比如对Netty 的运行状态做一些统计数据,例如任务循环的耗时、占用物理内存的大小等等

  
在骨架搭建完毕之后,我们随后又在在《详细图解Netty Reactor启动全流程》》一文中介绍了本文的主角服务端NioServerSocketChannel的创建,初始化,绑定端口地址,向main reactor注册监听OP_ACCEPT事件的完整过程。

  main reactor如何处理OP_ACCEPT事件将会是本文的主要内容。

  自此Netty框架的main reactor group已经启动完毕,开始准备监听OP_accept事件,当客户端连接上来之后,OP_ACCEPT事件活跃,main reactor开始处理OP_ACCEPT事件接收客户端连接了。

  而netty中的IO事件分为:OP_ACCEPT事件,OP_READ事件,OP_WRITE事件和OP_CONNECT事件,netty对于IO事件的监听和处理统一封装在Reactor模型中,这四个IO事件的处理过程也是我们后续文章中要单独拿出来介绍的,本文我们聚焦OP_ACCEPT事件的处理。

  而为了让大家能够对IO事件的处理有一个完整性的认识,笔者写了《一文聊透Netty核心引擎Reactor的运转架构》这篇文章,在文章中详细介绍了Reactor线程的整体运行框架。

  Reactor线程会在一个死循环中996不停的运转,在循环中会不断的轮询监听Selector上的IO事件,当IO事件活跃后,Reactor从Selector上被唤醒转去执行IO就绪事件的处理,在这个过程中我们引出了上述四种IO事件的处理入口函数。

  

 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

 

   //获取Channel的底层操作类Unsafe

   final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

   if (!k.isValid()) {

   ......如果SelectionKey已经失效则关闭对应的Channel......

   try {

   //获取IO就绪事件

   int readyOps = k.readyOps();

   //处理Connect事件

   if ((readyOps SelectionKey.OP_CONNECT) != 0) {

   int ops = k.interestOps();

   //移除对Connect事件的监听,否则Selector会一直通知

   ops = ~SelectionKey.OP_CONNECT;

   k.interestOps(ops);

   //触发channelActive事件处理Connect事件

   unsafe.finishConnect();

   //处理Write事件

   if ((readyOps SelectionKey.OP_WRITE) != 0) {

   ch.unsafe().forceFlush();

   //处理Read事件或者Accept事件

   if ((readyOps (SelectionKey.OP_READ SelectionKey.OP_ACCEPT)) != 0 readyOps == 0) {

   unsafe.read();

   } catch (CancelledKeyException ignored) {

   unsafe.close(unsafe.voidPromise());

  

 

  本文笔者将会为大家重点介绍OP_ACCEPT事件的处理入口函数unsafe.read()的整个源码实现。

  当客户端连接完成三次握手之后,main reactor中的selector产生OP_ACCEPT事件活跃,main reactor随即被唤醒,来到了OP_ACCEPT事件的处理入口函数开始接收客户端连接。

  1. Main Reactor处理OP_ACCEPT事件

  当Main Reactor轮询到NioServerSocketChannel上的OP_ACCEPT事件就绪时,Main Reactor线程就会从JDK Selector上的阻塞轮询APIselector.select(timeoutMillis)调用中返回。转而去处理NioServerSocketChannel上的OP_ACCEPT事件。

  

public final class NioEventLoop extends SingleThreadEventLoop {

 

   private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

   final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

   ..............省略.................

   try {

   int readyOps = k.readyOps();

   if ((readyOps SelectionKey.OP_CONNECT) != 0) {

   ..............处理OP_CONNECT事件.................

  
if ((readyOps SelectionKey.OP_WRITE) != 0) {

   ..............处理OP_WRITE事件.................

  
if ((readyOps (SelectionKey.OP_READ SelectionKey.OP_ACCEPT)) != 0 readyOps == 0) {

   //本文重点处理OP_ACCEPT事件

   unsafe.read();

   } catch (CancelledKeyException ignored) {

   unsafe.close(unsafe.voidPromise());

  

 

  
处理IO就绪事件的入口函数processSelectedKey 中的参数AbstractNioChannel ch正是Netty服务端NioServerSocketChannel。因为此时的执行线程为main reactor线程,而main reactor上注册的正是netty服务端NioServerSocketChannel负责监听端口地址,接收客户端连接。

  
通过ch.unsafe()获取到的NioUnsafe操作类正是NioServerSocketChannel中对底层JDK NIO ServerSocketChannel的Unsafe底层操作类。

  
Unsafe接口是Netty对Channel底层操作行为的封装,比如NioServerSocketChannel的底层Unsafe操作类干的事情就是绑定端口地址,处理OP_ACCEPT事件。

  这里我们看到,Netty将OP_ACCEPT事件处理的入口函数封装在NioServerSocketChannel里的底层操作类Unsafe的read方法中。

  而NioServerSocketChannel中的Unsafe操作类实现类型为NioMessageUnsafe定义在上图继承结构中的AbstractNioMessageChannel父类中。

  下面我们到NioMessageUnsafe#read方法中来看下Netty对OP_ACCPET事件的具体处理过程:

  2. 接收客户端连接核心流程框架总览

  我们还是按照老规矩,先从整体上把整个OP_ACCEPT事件的逻辑处理框架提取出来,让大家先总体俯视下流程全貌,然后在针对每个核心点位进行各个击破。

  main reactor线程是在一个do...while{...}循环read loop中不断的调用JDK NIO serverSocketChannel.accept()方法来接收完成三次握手的客户端连接NioSocketChannel的,并将接收到的客户端连接NioSocketChannel临时保存在List Object readBuf集合中,后续会服务端NioServerSocketChannel的pipeline中通过ChannelRead事件来传递,最终会在ServerBootstrapAcceptor这个ChannelHandler中被处理初始化,并将其注册到Sub Reator Group中。

  这里的read loop循环会被限定只能读取16次,当main reactor从NioServerSocketChannel中读取客户端连接NioSocketChannel的次数达到16次之后,无论此时是否还有客户端连接都不能在继续读取了。

  因为我们在《一文聊透Netty核心引擎Reactor的运转架构》一文中提到,netty对reactor线程压榨的比较狠,要干的事情很多,除了要监听轮询IO就绪事件,处理IO就绪事件,还需要执行用户和netty框架本省提交的异步任务和定时任务。

  所以这里的main reactor线程不能在read loop中无限制的执行下去,因为还需要分配时间去执行异步任务,不能因为无限制的接收客户端连接而耽误了异步任务的执行。所以这里将read loop的循环次数限定为16次。

  如果main reactor线程在read loop中读取客户端连接NioSocketChannel的次数已经满了16次,即使此时还有客户端连接未接收,那么main reactor线程也不会再去接收了,而是转去执行异步任务,当异步任务执行完毕后,还会在回来执行剩余接收连接的任务。

  main reactor线程退出read loop循环的条件有两个:

  
从NioServerSocketChannel中读取客户端连接的次数达到了16次,无论此时是否还有客户端连接都需要退出循环。

  
以上就是Netty在接收客户端连接时的整体核心逻辑,下面笔者将这部分逻辑的核心源码实现框架提取出来,方便大家根据上述核心逻辑与源码中的处理模块对应起来,还是那句话,这里只需要总体把握核心处理流程,不需要读懂每一行代码,笔者会在文章的后边分模块来各个击破它们。

  

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {

 

   private final class NioMessageUnsafe extends AbstractNioUnsafe {

   //存放连接建立后,创建的客户端SocketChannel

   private final List Object readBuf = new ArrayList Object

   @Override

   public void read() {

   //必须在Main Reactor线程中执行

   assert eventLoop().inEventLoop();

   //注意下面的config和pipeline都是服务端ServerSocketChannel中的

   final ChannelConfig config = config();

   final ChannelPipeline pipeline = pipeline();

   //创建接收数据Buffer分配器(用于分配容量大小合适的byteBuffer用来容纳接收数据)

   //在接收连接的场景中,这里的allocHandle只是用于控制read loop的循环读取创建连接的次数。

   final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

   allocHandle.reset(config);

   boolean closed = false;

   Throwable exception = null;

   try {

   try {

   do {

   //底层调用NioServerSocketChannel- doReadMessages 创建客户端SocketChannel

   int localRead = doReadMessages(readBuf);

   //已无新的连接可接收则退出read loop

   if (localRead == 0) {

   break;

   if (localRead 0) {

   closed = true;

   break;

   //统计在当前事件循环中已经读取到得Message数量(创建连接的个数)

   allocHandle.incMessagesRead(localRead);

   } while (allocHandle.continueReading());//判断是否已经读满16次

   } catch (Throwable t) {

   exception = t;

   int size = readBuf.size();

   for (int i = 0; i size; i ++) {

   readPending = false;

   //在NioServerSocketChannel对应的pipeline中传播ChannelRead事件

   //初始化客户端SocketChannel,并将其绑定到Sub Reactor线程组中的一个Reactor上

   pipeline.fireChannelRead(readBuf.get(i));

   //清除本次accept 创建的客户端SocketChannel集合

   readBuf.clear();

   allocHandle.readComplete();

   //触发readComplete事件传播

   pipeline.fireChannelReadComplete();

   ....................省略............

   } finally {

   ....................省略............

  

 

  这里首先要通过断言 assert eventLoop().inEventLoop()确保处理接收客户端连接的线程必须为Main Reactor 线程。

  而main reactor中主要注册的是服务端NioServerSocketChannel,主要负责处理OP_ACCEPT事件,所以当前main reactor线程是在NioServerSocketChannel中执行接收连接的工作。

  所以这里我们通过config()获取到的是NioServerSocketChannel的属性配置类NioServerSocketChannelConfig,它是在Reactor的启动阶段被创建出来的。

  

 public NioServerSocketChannel(ServerSocketChannel channel) {

 

   //父类AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT

   super(null, channel, SelectionKey.OP_ACCEPT);

   //DefaultChannelConfig中设置用于Channel接收数据用的buffer- AdaptiveRecvByteBufAllocator

   config = new NioServerSocketChannelConfig(this, javaChannel().socket());

  

 

  同理这里通过pipeline()获取到的也是NioServerSocketChannel中的pipeline。它会在NioServerSocketChannel向main reactor注册成功之后被初始化。

  前边提到main reactor线程会被限定只能在read loop中向NioServerSocketChannel读取16次客户端连接,所以在开始read loop之前,我们需要创建一个能够保存记录读取次数的对象,在每次read loop循环之后,可以根据这个对象来判断是否结束read loop。

  这个对象就是这里的 RecvByteBufAllocator.Handle allocHandle 专门用于统计read loop中接收客户端连接的次数,以及判断是否该结束read loop转去执行异步任务。

  当这一切准备就绪之后,main reactor线程就开始在do{....}while(...)循环中接收客户端连接了。

  在 read loop中通过调用doReadMessages函数接收完成三次握手的客户端连接,底层会调用到JDK NIO ServerSocketChannel的accept方法,从内核全连接队列中取出客户端连接。

  返回值localRead 表示接收到了多少客户端连接,客户端连接通过accept方法只会一个一个的接收,所以这里的localRead 正常情况下都会返回1,当localRead = 0时意味着已经没有新的客户端连接可以接收了,本次main reactor接收客户端的任务到这里就结束了,跳出read loop。开始新的一轮IO事件的监听处理。

  

 public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {

 

   try {

   return AccessController.doPrivileged(new PrivilegedExceptionAction SocketChannel () {

   @Override

   public SocketChannel run() throws IOException {

   return serverSocketChannel.accept();

   } catch (PrivilegedActionException e) {

   throw (IOException) e.getCause();

  

 

  随后会将接收到的客户端连接占时存放到List Object readBuf集合中。

  

 private final class NioMessageUnsafe extends AbstractNioUnsafe {

 

   //存放连接建立后,创建的客户端SocketChannel

   private final List Object readBuf = new ArrayList Object

  

 

  调用allocHandle.incMessagesRead统计本次事件循环中接收到的客户端连接个数,最后在read loop末尾通过allocHandle.continueReading判断是否达到了限定的16次。从而决定main reactor线程是继续接收客户端连接还是转去执行异步任务。

  main reactor线程退出read loop的两个条件:

  
从NioServerSocketChannel中读取客户端连接的次数达到了16次,无论此时是否还有客户端连接都需要退出循环。

  
当满足以上两个退出条件时,main reactor线程就会退出read loop,由于在read loop中接收到的客户端连接全部暂存在List Object readBuf 集合中,随后开始遍历readBuf,在NioServerSocketChannel的pipeline中传播ChannelRead事件。

  

 int size = readBuf.size();

 

   for (int i = 0; i size; i ++) {

   readPending = false;

   //NioServerSocketChannel对应的pipeline中传播read事件

   //io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor.channelRead

   //初始化客户端SocketChannel,并将其绑定到Sub Reactor线程组中的一个Reactor上

   pipeline.fireChannelRead(readBuf.get(i));

  

 

  最终pipeline中的ChannelHandler(ServerBootstrapAcceptor)会响应ChannelRead事件,并在相应回调函数中初始化客户端NioSocketChannel,并将其注册到Sub Reactor Group中。此后客户端NioSocketChannel绑定到的sub reactor就开始监听处理客户端连接上的读写事件了。

  Netty整个接收客户端的逻辑过程如下图步骤1,2,3所示。

  以上内容就是笔者提取出来的整体流程框架,下面我们来将其中涉及到的重要核心模块拆开,一个一个详细解读下。

  3. RecvByteBufAllocator简介

  Reactor在处理对应Channel上的IO数据时,都会采用一个ByteBuffer来接收Channel上的IO数据。而本小节要介绍的RecvByteBufAllocator正是用来分配ByteBuffer的一个分配器。

  还记得这个RecvByteBufAllocator 在哪里被创建的吗??

  在《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》一文中,在介绍NioServerSocketChannel的创建过程中提到,对应Channel的配置类NioServerSocketChannelConfig也会随着NioServerSocketChannel的创建而创建。

  

 public NioServerSocketChannel(ServerSocketChannel channel) {

 

   super(null, channel, SelectionKey.OP_ACCEPT);

   config = new NioServerSocketChannelConfig(this, javaChannel().socket());

  

 

  在创建NioServerSocketChannelConfig的过程中会创建RecvByteBufAllocator。

  

 public DefaultChannelConfig(Channel channel) {

 

   this(channel, new AdaptiveRecvByteBufAllocator());

  

 

  这里我们看到NioServerSocketChannel中的RecvByteBufAllocator实际类型为AdaptiveRecvByteBufAllocator,顾名思义,这个类型的RecvByteBufAllocator可以根据Channel上每次到来的IO数据大小来自适应动态调整ByteBuffer的容量。

  对于服务端NioServerSocketChannel来说,它上边的IO数据就是客户端的连接,它的长度和类型都是固定的,所以在接收客户端连接的时候并不需要这样的一个ByteBuffer来接收,我们会将接收到的客户端连接存放在List Object readBuf集合中

  对于客户端NioSocketChannel来说,它上边的IO数据时客户端发送来的网络数据,长度是不定的,所以才会需要这样一个可以根据每次IO数据的大小来自适应动态调整容量的ByteBuffer来接收。

  那么看起来这个RecvByteBufAllocator和本文的主题不是很关联,因为在接收连接的过程中并不会怎么用到它,这个类笔者还会在后面的文章中详细介绍,之所以这里把它拎出来单独介绍是因为它和本文开头提到的Bug有关系,这个Bug就是由这个类引起的。

  3.1 RecvByteBufAllocator.Handle的获取

  在本文中,我们是通过NioServerSocketChannel中的unsafe底层操作类来获取RecvByteBufAllocator.Handle的

  

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

 

  

 

  

protected abstract class AbstractUnsafe implements Unsafe {

 

   @Override

   public RecvByteBufAllocator.Handle recvBufAllocHandle() {

   if (recvHandle == null) {

   recvHandle = config().getRecvByteBufAllocator().newHandle();

   return recvHandle;

  

 

  我们看到最终会在NioServerSocketChannel的配置类NioServerSocketChannelConfig中获取到AdaptiveRecvByteBufAllocator

  

public class DefaultChannelConfig implements ChannelConfig {

 

   //用于Channel接收数据用的buffer分配器 类型为AdaptiveRecvByteBufAllocator

   private volatile RecvByteBufAllocator rcvBufAllocator;

  

 

  AdaptiveRecvByteBufAllocator 中会创建自适应动态调整容量的ByteBuffer分配器。

  

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

 

   @Override

   public Handle newHandle() {

   return new HandleImpl(minIndex, maxIndex, initial);

   private final class HandleImpl extends MaxMessageHandle {

   .................省略................

  

 

  这里的newHandle方法返回的具体类型为MaxMessageHandle ,这个MaxMessageHandle里边保存了每次从Channel中读取IO数据的容量指标,方便下次读取时分配合适大小的buffer。

  每次在使用allocHandle 前需要调用allocHandle.reset(config);重置里边的统计指标。

  

 public abstract class MaxMessageHandle implements ExtendedHandle {

 

   private ChannelConfig config;

   //每次事件轮询时,最多读取16次

   private int maxMessagePerRead;

   //本次事件轮询总共读取的message数,这里指的是接收连接的数量

   private int totalMessages;

   //本次事件轮询总共读取的字节数

   private int totalBytesRead;

   @Override

   public void reset(ChannelConfig config) {

   this.config = config;

   //默认每次最多读取16次

   maxMessagePerRead = maxMessagesPerRead();

   totalMessages = totalBytesRead = 0;

  

 

  maxMessagePerRead:用于控制每次read loop里最大可以循环读取的次数,默认为16次,可在启动配置类ServerBootstrap中通过ChannelOption.MAX_MESSAGES_PER_READ选项设置。

  

ServerBootstrap b = new ServerBootstrap();

 

  b.group(bossGroup, workerGroup)

   .channel(NioServerSocketChannel.class)

   .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定义次数)

  

 

  totalMessages:用于统计read loop中总共接收的连接个数,每次read loop循环后会调用allocHandle.incMessagesRead增加记录接收到的连接个数。

  

 @Override

 

   public final void incMessagesRead(int amt) {

   totalMessages += amt;

  

 

  totalBytesRead:用于统计在read loop中总共接收到客户端连接上的数据大小,这个字段主要用于sub reactor在接收客户端NioSocketChannel上的网络数据用的,本文我们介绍的是main reactor接收客户端连接,所以这里并不会用到这个字段。这个字段会在sub reactor每次读取完NioSocketChannel上的网络数据时增加记录。

  

 @Override

 

   public void lastBytesRead(int bytes) {

   lastBytesRead = bytes;

   if (bytes 0) {

   totalBytesRead += bytes;

  

 

  MaxMessageHandler中还有一个非常重要的方法就是在每次read loop末尾会调用allocHandle.continueReading()方法来判断读取连接次数是否已满16次,来决定main reactor线程是否退出循环。

  

 do {

 

   //底层调用NioServerSocketChannel- doReadMessages 创建客户端SocketChannel

   int localRead = doReadMessages(readBuf);

   if (localRead == 0) {

   break;

   if (localRead 0) {

   closed = true;

   break;

   //统计在当前事件循环中已经读取到得Message数量(创建连接的个数)

   allocHandle.incMessagesRead(localRead);

   } while (allocHandle.continueReading());

  

 

  红框中圈出来的两个判断条件和本文主题无关,我们这里不需要关注,笔者会在后面的文章详细介绍。

  
totalMessages maxMessagePerRead:在本文的接收客户端连接场景中,这个条件用于判断main reactor线程在read loop中的读取次数是否超过了16次。如果超过16次就会返回false,main reactor线程退出循环。

  
totalBytesRead 0:用于判断当客户端NioSocketChannel上的OP_READ事件活跃时,sub reactor线程在read loop中是否读取到了网络数据。

  
以上内容就是RecvByteBufAllocator.Handle在接收客户端连接场景下的作用,大家这里仔细看下这个allocHandle.continueReading()方法退出循环的判断条件,再结合整个do{....}while(...)接收连接循环体,感受下是否哪里有些不对劲?Bug即将出现~~~

  4. 啊哈!!Bug ! !

  netty不论是在本文中处理接收客户端连接的场景还是在处理接收客户端连接上的网络数据场景都会在一个do{....}while(...)循环read loop中不断的处理。

  同时也都会利用在上一小节中介绍的RecvByteBufAllocator.Handle来记录每次read loop接收到的连接个数和从连接上读取到的网络数据大小。

  从而在read loop的末尾都会通过allocHandle.continueReading()方法判断是否应该退出read loop循环结束连接的接收流程或者是结束连接上数据的读取流程。

  无论是用于接收客户端连接的main reactor也好还是用于接收客户端连接上的网络数据的sub reactor也好,它们的运行框架都是一样的,只不过是具体分工不同。

  所以netty这里想用统一的RecvByteBufAllocator.Handle来处理以上两种场景。

  而RecvByteBufAllocator.Handle中的totalBytesRead字段主要记录sub reactor线程在处理客户端NioSocketChannel中OP_READ事件活跃时,总共在read loop中读取到的网络数据,而这里是main reactor线程在接收客户端连接所以这个字段并不会被设置。totalBytesRead字段的值在本文中永远会是0。

  所以无论同时有多少个客户端并发连接到服务端上,在接收连接的这个read loop中永远只会接受一个连接就会退出循环,因为allocHandle.continueReading()方法中的判断条件totalBytesRead 0永远会返回false。

  

 do {

 

   //底层调用NioServerSocketChannel- doReadMessages 创建客户端SocketChannel

   int localRead = doReadMessages(readBuf);

   if (localRead == 0) {

   break;

   if (localRead 0) {

   closed = true;

   break;

   //统计在当前事件循环中已经读取到得Message数量(创建连接的个数)

   allocHandle.incMessagesRead(localRead);

   } while (allocHandle.continueReading());

  

 

  而netty的本意是在这个read loop循环中尽可能多的去接收客户端的并发连接,同时又不影响main reactor线程执行异步任务。但是由于这个Bug,main reactor在这个循环中只执行一次就结束了。这也一定程度上就影响了netty的吞吐。

  让我们想象下这样的一个场景,当有16个客户端同时并发连接到了服务端,这时NioServerSocketChannel上的OP_ACCEPT事件活跃,main reactor从Selector上被唤醒,随后执行OP_ACCEPT事件的处理。

  

public final class NioEventLoop extends SingleThreadEventLoop {

 

   @Override

   protected void run() {

   int selectCnt = 0;

   for (;;) {

   try {

   int strategy;

   try {

   strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());

   switch (strategy) {

   case SelectStrategy.CONTINUE:

   ............省略.........

   case SelectStrategy.BUSY_WAIT:

   ............省略.........

   case SelectStrategy.SELECT:

   ............监听轮询IO事件.........

   default:

   } catch (IOException e) {

   ............省略.........

   ............处理IO就绪事件.........

   ............执行异步任务.........

  

 

  但是由于这个Bug的存在,main reactor在接收客户端连接的这个read loop中只接收了一个客户端连接就匆匆返回了。

  

 private final class NioMessageUnsafe extends AbstractNioUnsafe {

 

   do {

   int localRead = doReadMessages(readBuf);

   .........省略...........

   } while (allocHandle.continueReading());

  

 

  然后根据下图中这个Reactor的运行结构去执行异步任务,随后绕一大圈又会回到NioEventLoop#run方法中重新发起一轮OP_ACCEPT事件轮询。

  由于现在还有15个客户端并发连接没有被接收,所以此时Main Reactor线程并不会在selector.select()上阻塞,最终绕一圈又会回到NioMessageUnsafe#read方法的do{.....}while()循环。在接收一个连接之后又退出循环。

  本来我们可以在一次read loop中把这16个并发的客户端连接全部接收完毕的,因为这个Bug,main reactor需要不断的发起OP_ACCEPT事件的轮询,绕了很大一个圈子。同时也增加了许多不必要的selector.select()系统调用开销

  这时大家在看这个Issue#11708中的讨论是不是就清晰很多了~~

  Issue#11708:https://github.com/netty/netty/issues/11708

  4.1 Bug的修复

  笔者在写这篇文章的时候,Netty最新版本是4.1.68.final,这个Bug在4.1.69.final中被修复。

  由于该Bug产生的原因正是因为服务端NioServerSocketChannel(用于监听端口地址和接收客户端连接)和 客户端NioSocketChannel(用于通信)中的Config配置类混用了同一个ByteBuffer分配器AdaptiveRecvByteBufAllocator而导致的。

  所以在新版本修复中专门为服务端ServerSocketChannel中的Config配置类引入了一个新的ByteBuffer分配器ServerChannelRecvByteBufAllocator,专门用于服务端ServerSocketChannel接收客户端连接的场景。

  在ServerChannelRecvByteBufAllocator的父类DefaultMaxMessagesRecvByteBufAllocator中引入了一个新的字段ignoreBytesRead,用于表示是否忽略网络字节的读取,在创建服务端Channel配置类NioServerSocketChannelConfig的时候,这个字段会被赋值为true。

  当main reactor线程在read loop循环中接收客户端连接的时候。

  

 private final class NioMessageUnsafe extends AbstractNioUnsafe {

 

   do {

   int localRead = doReadMessages(readBuf);

   .........省略...........

   } while (allocHandle.continueReading());

  

 

  在read loop循环的末尾就会采用从ServerChannelRecvByteBufAllocator 中创建的MaxMessageHandle#continueReading方法来判断读取连接次数是否超过了16次。由于这里的ignoreBytesRead == true这回我们就会忽略totalBytesRead == 0的情况,从而使得接收连接的read loop得以继续地执行下去。在一个read loop中一次性把16个连接全部接收完毕。

  以上就是对这个Bug产生的原因,以及发现的过程,最后修复的方案一个全面的介绍,因此笔者也出现在了netty 4.1.69.final版本发布公告里的thank-list中。哈哈,真是令人开心的一件事情~~~

  通过以上对netty接收客户端连接的全流程分析和对这个Bug来龙去脉以及修复方案的介绍,大家现在一定已经理解了整个接收连接的流程框架。

  接下来笔者就把这个流程中涉及到的一些核心模块在单独拎出来从细节入手,为大家各个击破~~~

  5. doReadMessages接收客户端连接

  

public class NioServerSocketChannel extends AbstractNioMessageChannel

 

   implements io.netty.channel.socket.ServerSocketChannel {

   @Override

   protected int doReadMessages(List Object buf) throws Exception {

   SocketChannel ch = SocketUtils.accept(javaChannel());

   try {

   if (ch != null) {

   buf.add(new NioSocketChannel(this, ch));

   return 1;

   } catch (Throwable t) {

   logger.warn("Failed to create a new channel from an accepted socket.", t);

   try {

   ch.close();

   } catch (Throwable t2) {

   logger.warn("Failed to close a socket.", t2);

   return 0;

  

 

  通过javaChannel()获取封装在Netty服务端NioServerSocketChannel中的JDK 原生 ServerSocketChannel。

  

 @Override

 

   protected ServerSocketChannel javaChannel() {

   return (ServerSocketChannel) super.javaChannel();

  

 

  通过JDK NIO 原生的ServerSocketChannel的accept方法获取JDK NIO 原生客户端连接SocketChannel。

  

 public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {

 

   try {

   return AccessController.doPrivileged(new PrivilegedExceptionAction SocketChannel () {

   @Override

   public SocketChannel run() throws IOException {

   return serverSocketChannel.accept();

   } catch (PrivilegedActionException e) {

   throw (IOException) e.getCause();

  

 

  这一步就是我们在《聊聊Netty那些事儿之从内核角度看IO模型》介绍到的调用监听Socket的accept方法,内核会基于监听Socket创建出来一个新的Socket专门用于与客户端之间的网络通信这个我们称之为客户端连接Socket。这里的ServerSocketChannel就类似于监听Socket。SocketChannel就类似于客户端连接Socket。

  由于我们在创建NioServerSocketChannel的时候,会将JDK NIO 原生的ServerSocketChannel设置为非阻塞,所以这里当ServerSocketChannel上有客户端连接时就会直接创建SocketChannel,如果此时并没有客户端连接时accept调用就会立刻返回null并不会阻塞。

  

 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {

 

   super(parent);

   this.ch = ch;

   this.readInterestOp = readInterestOp;

   try {

   //设置Channel为非阻塞 配合IO多路复用模型

   ch.configureBlocking(false);

   } catch (IOException e) {

   ..........省略.............

  

 

  5.1 创建客户端NioSocketChannel

  

public class NioServerSocketChannel extends AbstractNioMessageChannel

 

   implements io.netty.channel.socket.ServerSocketChannel {

   @Override

   protected int doReadMessages(List Object buf) throws Exception {

   SocketChannel ch = SocketUtils.accept(javaChannel());

   try {

   if (ch != null) {

   buf.add(new NioSocketChannel(this, ch));

   return 1;

   } catch (Throwable t) {

   .........省略.......

   return 0;

  

 

  这里会根据ServerSocketChannel的accept方法获取到JDK NIO 原生的SocketChannel(用于底层真正与客户端通信的Channel),来创建Netty中的NioSocketChannel。

  

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

 

   public NioSocketChannel(Channel parent, SocketChannel socket) {

   super(parent, socket);

   config = new NioSocketChannelConfig(this, socket.socket());

  

 

  创建客户端NioSocketChannel的过程其实和之前讲的创建服务端NioServerSocketChannel大体流程是一样的,我们这里只对客户端NioSocketChannel和服务端NioServerSocketChannel在创建过程中的不同之处做一个对比。

  具体细节部分大家可以在回看下《详细图解Netty Reactor启动全流程》一文中关于NioServerSocketChannel的创建的详细细节。

  5.3 对比NioSocketChannel与NioServerSocketChannel的不同

  1:Channel的层次不同

  在我们介绍Reactor的创建文章中,我们提到Netty中的Channel是具有层次的。由于客户端NioSocketChannel是在main reactor接收连接时在服务端NioServerSocketChannel中被创建的,所以在创建客户端NioSocketChannel的时候会通过构造函数指定了parent属性为NioServerSocketChanel。并将JDK NIO 原生的SocketChannel封装进Netty的客户端NioSocketChannel中。

  而在Reactor启动过程中创建NioServerSocketChannel的时候parent属性指定是null。因为它就是顶层的Channel,负责创建客户端NioSocketChannel。

  

 public NioServerSocketChannel(ServerSocketChannel channel) {

 

   super(null, channel, SelectionKey.OP_ACCEPT);

   config = new NioServerSocketChannelConfig(this, javaChannel().socket());

  

 

  2:向Reactor注册的IO事件不同

  客户端NioSocketChannel向Sub Reactor注册的是SelectionKey.OP_READ事件,而服务端NioServerSocketChannel向Main Reactor注册的是SelectionKey.OP_ACCEPT事件。

  

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

 

   protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {

   super(parent, ch, SelectionKey.OP_READ);

  public class NioServerSocketChannel extends AbstractNioMessageChannel

   implements io.netty.channel.socket.ServerSocketChannel {

   public NioServerSocketChannel(ServerSocketChannel channel) {

   //父类AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT

   super(null, channel, SelectionKey.OP_ACCEPT);

   //DefaultChannelConfig中设置用于Channel接收数据用的buffer- AdaptiveRecvByteBufAllocator

   config = new NioServerSocketChannelConfig(this, javaChannel().socket());

  

 

  3: 功能属性不同造成继承结构的不同

  客户端NioSocketChannel继承的是AbstractNioByteChannel,而服务端NioServerSocketChannel继承的是AbstractNioMessageChannel。
 

  它们继承的这两个抽象类一个前缀是Byte,一个前缀是Message有什么区别吗??

  客户端NioSocketChannel主要处理的是服务端与客户端的通信,这里涉及到接收客户端发送来的数据,而Sub Reactor线程从NioSocketChannel中读取的正是网络通信数据单位为Byte。

  
服务端NioServerSocketChannel主要负责处理OP_ACCEPT事件,创建用于通信的客户端NioSocketChannel。这时候客户端与服务端还没开始通信,所以Main Reactor线程从NioServerSocketChannel的读取对象为Message。这里的Message指的就是底层的SocketChannel客户端连接。

  
以上就是NioSocketChannel与NioServerSocketChannel创建过程中的不同之处,后面的过程就一样了。

  在AbstractNioChannel 类中封装JDK NIO 原生的SocketChannel,并将其底层的IO模型设置为非阻塞,保存需要监听的IO事件OP_READ。

  

 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {

 

   super(parent);

   this.ch = ch;

   this.readInterestOp = readInterestOp;

   try {

   //设置Channel为非阻塞 配合IO多路复用模型

   ch.configureBlocking(false);

   } catch (IOException e) {

  

 

  为客户端NioSocketChannel创建全局唯一的channelId,创建客户端NioSocketChannel的底层操作类NioByteUnsafe,创建pipeline。

  

 protected AbstractChannel(Channel parent) {

 

   this.parent = parent;

   //channel全局唯一ID machineId+processId+sequence+timestamp+random

   id = newId();

   //unsafe用于底层socket的读写操作

   unsafe = newUnsafe();

   //为channel分配独立的pipeline用于IO事件编排

   pipeline = newChannelPipeline();

  

 

  在NioSocketChannelConfig的创建过程中,将NioSocketChannel的RecvByteBufAllocator类型设置为AdaptiveRecvByteBufAllocator。

  

 public DefaultChannelConfig(Channel channel) {

 

   this(channel, new AdaptiveRecvByteBufAllocator());

  

 

  在Bug修复后的版本中服务端NioServerSocketChannel的RecvByteBufAllocator类型设置为ServerChannelRecvByteBufAllocator

  最终我们得到的客户端NioSocketChannel结构如下:

  6. ChannelRead事件的响应

  在前边介绍接收连接的整体核心流程框架的时候,我们提到main reactor线程是在一个do{.....}while(...)循环read loop中不断的调用ServerSocketChannel#accept方法来接收客户端的连接。

  当满足退出read loop循环的条件有两个:

  
从NioServerSocketChannel中读取客户端连接的次数达到了16次,无论此时是否还有客户端连接都需要退出循环。

  
main reactor就会退出read loop循环,此时接收到的客户端连接NioSocketChannel暂存与List Object readBuf集合中。

  

 

 

   private final class NioMessageUnsafe extends AbstractNioUnsafe {

   private final List Object readBuf = new ArrayList Object

   @Override

   public void read() {

   try {

   try {

   do {

   ........省略.........

   //底层调用NioServerSocketChannel- doReadMessages 创建客户端SocketChannel

   int localRead = doReadMessages(readBuf);

   ........省略.........

   allocHandle.incMessagesRead(localRead);

   } while (allocHandle.continueReading());

   } catch (Throwable t) {

   exception = t;

   int size = readBuf.size();

   for (int i = 0; i size; i ++) {

   readPending = false;

   pipeline.fireChannelRead(readBuf.get(i));

   ........省略.........

   } finally {

   ........省略.........

  

 

  随后main reactor线程会遍历List Object readBuf集合中的NioSocketChannel,并在NioServerSocketChannel的pipeline中传播ChannelRead事件。

  最终ChannelRead事件会传播到ServerBootstrapAcceptor 。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: