netty nioeventloop,

  netty nioeventloop,

  00-1010NioEventLoop打开方法跟进inEventLoop()方法,用addTask(task)复习初始构造方法。我们跟进doStartThread()方法并回顾execute()方法。在这里,我们继续查看register方法之前的部分。我们已经学习了NioEventLoop的创建和线程分配器的初始化。那么Nioeventloop如何打开呢?我们将继续学习这一节。

  00-1010 NIOEventLoop的打开方法在它的父类SingleThreadEventExecutor的execute(Runnable task)方法中,我们按照这个方法3360。

  @ override public void execute(Runnable task){ if(task==null){ throw new NullPointerException( task );}//确定当前线程是否为eventLoop线程boolean inEventLoop=inEventLoop();//如果是eventLoop线程If(inEventLoop){ add task(task);} else {//不是eventLoop线程启动线程start thread();//添加任务add task(任务);if(isShutdown()remove task(task)){ reject();} }如果(!addtaskkawakesup wakesUpForTask(task)){ wake up(inEventLoop);}}这个方法传入一个Runnble对象,这个对象是一个任务。

  首先,布尔inEventLoop=inEventLoop()方法将确定它是否是一个NioEventLoop线程。

  

目录

@ override public boolean inEventLoop(){ return inEventLoop(thread . current thread());}这里将inEventLoop(Thread . current Thread())方法传入当前线程对象,这个方法会调用当前类的inEventLoop(Thread thread)方法。

 

  跟进事件循环(线程线程)方法3360

  @ override public boolean inEventLoop(Thread Thread){ return Thread==this . Thread;}我们看到判断是基于当前线程对象是否是NioEventLoop绑定的线程对象。这里我们会认为启动一个线程肯定会为NioEventLoop绑定一个线程对象。如果判断当前线程对象不是NioEventLoop绑定的线程对象,则意味着执行该方法的线程不是当前NioEventLoop线程。那么,这个线程是如何初始化的呢?后面会讲到,继续看execute(Runnable task)方法3360。

  如果是NioEventLoop线程,会通过addTask(task)添加任务,并通过NioEventLoop异步执行。这个任务什么时候执行,后面也会讲到?

  

NioEventLoop开启方法

受保护的void add task(Runnable task){ if(task==null){ throw new NullPointerException( task );}//如果添加不成功,如果(!offer task(task)){ reject(task);}}这里offerTask(task)代表添加任务,后面是:

 

  final布尔offer task(Runnable task){ if(is shutdown()){ reject();}//向taskq添加任务返回task queue . offer(task);}我们看到taskQueue.offer(task task)在任务队列中增加了一个任务,这个任务队列taskQueue就是我们的NioEventLoop的初始化。

  的时候与NioEventLoop唯一绑定的任务队列

  

 

  

回顾一下初始构造方法

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}

在这里通过taskQueue = newTaskQueue(this.maxPendingTasks)创建了taskQueue

 

  回到execute(Runnable task)方法中,我们继续往下看:

  如果不是NioEventLoop线程我们通过startThread()开启一个NioEventLoop线程

  跟到startThread()之前,我们先继续往下走:

  开启NioEventLoop线程之后,又通过addTask(task)往taskQueue添加任务

  最后我们注意有这么一段代码:

  

if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop);}

addTaskWakesUp代表添加task之后, NioEventLoop的select()操作是不是要唤醒,这个属性是在初始化NioEventLoop的时候传入的,大家可以回顾下,默认是false,这里!addTaskWakesUp就是需要唤醒, wakesUpForTask(task)与addTaskWakesUp意义相同,默认是true,可以看代码:

 

  

protected boolean wakesUpForTask(Runnable task) { return true;}

这里恒为true,所以这段代码就是添加task时需要通过wakeup(inEventLoop)唤醒,这样NioEventLoop在做select()操作时如果正在阻塞则立刻唤醒,然后执行任务队列的task

 

  回到execute(Runnable task)方法中我们跟进开启线程的startThread()方法中:

  

private void startThread() { //判断线程是否启动, 未启动则启动 if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { //当前线程未启动, 则启动 doStartThread(); } }}

前面的判断是判断当前NioEventLoop线程是否启动,如果未启动,则通过doStartThread()方法启动,我们第一次执行execute(Runnable task)线程是未启动的,所以会执行doStartThread(),后续该线程则不会再执行doStartThread()方法

 

  

 

  

我们跟进doStartThread()方法中

private void doStartThread() { assert thread == null; //线程执行器执行线程(所有的eventLoop共用一个线程执行器) executor.execute(new Runnable() { @Override public void run() { //将当前线程复制给属性 thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { //开始轮询 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { //代码省略 } } });}

我们重点关注executor.execute()这个方法,其中executor就是我们创建NioEventLoop的线程器, execute()就是开启一个线程

 

  

 

  

回顾下execute()方法

public void execute(Runnable command) { //起一个线程 threadFactory.newThread(command).start();}

我们看到通过线程工厂开启一个线程,由于前面的小节已经剖析,这里不再赘述

 

  开启线程则执行Runnble类中的run()方法,我们看到在run()方法里通过thread = Thread.currentThread()将新开启的线程对象赋值NioEventLoop的thread的属性,这样就可以通过线程对象的判断,来确定是不是NioEventLoop线程了

  后面我们看到SingleThreadEventExecutor.this.run(),这里this,就是当前NioEventLoop对象,而这里的run()方法,就是NioEventLoop中的run()方法,在这个run()方法中,真正开始了selector的轮询工作,对于run()方法的详细剖析,我们会在之后的小节中进行

  刚才我们剖析了NioEventLoop的启动方法,那么根据我们的分析,就是第一次调用NioEventLoop的execute(Runnable task)方法的时候,则会开启NioEventLoop线程,之后的调用只是往taskQueue中添加任务,那么第一次是什么时候开启的呢?这里我们要回顾上一章讲过的内容

  上一章中我们讲过在AbstractServerBootstrap中有个initAndRegister()方法,这个方法主要用于channel的初始化和注册,其中注册的代码为:

  

ChannelFuture regFuture = config().group().register(channel);

其中group()我们剖析过是Boss线程的group,我们剖析过其中的register(channel)方法:

 

  

public ChannelFuture register(Channel channel) { return next().register(channel);}

首先跟到next()方法:

 

  

public EventLoop next() { return (EventLoop) super.next();}

首先调用了其父类MultithreadEventExecutorGroup的next方法,跟进去:

 

  

public EventExecutor next() { return chooser.next();}

这里chooser,就是初始化NioEventLoopGroup的线程选择器,为此分配了不同的策略,这里不再赘述,通过这个方法,返回一个NioEventLoop线程

 

  回到MultithreadEventLoopGroup类的register()方法中, next().register(channel)代表分配后的NioEventLoop的register()方法,这里会调用NioEventLoop的父类SingleThreadEventLoop类中的register()方法

  跟到SingleThreadEventLoop类中的register()方法:

  

public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this));}

DefaultChannelPromise是一个监听器,它会跟随channel的读写进行监听,绑定传入的channel和NioEventLoop,有关Promise后面的章节会讲到

 

  这里我们继续跟进register(new DefaultChannelPromise(channel, this))

  

public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise;}

unsafe()方法返回创建channel初始化的unsafe()对象,如果是NioSeverSocketChannel,则绑定NioMessageUnsafe对象,上一小节进行剖析过这里不再赘述

 

  最终这个unsafe对象会调用到AbstractChannel的内部类AbstractUnsafe中的register()方法,这里register(),无论是客户端channel和服务器channel都会通过这个一个register注册,在以后的客户端接入章节中我们会看到

  

 

  

这里我们继续看register方法

public final void register(EventLoop eventLoop, final ChannelPromise promise) { //代码省略 //所有的复制操作, 都交给eventLoop处理(1) AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { //做实际主注册(2) register0(promise); } }); } catch (Throwable t) { //代码省略 } }}

这里我们上一小节分析过,不再陌生,这里只分析有关NioEventLoop相关的内容

 

  我们首先看到AbstractChannel.this.eventLoop = eventLoop,获取当前channel的NioEventLoop,通过上一章的学习,我们知道每个channel创建的时候会绑定一个NioEventLoop

  这里通过eventLoop.inEventLoop()判断当前线程是否是NioEventLoop线程, inEventLoop()方法在前面的小节剖析过,这里不再赘述

  如果是NioEventLoop线程则通过register0(promise)方法做实际的注册,但是我们第一次执行注册方法的时候,如果是服务器channel是则是由server的用户线程执行的,如果是客户端channel,则是由Boss线程执行的,所以走到这里均不是当前channel的NioEventLoop的线程,于是会走到下面的eventLoop.execute()方法中

  eventLoop.execute()上一小节剖析过,就是将task添加到taskQueue中并且开启器NioEventLoop线程,所以,在这里就开启了NioEventLoop线程,有关开启步骤,可以通过上一小节内容进行回顾

  这里注意一点,有的资料会讲第一次开启NioEventLoop线程是在AbstractBootstrap的doBind0(regFuture, channel, localAddress, promise)方法中开启的,个人经过debug和分析,实际上并不是那样的,希望大家不要被误导

  简单看下doBind0(regFuture, channel, localAddress, promise)方法:

  

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { //绑定端口 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } });}

这里虽然调用了eventLoop的execute()方法,但是eventLoop线程在注册期间已经启动,所以这里不会重复启动,只会将任务添加到taskQueue中

 

  其实这里我们也能够看出,其实绑定端口的相关操作,同样是也是eventLoop线程中执行的

  以上就是Netty源码分析NioEventLoop线程的启动的详细内容,更多关于Netty NioEventLoop线程启动的资料请关注盛行IT其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: