Netty的异步任务处理与Socket事件处理

源码学徒

共 16741字,需浏览 34分钟

 ·

2021-07-26 17:33

有道无术,术尚可求也!有术无道,止于术!

经过前面几章的学习,我们基本是明白了Netty通道的创建、注册、与绑定与JDK NIO的对应关系,如果我们使用的是JDK NIO的方式去开发一个Socket服务端的时候,此时还缺少了一个重要的环节,就是循环处理IO事件!

我们前面不只一次的见到Netty的异步事件,因为我们某些知识还没有学习到,所以我们都按照同步的方式去获取的,所以我们本章节将带你学习,Netty对于IO事件的处理与异步事件的处理!

我们以绑定为出发点,由点到面进行分析!

一、源码入口

我们直接进入到绑定的源码分析:

private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise)
{

// 在触发channelRegistered()之前调用此方法。给用户处理程序一个设置的机会
// 其channelRegistered()实现中的管道。
channel.eventLoop().execute(() -> {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
});
}

我们上节课直接分析的channel.bind方法,而忽略上上面的异步方法,这里我们开始分析异步方法,我们进入到channel.eventLoop().execute()方法:

image-20210430145227945

二、源码分析

我们前面分析过,每个Channel绑定一个NioEventLoop,而EventLoop又是SingleThreadEventExecutor的子类,所以我们进入到io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable):

@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}

---------------------------分界线------------------------------------

//继续往下追 execute
private void execute(Runnable task, boolean immediate) {
//判断当前执行的线程是不是 NIoEventLoopGroup的线程 这里是false
boolean inEventLoop = inEventLoop();
//将任务加入到队列
addTask(task);
//这里永远只能启动一次 一个eventLoop
if (!inEventLoop) {
//启动线程
startThread();
.....................................
}
//io.netty.channel.nio.NioEventLoop.selector
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}

我们这里可以分为两部分:

1. 添加任务

addTask(task);

----------------------------------分界线---------------------------

protected void addTask(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if (!offerTask(task)) {
reject(task);
}
}

基础好一点的同学我估计已经有点猜到了,单看这个 offerTask有没有像和队列相关的操作,我们进入到offerTask方法:

final boolean offerTask(Runnable task) {
...............忽略.................
return taskQueue.offer(task);
}

果不其然,果然是入队操作,taskQueue是什么呢?

image-20210430152558414

我们再初始化NioEventLoop的源码分析学习的时候,学习到,我们会创建两个MpscQ队列(多生产者,单消费者),这个taskQueue就是当时我们创建的一个任务队列,这里面将我们提交的异步任务追加到队列里面!

返回异步任务是不是被追加到队列里面了,如果队列满了,或者其他原因追加失败的话,会返回false,就会执行reject方法:

protected final void reject(Runnable task) {
rejectedExecutionHandler.rejected(task, this);
}

这个拒绝策略同样是我们再创建NioEventLoop的时候创建保存的,给大家留一个作业,去追一下这个拒绝策略,判断一下当发生了添加异步任务失败之后,会发生什么呢?

2. 启动消费线程

startThread();

-----------------------------分割线-------------------------
/**
* 启动线程
*/

private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
//启动线程
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}

注意,这里有个CAS操作 STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED); 判断消费线程是不是已经启动,如果已经启动就不进入这个逻辑,如果没启动就进入这个逻辑!我们第一次调用,肯定没启动,进入这个逻辑:

doStartThread();
----------------------------分割线---------------------------

private void doStartThread() {
assert thread == null;
//创建一条线程并启动
//这个线程又EventLoop
executor.execute(new Runnable() {
@Override
public void run() {
//保存当前线程 给线程赋值的就是这里
thread = Thread.currentThread();
...........................忽略........................
try {
//进行实际的启动
//io.netty.channel.nio.NioEventLoop.run
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...........................忽略........................
}
}
...........................忽略........................
}
...........................忽略........................
}

代码比较长,我们只分析主线逻辑:

thread = Thread.currentThread();

首先保存了一下当前线程到成员变量,这个分支不是很重要,后面有时间进行分析!

SingleThreadEventExecutor.this.run();

这个就是处理异步任务的代码,我们进入到run方法查看:

image-20210501112253211
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
//存在任务就返回IO时间的数量,不存在任务就返回select阻塞等待事件发生
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
//如果不存在异步任务 就进行事件选择
case SelectStrategy.SELECT:
//下一个定时任务的截至时间 当不存在任务的时候就返回-1
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {
//不存在任务就去阻塞获取IO事件
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
default:
}
} catch (IOException e) {
//替换一个选择器
rebuildSelector0();
//选择次数重置为0
selectCnt = 0;
//处理循环异常 主要处理方式就是睡眠一会让程序主动释放CPU
handleLoopException(e);
continue;
}
//本次循环次数+1
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
//这里是默认值 50
final int ioRatio = this.ioRatio;
boolean ranTasks;
//不会进这个分支
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
//当存在I/O事件的时候
} else if (strategy > 0) {
//记录一下当前的时间
final long ioStartTime = System.nanoTime();
try {
//处理IO事件
processSelectedKeys();
} finally {
//计算处理IO事件耗费的事件
final long ioTime = System.nanoTime() - ioStartTime;
//里面的时间是计算处理异步任务的时间尽量保持为1:1
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
//没有IO事件的话就处理异步任务
ranTasks = runAllTasks(0);
}

if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
//没有空轮询的话三次一清空
selectCnt = 0;
//如果空轮询的次数超过默认的512次 就处理空轮询BUG的选择器
} else if (unexpectedSelectorWakeup(selectCnt)) {
//空轮询被处理后清空 轮询次数
selectCnt = 0;
}
} catch (CancelledKeyException e) {
...................忽略........................
} finally {
...................忽略........................
}
}
}

这主线逻辑分为三个:如何解决IO事件、如何处理异步任务、如何解决空轮询BUG!!分支代码关注一下注释,这里分析下主线代码:

I. I/O事件的处理

processSelectedKeys();

private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

selectedKeys是我们在创建NIOEventLoop的时候,会创建一个优化后的的SelectorKeySet集合,使用数组来实现的,大家忘记的话,可以会看一下NioEventLoop的初始化源码篇!

当你没有禁用优化的时候,就会进入到if分支,我们查看if内部代码的源码:

private void processSelectedKeysOptimized() {
//开始遍历所有的主键
for (int i = 0; i < selectedKeys.size; ++i) {
//获取事件
final SelectionKey k = selectedKeys.keys[i];
//将该位置的数据制空
selectedKeys.keys[i] = null;
//获取之间注册NioServerSocketChannel的时候,绑定的Channel对象
final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
//开始进行IO事件处理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
.........................忽略............................
}
.........................忽略............................
}
}

获取事件集合中的每一个key,同时获取之前绑定的NioServerSocketChannel,然后调用processSelectedKey处理这个事件:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
//当key失效之后,就关闭通道
....................忽略....................
}

try {
//获取当前事件的key 掩码
int readyOps = k.readyOps();
//是否包含连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//获取包含的事件
int ops = k.interestOps();
//剔除OP_CONNECT事件
ops &= ~SelectionKey.OP_CONNECT;
//重新更新关注的事件
k.interestOps(ops);
//传播 connect事件
unsafe.finishConnect();
}
//如果当前返回的关注事件的掩码包含 OP_WRITE的话
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
//开始向通道内刷新数据
ch.unsafe().forceFlush();
}
//如果当前的事件掩码包含读、新连接接入事件 或者 不关注任何事件的时候 传播read事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //传播read事件 可能是新连接接入也可能有数据可读
unsafe.read();
}
} catch (CancelledKeyException ignored) {
//发生异常关闭通道
unsafe.close(unsafe.voidPromise());
}
}

大家可以看到,里面的处理基本和我们对于JDK NIO的处理一致,就是判断各种事件然后进行对应的处理!

II、异步任务的处理

runAllTasks();
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;

do {
//合并任务 将定时任务的队列里面的任务拉去出来,和异步任务的队列进行合并
fetchedAll = fetchFromScheduledTaskQueue();
//开始执行全部的任务
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll);

if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}

这里就是异步任务的被执行的地方,这里分为两个步骤:1. 合并任务   2.执行taskQueue异步任务  3.执行tailQueue异步任务!

  1. 合并任务

    fetchedAll = fetchFromScheduledTaskQueue();

    Netty在我们学习中已经知道了两种队列,一种是taskQueue队列,一种是tailQueue队列,现在又出现了第三种队列:scheduledTaskQueue,他是一个专门存放定时任务的对队列,这里的合并任务就是将即将要执行的任务合并到taskQueue中等待执行!

    这行代码执行完毕后,所有即将要执行的任务都被添加在了taskQueue队列中,等待后续的执行!

  2. 执行taskQueue异步任务

    //注意这里传入的是合并完成后额taskQueue
    runAllTasksFrom(taskQueue)

    上述代码将对应的任务全部集中到了taskQueue队列中后们这里开始消费taskQueue队列进行执行!我们可以适当的看一下源码:

    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    //从taskQueue队列中弹出一个任务
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
    return false;
    }
    for (;;) {
    //执行任务 调用run方法
    safeExecute(task);
    //继续弹出任务
    task = pollTaskFrom(taskQueue);
    //如果弹出的任务为空
    if (task == null) {
    //直接返回
    return true;
    }
    }
    }
  3. 执行tailQueue异步任务

    afterRunningAllTasks();

    这里开始执行tailQueue节点的任务,可以看到,tailQueue节点的任务执行优先级低于上述两种队列!

    image-20210503101059511
    @Override
    protected void afterRunningAllTasks() {
    //注意这里传入的是 tailQueue
    runAllTasksFrom(tailTasks);
    }

    //继续往下看源码
    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    //弹出任务
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
    return false;
    }
    for (;;) {
    //执行任务
    safeExecute(task);
    //再次弹出任务
    task = pollTaskFrom(taskQueue);
    if (task == null) {
    //任务执行完毕 返回true
    return true;
    }
    }
    }

    这里就不作过多讲解了,这里和上面的逻辑基本一致,只是执行的qeueb不是一个!

III、解决臭名昭著的JDK空轮询BUG

可能大家大家都知道,JDK NIO在事件循环判断的时候可能会出现空轮询的BUG,导致CPU100%,虽然Oracle官方宣称空轮询的BUG已经解决了,但是后续经过一些公司实际的业务上证明并没有解决,只是出现几率小了点,Netty事实上并没有解决这个空轮询BUG只是用另外一种比较巧妙的方法规避开了,我们一起学习下:

首先,我们先想一下,我们如何断定我们的程序可能发生了空轮询的BUG,学习过NIO的都知道,我们会调用一个selector.select()进行阻塞等待有完成的事件发生,当selet方法阻塞解除的时候,就证明一定有我么感兴趣的事件发生,但是当我们发现select方法解除了阻塞,但是事件数量却为0的时候,我们就认为可能出现了空轮询的BUG!

但是IO数量为0并不是一定出现了空轮询的BUG,也可能外部调用了markUp方法,所以我们不能每一次出现事件数量为0的时候都认为程序出现了空轮询BUG,所以我们就需要有一个记录它出现该类异常情况发生的次数,当发生的次数达到了我们设置的阈值,就证明它可能发生了空轮询的BUG,这个时候需要处理这个空轮询的BUG!

那么如何处理呢? 我们任务发生空轮询问题是因为(JDK官方认为,这个Linux Epoll告诉JDK有事件了,但是JDK获取事件的时候获取了一个空,所以JDK只能返回一个0)所以就发生了空轮询:

JDK官方给出的解决方案

Netty是使用的第三种,抛弃旧的选择器,重建一个新的选择器,然后替换旧的选择器,我们一起看下源码!

我们看看Netty是如何做的,我们回到io.netty.channel.nio.NioEventLoop#run源码:

我还是,为了方便讲解,把这段代码贴出来省略和空轮询无关的代码(完整代码见上):

@Override
protected void run() {
int selectCnt = 0;
for (;;) {
........................忽略进行事件选择的代码...................
//本次循环次数+1
selectCnt++;
....................忽略事件处理和异步任务执行的代码................
//当处理的异步任务或者IO事件的数量大于0,证明没有发生空轮询
if (ranTasks || strategy > 0) {
//每隔三次打印一次日志
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
//没有空轮询的话清空
selectCnt = 0;
//如果出现异步任务为空 IO事件为空的话就会进入到这个逻辑
} else if (unexpectedSelectorWakeup(selectCnt)) {
//空轮询被处理后清空 轮询次数
selectCnt = 0;
}
} catch (CancelledKeyException e) {
...................忽略........................
} finally {
...................忽略........................
}
}

可以仔细的看一下 上述代码的注释,我们进入到 unexpectedSelectorWakeup(selectCnt) 方法:

private boolean unexpectedSelectorWakeup(int selectCnt) {
..............忽略日志打印................
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
//判断异常情况的次数是不是超过了预设的512次
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//开始重新构建一个selector
rebuildSelector();
return true;
}
return false;
}

我们读源码到这里,可以知道,当异常执行的次数超过了阈值 512次,就会调用一个  rebuildSelector方法,我们点进去看一下:

public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}

我们按照惯例,按照同步方法调用 rebuildSelector0();

private void rebuildSelector0() {
//获取原始的选择器
final Selector oldSelector = selector;
//声明一个新的选择器
final SelectorTuple newSelectorTuple;

if (oldSelector == null) {
return;
}

try {
//创建一个新的选择器,赋值给新的选择器变量
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}

int nChannels = 0;
//开始遍历旧的选择器,将旧选择器的IO事件的key,绑定到新创建的选择器上
for (SelectionKey key: oldSelector.keys()) {
//获取旧选择器的管道
Object a = key.attachment();
try {
//如果key失效了,就跳过!
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
//获取对应关注的事件掩码
int interestOps = key.interestOps();
//将旧key置为失效
key.cancel();
//重新将管道绑定到新的选择器上
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
//替换管道里面保存的选择器事件主键
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
...............省略...............
}
}
//重新保存新的优化后的选择器和原始选择器
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;

try {
//关闭旧的选择器
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
...............省略..................
}
}
...............省略..................
}

我们从上述代码可以看到,Netty处理空轮询的问题的策略是,当发现你可能发生空轮询的次数超过了512次的时候,就直接重新获取一个新的选择器,然后将旧的选择器直接替换掉,这样空轮询的BUG也就很轻易的解决了!

三、总结

  1. 每一个EventLoop都会启动一条永久运行的线程,用于处理异步任务和IO事件,我们称之为Reactor线程。
  2. 如果存在IO事件的话,会先处理IO事件!
  3. Reactor线程会先将定时任务里面的任务合并到taskqueue里面,然后执行!taskQueue执行完毕后执行tailQueue队列的任务!
  4. 如果空轮询的次数发生了512次,就认为发生了空轮询的BUG,就会抛弃原来的选择器,重建一个新的选择器,将旧选择器上的事件全部绑定到新的选择器上,然后将旧选择器删除!

才疏学浅,如果文章中理解有误,欢迎大佬们私聊指正!欢迎关注作者的公众号,一起进步,一起学习!



❤️「转发」「在看」,是对我最大的支持❤️



浏览 25
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报