一文搞懂 Netty 发送数据全流程(netty发送文件)

  本篇文章为你整理了一文搞懂 Netty 发送数据全流程(netty发送文件)的详细内容,包含有netty客户端发送数据 netty发送文件 netty的write数据发不出去 netty发送udp 一文搞懂 Netty 发送数据全流程,希望能帮助你了解 一文搞懂 Netty 发送数据全流程。

  本文笔者将会为大家一一揭晓这些谜底。我们还是以之前的 EchoServer 为例进行说明。

  

@Sharable

 

  public class EchoServerHandler extends ChannelInboundHandlerAdapter {

   @Override

   public void channelRead(ChannelHandlerContext ctx, Object msg) {

   //此处的msg就是Netty在read loop中从NioSocketChannel中读取到的ByteBuffer

   ctx.write(msg);

  

 

  我们将在《Netty如何高效接收网络数据》一文中读取到的 ByteBuffer (这里的 Object msg),直接发送回给客户端,用这个简单的例子来揭开 Netty 如何发送数据的序幕~~

  在实际开发中,我们首先要通过解码器将读取到的 ByteBuffer 解码转换为我们的业务 Request 类,然后在业务线程中做业务处理,在通过编码器对业务 Response 类编码为 ByteBuffer ,最后利用 ChannelHandlerContext ctx 的引用发送响应数据。

  
本例中用到的 channelHandlerContext.write() 会使 write 事件从当前 ChannelHandler 也就是这里的 EchoServerHandler 开始沿着 pipeline 向前传播。

  
而 channelHandlerContext.channel().write() 则会使 write 事件从 pipeline 的尾结点 TailContext 开始向前传播直到 HeadContext 。

  
 

  而 pipeline 这样一个双向链表数据结构中的类型正是 ChannelHandlerContext ,由 ChannelHandlerContext 包裹我们自定义的 IO 处理逻辑 ChannelHandler。

  ChannelHandler 并不需要感知到它所处的 pipeline 中的上下文信息,只需要专心处理好 IO 逻辑即可,关于 pipeline 的上下文信息全部封装在 ChannelHandlerContext中。

  ChannelHandler 在 Netty 中的作用只是负责处理 IO 逻辑,比如编码,解码。它并不会感知到它在 pipeline 中的位置,更不会感知和它相邻的两个 ChannelHandler。事实上 ChannelHandler也并不需要去关心这些,它唯一需要关注的就是处理所关心的异步事件

  而 ChannelHandlerContext 中维护了 pipeline 这个双向链表中的 pre 以及 next 指针,这样可以方便的找到与其相邻的 ChannelHandler ,并可以过滤出一些符合执行条件的 ChannelHandler。正如它的命名一样, ChannelHandlerContext 正是起到了维护 ChannelHandler 上下文的一个作用。而 Netty 中的异步事件在 pipeline 中的传播靠的就是这个 ChannelHandlerContext 。

  这样设计就使得 ChannelHandlerContext 和 ChannelHandler 的职责单一,各司其职,具有高度的可扩展性。

  2. write事件的传播

  我们无论是在业务线程或者是在 SubReactor 线程中完成业务处理后,都需要通过 channelHandlerContext 的引用将 write事件在 pipeline 中进行传播。然后在 pipeline 中相应的 ChannelHandler 中监听 write 事件从而可以对 write事件进行自定义编排处理(比如我们常用的编码器),最终传播到 HeadContext 中执行发送数据的逻辑操作。

  前边也提到 Netty 中有两个触发 write 事件传播的方法,它们的传播处理逻辑都是一样的,只不过它们在 pipeline 中的传播起点是不同的。

  
channelHandlerContext.write() 方法会从当前 ChannelHandler 开始在 pipeline 中向前传播 write 事件直到 HeadContext。

  
channelHandlerContext.channel().write() 方法则会从 pipeline 的尾结点 TailContext 开始在 pipeline 中向前传播 write 事件直到 HeadContext 。

  
 

  在我们清楚了 write 事件的总体传播流程后,接下来就来看看在 write 事件传播的过程中Netty为我们作了些什么?这里我们以 channelHandlerContext.write() 方法为例说明。

  3. write方法发送数据

  

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

 

   @Override

   public ChannelFuture write(Object msg) {

   return write(msg, newPromise());

   @Override

   public ChannelFuture write(final Object msg, final ChannelPromise promise) {

   write(msg, false, promise);

   return promise;

  

 

  这里我们看到 Netty 的写操作是一个异步操作,当我们在业务线程中调用 channelHandlerContext.write() 后,Netty 会给我们返回一个 ChannelFuture,我们可以在这个 ChannelFutrue 中添加 ChannelFutureListener ,这样当 Netty 将我们要发送的数据发送到底层 Socket 中时,Netty 会通过 ChannelFutureListener 通知我们写入结果。

  

 @Override

 

   public void channelRead(final ChannelHandlerContext ctx, final Object msg) {

   //此处的msg就是Netty在read loop中从NioSocketChannel中读取到的ByteBuffer

   ChannelFuture future = ctx.write(msg);

   future.addListener(new ChannelFutureListener() {

   @Override

   public void operationComplete(ChannelFuture future) throws Exception {

   Throwable cause = future.cause();

   if (cause != null) {

   处理异常情况

   } else {

   写入Socket成功后,Netty会通知到这里

  

 

  当异步事件在 pipeline 传播的过程中发生异常时,异步事件就会停止在 pipeline 中传播。所以我们在日常开发中,需要对写操作异常情况进行处理。

  
其中 inbound 类异步事件发生异常时,会触发exceptionCaught事件传播。
 

  exceptionCaught 事件本身也是一种 inbound 事件,传播方向会从当前发生异常的 ChannelHandler 开始一直向后传播直到 TailContext。

  
而 outbound 类异步事件发生异常时,则不会触发exceptionCaught事件传播。一般只是通知相关 ChannelFuture。但如果是 flush 事件在传播过程中发生异常,则会触发当前发生异常的 ChannelHandler 中 exceptionCaught 事件回调。

  
我们继续回归到写操作的主线上来~~~

  

 private void write(Object msg, boolean flush, ChannelPromise promise) {

 

   ObjectUtil.checkNotNull(msg, "msg");

   ................省略检查promise的有效性...............

   //flush = true 表示channelHandler中调用的是writeAndFlush方法,这里需要找到pipeline中覆盖write或者flush方法的channelHandler

   //flush = false 表示调用的是write方法,只需要找到pipeline中覆盖write方法的channelHandler

   final AbstractChannelHandlerContext next = findContextOutbound(flush ?

   (MASK_WRITE MASK_FLUSH) : MASK_WRITE);

   //用于检查内存泄露

   final Object m = pipeline.touch(msg, next);

   //获取pipeline中下一个要被执行的channelHandler的executor

   EventExecutor executor = next.executor();

   //确保OutBound事件由ChannelHandler指定的executor执行

   if (executor.inEventLoop()) {

   //如果当前线程正是channelHandler指定的executor则直接执行

   if (flush) {

   next.invokeWriteAndFlush(m, promise);

   } else {

   next.invokeWrite(m, promise);

   } else {

   //如果当前线程不是ChannelHandler指定的executor,则封装成异步任务提交给指定executor执行,注意这里的executor不一定是reactor线程。

   final WriteTask task = WriteTask.newInstance(next, m, promise, flush);

   if (!safeExecute(executor, task, promise, m, !flush)) {

   task.cancel();

  

 

  write 事件要向前在 pipeline 中传播,就需要在 pipeline 上找到下一个具有执行资格的 ChannelHandler,因为位于当前 ChannelHandler 前边的可能是 ChannelInboundHandler 类型的也可能是 ChannelOutboundHandler 类型的 ChannelHandler ,或者有可能压根就不关心 write 事件的 ChannelHandler(没有实现write回调方法)。

  这里我们就需要通过 findContextOutbound 方法在当前 ChannelHandler 的前边找到 ChannelOutboundHandler 类型并且覆盖实现 write 回调方法的 ChannelHandler 作为下一个要执行的对象。

  3.1 findContextOutbound

  

 private AbstractChannelHandlerContext findContextOutbound(int mask) {

 

   AbstractChannelHandlerContext ctx = this;

   //获取当前ChannelHandler的executor

   EventExecutor currentExecutor = executor();

   do {

   //获取前一个ChannelHandler

   ctx = ctx.prev;

   } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));

   return ctx;

   //判断前一个ChannelHandler是否具有响应Write事件的资格

   private static boolean skipContext(

   AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {

   return (ctx.executionMask (onlyMask mask)) == 0

   (ctx.executor() == currentExecutor (ctx.executionMask mask) == 0);

  

 

  findContextOutbound 方法接收的参数是一个掩码,这个掩码表示要向前查找具有什么样执行资格的 ChannelHandler。因为我们这里调用的是 ChannelHandlerContext 的 write 方法所以 flush = false,传递进来的掩码为 MASK_WRITE,表示我们要向前查找覆盖实现了 write 回调方法的 ChannelOutboundHandler。

  3.1.1 掩码的巧妙应用

  Netty 中将 ChannelHandler 覆盖实现的一些异步事件回调方法用 int 型的掩码来表示,这样我们就可以通过这个掩码来判断当前 ChannelHandler 具有什么样的执行资格。

  

final class ChannelHandlerMask {

 

   ....................省略......................

   static final int MASK_CHANNEL_ACTIVE = 1 3;

   static final int MASK_CHANNEL_READ = 1 5;

   static final int MASK_CHANNEL_READ_COMPLETE = 1 6;

   static final int MASK_WRITE = 1 15;

   static final int MASK_FLUSH = 1 16;

   //outbound事件掩码集合

   static final int MASK_ONLY_OUTBOUND = MASK_BIND MASK_CONNECT MASK_DISCONNECT

   MASK_CLOSE MASK_DEREGISTER MASK_READ MASK_WRITE MASK_FLUSH;

   ....................省略......................

  

 

  在 ChannelHandler 被添加进 pipeline 的时候,Netty 会根据当前 ChannelHandler 的类型以及其覆盖实现的异步事件回调方法,通过 运算 向 ChannelHandlerContext#executionMask 字段添加该 ChannelHandler 的执行资格。

  

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

 

   //ChannelHandler执行资格掩码

   private final int executionMask;

   ....................省略......................

  

 

  类似的掩码用法其实我们在前边的文章《一文聊透Netty核心引擎Reactor的运转架构》中也提到过,在 Channel 向对应的 Reactor 注册自己感兴趣的 IO 事件时,也是用到了一个 int 型的掩码 interestOps 来表示 Channel 感兴趣的 IO 事件集合。

  

 @Override

 

   protected void doBeginRead() throws Exception {

   final SelectionKey selectionKey = this.selectionKey;

   if (!selectionKey.isValid()) {

   return;

   readPending = true;

   final int interestOps = selectionKey.interestOps();

   * 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件

   * 2:SocketChannel 初始化时 readInterestOp设置的是OP_READ事件

   * */

   if ((interestOps readInterestOp) == 0) {

   //注册监听OP_ACCEPT或者OP_READ事件

   selectionKey.interestOps(interestOps readInterestOp);

  

 

  
从事件集合中删除某个事件,是通过先将要删除事件取反 ~ ,然后在和事件集合做 操作:ops = ~SelectionKey.OP_CONNECT

  
这部分内容笔者会在下篇文章全面介绍 pipeline 的时候详细讲解,大家这里只需要知道这里的掩码就是表示一个执行资格的集合。当前 ChannelHandler 的执行资格存放在它的 ChannelHandlerContext 中的 executionMask 字段中。

  3.1.2 向前查找具有执行资格的ChannelOutboundHandler

  

 private AbstractChannelHandlerContext findContextOutbound(int mask) {

 

   //当前ChannelHandler

   AbstractChannelHandlerContext ctx = this;

   //获取当前ChannelHandler的executor

   EventExecutor currentExecutor = executor();

   do {

   //获取前一个ChannelHandler

   ctx = ctx.prev;

   } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));

   return ctx;

   //判断前一个ChannelHandler是否具有响应Write事件的资格

   private static boolean skipContext(

   AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {

   return (ctx.executionMask (onlyMask mask)) == 0

   (ctx.executor() == currentExecutor (ctx.executionMask mask) == 0);

  

 

  前边我们提到 ChannelHandlerContext 不仅封装了 ChannelHandler 的执行资格掩码还可以感知到当前 ChannelHandler 在 pipeline 中的位置,因为 ChannelHandlerContext 中维护了前驱指针 prev 以及后驱指针 next。

  这里我们需要在 pipeline 中传播 write 事件,它是一种 outbound 事件,所以需要向前传播,这里通过 ChannelHandlerContext 的前驱指针 prev 拿到当前 ChannelHandler 在 pipeline 中的前一个节点。

  

ctx = ctx.prev;

 

  

 

  通过 skipContext 方法判断前驱节点是否具有执行的资格。如果没有执行资格则跳过继续向前查找。如果具有执行资格则返回并响应 write 事件。

  在 write 事件传播场景中,执行资格指的是前驱 ChannelHandler 是否是ChannelOutboundHandler 类型的,并且它是否覆盖实现了 write 事件回调方法。

  

 

 

  public class EchoChannelHandler extends ChannelOutboundHandlerAdapter {

   @Override

   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

   super.write(ctx, msg, promise);

  

 

  3.1.3 skipContext

  该方法主要用来判断当前 ChannelHandler 的前驱节点是否具有 mask 掩码中包含的事件响应资格。

  方法参数中有两个比较重要的掩码:

  int onlyMask:用来指定当前 ChannelHandler 需要符合的类型。其中MASK_ONLY_OUTBOUND 为 ChannelOutboundHandler 类型的掩码, MASK_ONLY_INBOUND 为 ChannelInboundHandler 类型的掩码。

  

final class ChannelHandlerMask {

 

   //outbound事件的掩码集合

   static final int MASK_ONLY_OUTBOUND = MASK_BIND MASK_CONNECT MASK_DISCONNECT

   MASK_CLOSE MASK_DEREGISTER MASK_READ MASK_WRITE MASK_FLUSH;

   //inbound事件的掩码集合

   static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED

   MASK_CHANNEL_UNREGISTERED MASK_CHANNEL_ACTIVE MASK_CHANNEL_INACTIVE MASK_CHANNEL_READ

   MASK_CHANNEL_READ_COMPLETE MASK_USER_EVENT_TRIGGERED MASK_CHANNEL_WRITABILITY_CHANGED;

  

 

  比如本小节中我们是在介绍 write 事件的传播,那么就需要在当前ChannelHandler 前边首先是找到一个 ChannelOutboundHandler 类型的ChannelHandler。

  ctx.executionMask (onlyMask mask)) == 0 用于判断前一个 ChannelHandler 是否为我们指定的 ChannelHandler 类型,在本小节中我们指定的是 onluMask = MASK_ONLY_OUTBOUND 即 ChannelOutboundHandler 类型。如果不是,这里就会直接跳过,继续在 pipeline 中向前查找。

  int mask:用于指定前一个 ChannelHandler 需要实现的相关异步事件处理回调。在本小节中这里指定的是 MASK_WRITE ,即需要实现 write 回调方法。通过 (ctx.executionMask mask) == 0 条件来判断前一个ChannelHandler 是否实现了 write 回调,如果没有实现这里就跳过,继续在 pipeline 中向前查找。

  
关于 skipContext 方法的详细介绍,笔者还会在下篇文章全面介绍 pipeline的时候再次进行介绍,这里大家只需要明白该方法的核心逻辑即可。

  3.1.4 向前传播write事件

  通过 findContextOutbound 方法我们在 pipeline 中找到了下一个具有执行资格的 ChannelHandler,这里指的是下一个 ChannelOutboundHandler 类型并且覆盖实现了 write 方法的 ChannelHandler。

  Netty 紧接着会调用这个 nextChannelHandler 的 write 方法实现 write 事件在 pipeline 中的传播。

  

 //获取下一个要被执行的channelHandler指定的executor

 

   EventExecutor executor = next.executor();

   //确保outbound事件的执行 是由 channelHandler指定的executor执行的

   if (executor.inEventLoop()) {

   //如果当前线程是指定的executor 则直接操作

   if (flush) {

   next.invokeWriteAndFlush(m, promise);

   } else {

   next.invokeWrite(m, promise);

   } else {

   //如果当前线程不是channelHandler指定的executor,则封装程异步任务 提交给指定的executor执行

   final WriteTask task = WriteTask.newInstance(next, m, promise, flush);

   if (!safeExecute(executor, task, promise, m, !flush)) {

   task.cancel();

  

 

  在我们向 pipeline 添加 ChannelHandler 的时候可以通过ChannelPipeline#addLast(EventExecutorGroup,ChannelHandler......) 方法指定执行该 ChannelHandler 的executor。如果不特殊指定,那么执行该 ChannelHandler 的executor默认为该 Channel 绑定的 Reactor 线程。

  执行 ChannelHandler 中异步事件回调方法的线程必须是 ChannelHandler 指定的executor。

  所以这里首先我们需要获取在 findContextOutbound 方法查找出来的下一个符合执行条件的 ChannelHandler 指定的executor。

  

EventExecutor executor = next.executor()

 

  

 

  并通过 executor.inEventLoop() 方法判断当前线程是否是该 ChannelHandler 指定的 executor。

  如果是,那么我们直接在当前线程中执行 ChannelHandler 中的 write 方法。

  如果不是,我们就需要将 ChannelHandler 对 write 事件的回调操作封装成异步任务 WriteTask 并提交给 ChannelHandler 指定的 executor 中,由 executor 负责执行。

  这里需要注意的是这个 executor 并不一定是 channel 绑定的 reactor 线程。它可以是我们自定义的线程池,不过需要我们通过 ChannelPipeline#addLast 方法进行指定,如果我们不指定,默认情况下执行 ChannelHandler 的 executor 才是 channel 绑定的 reactor 线程。

  
这里有些同学可能会有疑问,如果我们向pipieline添加ChannelHandler的时候,为每个ChannelHandler指定不同的executor时,Netty如果确保线程安全呢??

  大家还记得pipeline中的结构吗?

  outbound 事件在 pipeline 中的传播最终会传播到 HeadContext 中,之前的系列文章我们提到过,HeadContext 中封装了 Channel 的 Unsafe 类负责 Channel 底层的 IO 操作。而 HeadContext 指定的 executor 正是对应 channel 绑定的 reactor 线程。

  所以最终在 netty 内核中执行写操作的线程一定是 reactor 线程从而保证了线程安全性。

  忘记这段内容的同学可以在回顾下《Reactor在Netty中的实现(创建篇)》,类似的套路我们在介绍 NioServerSocketChannel 进行 bind 绑定以及 register 注册的时候都介绍过,只不过这里将 executor 扩展到了自定义线程池的范围。

  3.1.5 触发nextChannelHandler的write方法回调

  

 //如果当前线程是指定的executor 则直接操作

 

   if (flush) {

   next.invokeWriteAndFlush(m, promise);

   } else {

   next.invokeWrite(m, promise);

  

 

  由于我们在示例 ChannelHandler 中调用的是 ChannelHandlerContext#write 方法,所以这里的 flush = false 。触发调用 nextChannelHandler 的 write 方法。

  

 void invokeWrite(Object msg, ChannelPromise promise) {

 

   if (invokeHandler()) {

   invokeWrite0(msg, promise);

   } else {

   // 当前channelHandler虽然添加到pipeline中,但是并没有调用handlerAdded

   // 所以不能调用当前channelHandler中的回调方法,只能继续向前传递write事件

   write(msg, promise);

  

 

  这里首先需要通过 invokeHandler() 方法判断这个 nextChannelHandler 中的 handlerAdded 方法是否被回调过。因为 ChannelHandler 只有被正确的添加到对应的 ChannelHandlerContext 中并且准备好处理异步事件时, ChannelHandler#handlerAdded 方法才会被回调。

  这一部分内容笔者会在下一篇文章中详细为大家介绍,这里大家只需要了解调用 invokeHandler() 方法的目的就是为了确定 ChannelHandler 是否被正确的初始化。

  

 private boolean invokeHandler() {

 

   // Store in local variable to reduce volatile reads.

   int handlerState = this.handlerState;

   return handlerState == ADD_COMPLETE (!ordered handlerState == ADD_PENDING);

  

 

  只有触发了 handlerAdded 回调,ChannelHandler 的状态才能变成 ADD_COMPLETE 。

  如果 invokeHandler() 方法返回 false,那么我们就需要跳过这个nextChannelHandler,并调用 ChannelHandlerContext#write 方法继续向前传播 write 事件。

  

 @Override

 

   public ChannelFuture write(final Object msg, final ChannelPromise promise) {

   //继续向前传播write事件,回到流程起点

   write(msg, false, promise);

   return promise;

  

 

  如果 invokeHandler() 返回 true ,说明这个 nextChannelHandler 已经在 pipeline 中被正确的初始化了,Netty 直接调用这个 ChannelHandler 的 write 方法,这样就实现了 write 事件从当前 ChannelHandler 传播到了nextChannelHandler。

  

 private void invokeWrite0(Object msg, ChannelPromise promise) {

 

   try {

   //调用当前ChannelHandler中的write方法

   ((ChannelOutboundHandler) handler()).write(this, msg, promise);

   } catch (Throwable t) {

   notifyOutboundHandlerException(t, promise);

  

 

  这里我们看到在 write 事件的传播过程中如果发生异常,那么 write 事件就会停止在 pipeline 中传播,并通知注册的 ChannelFutureListener。

  从本文示例的 pipeline 结构中我们可以看到,当在 EchoServerHandler 调用 ChannelHandlerContext#write 方法后,write 事件会在 pipeline 中向前传播到 HeadContext 中,而在 HeadContext 中才是 Netty 真正处理 write 事件的地方。

  3.2 HeadContext

  

final class HeadContext extends AbstractChannelHandlerContext

 

   implements ChannelOutboundHandler, ChannelInboundHandler {

   @Override

   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {

   unsafe.write(msg, promise);

  

 

  write 事件最终会在 pipeline 中传播到 HeadContext 里并回调 HeadContext 的 write 方法。并在 write 回调中调用 channel 的 unsafe 类执行底层的 write 操作。这里正是 write 事件在 pipeline 中的传播终点。

  

 protected abstract class AbstractUnsafe implements Unsafe {

 

   //待发送数据缓冲队列 Netty是全异步框架,所以这里需要一个缓冲队列来缓存用户需要发送的数据

   private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);

   @Override

   public final void write(Object msg, ChannelPromise promise) {

   assertEventLoop();

   //获取当前channel对应的待发送数据缓冲队列(支持用户异步写入的核心关键)

   ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

   ..........省略..................

   int size;

   try {

   //过滤message类型 这里只会接受DirectBuffer或者fileRegion类型的msg

   msg = filterOutboundMessage(msg);

   //计算当前msg的大小

   size = pipeline.estimatorHandle().size(msg);

   if (size 0) {

   size = 0;

   } catch (Throwable t) {

   ..........省略..................

   //将msg 加入到Netty中的待写入数据缓冲队列ChannelOutboundBuffer中

   outboundBuffer.addMessage(msg, size, promise);

  

 

  众所周知 Netty 是一个异步事件驱动的网络框架,在 Netty 中所有的 IO 操作全部都是异步的,当然也包括本小节介绍的 write 操作,为了保证异步执行 write 操作,Netty 定义了一个待发送数据缓冲队列 ChannelOutboundBuffer ,Netty 将这些用户需要发送的网络数据在写入到 Socket 之前,先放在 ChannelOutboundBuffer 中缓存。

  每个客户端 NioSocketChannel 对应一个 ChannelOutboundBuffer 待发送数据缓冲队列

  3.2.1 filterOutboundMessage

  ChannelOutboundBuffer 只会接受 ByteBuffer 类型以及 FileRegion 类型的 msg 数据。

  FileRegion 是Netty定义的用来通过零拷贝的方式网络传输文件数据。本文我们主要聚焦普通网络数据 ByteBuffer 的发送。

  所以在将 msg 写入到 ChannelOutboundBuffer 之前,我们需要检查待写入 msg 的类型。确保是 ChannelOutboundBuffer 可接受的类型。

  

 @Override

 

   protected final Object filterOutboundMessage(Object msg) {

   if (msg instanceof ByteBuf) {

   ByteBuf buf = (ByteBuf) msg;

   if (buf.isDirect()) {

   return msg;

   return newDirectBuffer(buf);

   if (msg instanceof FileRegion) {

   return msg;

   throw new UnsupportedOperationException(

   "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);

  

 

  在网络数据传输的过程中,Netty为了减少数据从 堆内内存 到 堆外内存 的拷贝以及缓解GC的压力,所以这里必须采用 DirectByteBuffer 使用堆外内存来存放网络发送数据。

  3.2.2 estimatorHandle计算当前msg的大小

  

public class DefaultChannelPipeline implements ChannelPipeline {

 

   //原子更新estimatorHandle字段

   private static final AtomicReferenceFieldUpdater DefaultChannelPipeline, MessageSizeEstimator.Handle ESTIMATOR =

   AtomicReferenceFieldUpdater.newUpdater(

   DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");

   //计算要发送msg大小的handler

   private volatile MessageSizeEstimator.Handle estimatorHandle;

   final MessageSizeEstimator.Handle estimatorHandle() {

   MessageSizeEstimator.Handle handle = estimatorHandle;

   if (handle == null) {

   handle = channel.config().getMessageSizeEstimator().newHandle();

   if (!ESTIMATOR.compareAndSet(this, null, handle)) {

   handle = estimatorHandle;

   return handle;

  

 

  在 pipeline 中会有一个 estimatorHandle 专门用来计算待发送 ByteBuffer 的大小。这个 estimatorHandle 会在 pipeline 对应的 Channel 中的配置类创建的时候被初始化。

  这里 estimatorHandle 的实际类型为DefaultMessageSizeEstimator#HandleImpl。

  

public final class DefaultMessageSizeEstimator implements MessageSizeEstimator {

 

   private static final class HandleImpl implements Handle {

   private final int unknownSize;

   private HandleImpl(int unknownSize) {

   this.unknownSize = unknownSize;

   @Override

   public int size(Object msg) {

   if (msg instanceof ByteBuf) {

   return ((ByteBuf) msg).readableBytes();

   if (msg instanceof ByteBufHolder) {

   return ((ByteBufHolder) msg).content().readableBytes();

   if (msg instanceof FileRegion) {

   return 0;

   return unknownSize;

  

 

  这里我们看到 ByteBuffer 的大小即为 Buffer 中未读取的字节数 writerIndex - readerIndex 。

  当我们验证了待写入数据 msg 的类型以及计算了 msg 的大小后,我们就可以通过 ChannelOutboundBuffer#addMessage方法将 msg 写入到ChannelOutboundBuffer(待发送数据缓冲队列)中。

  write 事件处理的最终逻辑就是将待发送数据写入到 ChannelOutboundBuffer 中,下面我们就来看下这个 ChannelOutboundBuffer 内部结构到底是什么样子的?

  3.3 ChannelOutboundBuffer

  ChannelOutboundBuffer 其实是一个单链表结构的缓冲队列,链表中的节点类型为 Entry ,由于 ChannelOutboundBuffer 在 Netty 中的作用就是缓存应用程序待发送的网络数据,所以 Entry 中封装的就是待写入 Socket 中的网络发送数据相关的信息,以及 ChannelHandlerContext#write 方法中返回给用户的 ChannelPromise 。这样可以在数据写入Socket之后异步通知应用程序。

  此外 ChannelOutboundBuffer 中还封装了三个重要的指针:

  
tailEntry :该指针指向 ChannelOutboundBuffer 中最后一个待发送数据的 Entry。通过 unflushedEntry 和 tailEntry 这两个指针,我们可以很方便的定位到待发送数据的 Entry 范围。

  
flushedEntry :当我们通过 flush 操作需要将 ChannelOutboundBuffer 中缓存的待发送数据发送到 Socket 中时,flushedEntry 指针会指向 unflushedEntry 的位置,这样 flushedEntry 指针和 tailEntry 指针之间的 Entry 就是我们即将发送到 Socket 中的网络数据。

  
这三个指针在初始化的时候均为 null 。

  3.3.1 Entry

  Entry 作为 ChannelOutboundBuffer 链表结构中的节点元素类型,里边封装了待发送数据的各种信息,ChannelOutboundBuffer 其实就是对 Entry 结构的组织和操作。因此理解 Entry 结构是理解整个 ChannelOutboundBuffer 运作流程的基础。

  下面我们就来看下 Entry 结构具体封装了哪些待发送数据的信息。

  

 static final class Entry {

 

   //Entry的对象池,用来创建和回收Entry对象

   private static final ObjectPool Entry RECYCLER = ObjectPool.newPool(new ObjectCreator Entry () {

   @Override

   public Entry newObject(Handle Entry handle) {

   return new Entry(handle);

   //DefaultHandle用于回收对象

   private final Handle Entry handle;

   //ChannelOutboundBuffer下一个节点

   Entry next;

   //待发送数据

   Object msg;

   //msg 转换为 jdk nio 中的byteBuffer

   ByteBuffer[] bufs;

   ByteBuffer buf;

   //异步write操作的future

   ChannelPromise promise;

   //已发送了多少

   long progress;

   //总共需要发送多少,不包含entry对象大小。

   long total;

   //pendingSize表示entry对象在堆中需要的内存总量 待发送数据大小 + entry对象本身在堆中占用内存大小(96)

   int pendingSize;

   //msg中包含了几个jdk nio bytebuffer

   int count = -1;

   //write操作是否被取消

   boolean cancelled;

  

 

  我们看到Entry结构中一共有12个字段,其中1个静态字段和11个实例字段。

  下面笔者就为大家介绍下这12个字段的含义及其作用,其中有些字段会在后面的场景中使用到,这里大家可能对有些字段理解起来比较模糊,不过没关系,这里能看懂多少是多少,不理解也没关系,这里介绍只是为了让大家混个眼熟,在后面流程的讲解中,笔者还会重新提到这些字段。

  ObjectPool Entry RECYCLER:Entry 的对象池,负责创建管理 Entry 实例,由于 Netty 是一个网络框架,所以 IO 读写就成了它的核心操作,在一个支持高性能高吞吐的网络框架中,会有大量的 IO 读写操作,那么就会导致频繁的创建 Entry 对象。我们都知道,创建一个实例对象以及 GC 回收这些实例对象都是需要性能开销的,那么在大量频繁创建 Entry 对象的场景下,引入对象池来复用创建好的 Entry 对象实例可以抵消掉由频繁创建对象以及GC回收对象所带来的性能开销。

  
Handle Entry handle:默认实现类型为 DefaultHandle ,用于数据发送完毕后,对象池回收 Entry 对象。由对象池 RECYCLER 在创建 Entry 对象的时候传递进来。

  
Entry next:ChannelOutboundBuffer 是一个单链表的结构,这里的 next 指针用于指向当前 Entry 节点的后继节点。

  
Object msg:应用程序待发送的网络数据,这里 msg 的类型为 DirectByteBuffer 或者 FileRegion(用于通过零拷贝的方式网络传输文件)。

  
ByteBuffer[] bufs:这里的 ByteBuffer 类型为 JDK NIO 原生的 ByteBuffer 类型,因为 Netty 最终发送数据是通过 JDK NIO 底层的 SocketChannel 进行发送,所以需要将 Netty 中实现的 ByteBuffer 类型转换为 JDK NIO ByteBuffer 类型。应用程序发送的 ByteBuffer 可能是一个也可能是多个,如果发送多个就用 ByteBuffer[] bufs 封装在 Entry 对象中,如果是一个就用 ByteBuffer buf 封装。

  
ChannelPromise promise:ChannelHandlerContext#write 异步写操作返回的 ChannelFuture。当 Netty 将待发送数据写入到 Socket 中时会通过这个 ChannelPromise 通知应用程序发送结果。

  
long total:Entry中总共需要发送多少数据。注意:这个字段并不包含 Entry 对象的内存占用大小。只是表示待发送网络数据的大小。

  
int pendingSize:表示待发送数据的内存占用总量。待发送数据在内存中的占用量分为两部分:

  Entry对象中所封装的待发送网络数据大小。

  Entry对象本身在内存中的占用量。

  
 

  3.3.2 pendingSize的作用

  想象一下这样的一个场景,当由于网络拥塞或者 Netty 客户端负载很高导致网络数据的接收速度以及处理速度越来越慢,TCP 的滑动窗口不断缩小以减少网络数据的发送直到为 0,而 Netty 服务端却有大量频繁的写操作,不断的写入到 ChannelOutboundBuffer 中。

  这样就导致了数据发送不出去但是 Netty 服务端又在不停的写数据,慢慢的就会撑爆 ChannelOutboundBuffer 导致OOM。这里主要指的是堆外内存的 OOM,因为 ChannelOutboundBuffer 中包裹的待发送数据全部存储在堆外内存中。

  所以 Netty 就必须限制 ChannelOutboundBuffer 中的待发送数据的内存占用总量,不能让它无限增长。Netty 中定义了高低水位线用来表示 ChannelOutboundBuffer 中的待发送数据的内存占用量的上限和下限。注意:这里的内存既包括 JVM 堆内存占用也包括堆外内存占用。

  
当待发送数据的内存占用总量超过高水位线的时候,Netty 就会将 NioSocketChannel 的状态标记为不可写状态。否则就可能导致 OOM。

  
当待发送数据的内存占用总量低于低水位线的时候,Netty 会再次将 NioSocketChannel 的状态标记为可写状态。

  
那么我们用什么记录ChannelOutboundBuffer中的待发送数据的内存占用总量呢?

  答案就是本小节要介绍的 pendingSize 字段。在谈到待发送数据的内存占用量时大部分同学普遍都会有一个误解就是只计算待发送数据的大小(msg中包含的字节数) 而忽略了 Entry 实例对象本身在内存中的占用量。

  因为 Netty 会将待发送数据封装在 Entry 实例对象中,在大量频繁的写操作中会产生大量的 Entry 实例对象,所以 Entry 实例对象的内存占用是不可忽视的。

  否则就会导致明明还没有到达高水位线,但是由于大量的 Entry 实例对象存在,从而发生OOM。

  所以 pendingSize 的计算既要包含待发送数据的大小也要包含其 Entry 实例对象的内存占用大小,这样才能准确计算出 ChannelOutboundBuffer 中待发送数据的内存占用总量。

  ChannelOutboundBuffer 中所有的 Entry 实例中的 pendingSize 之和就是待发送数据总的内存占用量。

  

public final class ChannelOutboundBuffer {

 

   //ChannelOutboundBuffer中的待发送数据的内存占用总量

   private volatile long totalPendingSize;

  

 

  3.3.3 高低水位线

  上小节提到 Netty 为了防止 ChannelOutboundBuffer 中的待发送数据内存占用无限制的增长从而导致 OOM ,所以引入了高低水位线,作为待发送数据内存占用的上限和下限。

  那么高低水位线具体设置多大呢 ? 我们来看一下 DefaultChannelConfig 中的配置。

  

public class DefaultChannelConfig implements ChannelConfig {

 

   //ChannelOutboundBuffer中的高低水位线

   private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;

  

 

  

public final class WriteBufferWaterMark {

 

   private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;

   private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;

   public static final WriteBufferWaterMark DEFAULT =

   new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);

   WriteBufferWaterMark(int low, int high, boolean validate) {

   ..........省略校验逻辑.........

   this.low = low;

   this.high = high;

  

 

  我们看到 ChannelOutboundBuffer 中的高水位线设置的大小为 64 KB,低水位线设置的是 32 KB。

  这也就意味着每个 Channel 中的待发送数据如果超过 64 KB。Channel 的状态就会变为不可写状态。当内存占用量低于 32 KB时,Channel 的状态会再次变为可写状态。

  3.3.4 Entry实例对象在JVM中占用内存大小

  前边提到 pendingSize 的作用主要是记录当前待发送数据的内存占用总量从而可以预警 OOM 的发生。

  待发送数据的内存占用分为:待发送数据 msg 的内存占用大小以及 Entry 对象本身在JVM中的内存占用。

  那么 Entry 对象本身的内存占用我们该如何计算呢?

  要想搞清楚这个问题,大家需要先了解一下 Java 对象内存布局的相关知识。关于这部分背景知识,笔者已经在 《一文聊透对象在JVM中的内存布局,以及内存对齐和压缩指针的原理及应用》这篇文章中给出了详尽的阐述,想深入了解这块的同学可以看下这篇文章。

  这里笔者只从这篇文章中提炼一些关于计算 Java 对象占用内存大小相关的内容。

  在关于 Java 对象内存布局这篇文章中我们提到,对于Java普通对象来说内存中的布局由:对象头 + 实例数据区 + Padding,这三部分组成。

  其中对象头由存储对象运行时信息的 MarkWord 以及指向对象类型元信息的类型指针组成。

  MarkWord 用来存放:hashcode,GC 分代年龄,锁状态标志,线程持有的锁,偏向线程 Id,偏向时间戳等。在 32 位操作系统和 64 位操作系统中 MarkWord 分别占用 4B 和 8B 大小的内存。

  
Java 对象头中的类型指针还有实例数据区的对象引用,在64 位系统中开启压缩指针的情况下(-XX:+UseCompressedOops)占用 4B 大小。在关闭压缩指针的情况下(-XX:-UseCompressedOops)占用 8B 大小。

  实例数据区用于存储 Java 类中定义的实例字段,包括所有父类中的实例字段以及对象引用。

  在实例数据区中对象字段之间的排列以及内存对齐需要遵循三个字段重排列规则:

  
规则2:在开启了压缩指针的 64 位 JVM 中,Java 类中的第一个字段的 OFFSET 需要对齐至 4N,在关闭压缩指针的情况下类中第一个字段的OFFSET需要对齐至 8N。

  
规则3:JVM 默认分配字段的顺序为:long / double,int / float,short / char,byte / boolean,oops(Ordianry Object Point 引用类型指针),并且父类中定义的实例变量会出现在子类实例变量之前。当设置JVM参数 -XX +CompactFields 时(默认),占用内存小于 long / double 的字段会允许被插入到对象中第一个 long / double 字段之前的间隙中,以避免不必要的内存填充。

  
还有一个重要规则就是 Java 虚拟机堆中对象的起始地址需要对齐至 8 的倍数(可由JVM参数 -XX:ObjectAlignmentInBytes 控制,默认为 8 )。

  在了解上述字段排列以及对象之间的内存对齐规则后,我们分别以开启压缩指针和关闭压缩指针两种情况,来对 Entry 对象的内存布局进行分析并计算对象占用内存大小。

  

 static final class Entry {

 

   .............省略static字段RECYCLER.........

   //DefaultHandle用于回收对象

   private final Handle Entry handle;

   //ChannelOutboundBuffer下一个节点

   Entry next;

   //待发送数据

   Object msg;

   //msg 转换为 jdk nio 中的byteBuffer

   ByteBuffer[] bufs;

   ByteBuffer buf;

   //异步write操作的future

   ChannelPromise promise;

   //已发送了多少

   long progress;

   //总共需要发送多少,不包含entry对象大小。

   long total;

   //pendingSize表示entry对象在堆中需要的内存总量 待发送数据大小 + entry对象本身在堆中占用内存大小(96)

   int pendingSize;

   //msg中包含了几个jdk nio bytebuffer

   int count = -1;

   //write操作是否被取消

   boolean cancelled;

  

 

  我们看到 Entry 对象中一共有 11 个实例字段,其中 2 个 long 型字段,2 个 int 型字段,1 个 boolean 型字段,6 个对象引用。

  默认情况下JVM参数 -XX +CompactFields 是开启的。

  开启指针压缩 -XX:+UseCompressedOops

  Entry 对象的内存布局中开头先是 8 个字节的 MarkWord,然后是 4 个字节的类型指针(开启压缩指针)。

  在实例数据区中对象的排列规则需要符合规则3,也就是字段之间的排列顺序需要遵循 long int boolean oop(对象引用)。

  根据规则 3 Entry对象实例数据区第一个字段应该是 long progress,但根据规则1 long 型字段的 OFFSET 需要对齐至 8 的倍数,并且根据 规则2 在开启压缩指针的情况下,对象的第一个字段 OFFSET 需要对齐至 4 的倍数。所以字段long progress 的 OFFET = 16,这就必然导致了在对象头与字段 long progress 之间需要由 4 字节的字节填充(OFFET = 12处发生字节填充)。

  但是 JVM 默认开启了 -XX +CompactFields,根据 规则3 占用内存小于 long / double 的字段会允许被插入到对象中第一个 long / double 字段之前的间隙中,以避免不必要的内存填充。

  所以位于后边的字段 int pendingSize 插入到了 OFFET = 12 位置处,避免了不必要的字节填充。

  在 Entry 对象的实例数据区中紧接着基础类型字段后面跟着的就是 6 个对象引用字段(开启压缩指针占用 4 个字节)。

  大家一定注意到 OFFSET = 37 处本应该存放的是字段 private final Handle Entry handle 但是却被填充了 3 个字节。这是为什么呢?

  根据字段重排列规则1:引用字段 private final Handle Entry handle 占用 4 个字节(开启压缩指针的情况),所以需要对齐至4的倍数。所以需要填充3个字。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: