Netty 源码深度解析-EventLoop(1):EventLoop的构造

共 21805字,需浏览 44分钟

 ·

2021-05-23 09:18


-     前言    -


本文源码地址:netty-source-code-analysis

本文所使用的netty版本4.1.6.Final:带注释的netty源码

EventLoop在netty中发挥着驱动引擎的作用,本文我们以NioEventLoopGroupNioEventLoop为例着重分析一下EventLoopGroupEventLoop的创建、一些重要的数据结构和netty的一些优化。



-     NioEventLoopGroup    -


咱们以NioEventLoopGroup为例进行分析。NioEventLoopGroup有很多构造方法,咱们不再一一贴出,只贴出2个关键的构造方法。

NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider)该构造方法给出了SelectStrategyFactory的默认值为DefaultSelectStrategyFactory.INSTANCEEventLoop在每次循环时需要调用该类的calculateStrategy方法来决定循环的策略,具体咱们后边讲。

我们通过new NioEventLoopGroup()或者new NioEventLoopGroup(32)创建EventLoopGroup时最终都会调用到这个构造方法。接着又调用了另一个构造方法,咱们跟下去。

public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider)
 
{
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

这里调用了父类 MultithreadEventLoopGroup的构造方法,咱们继续跟下去。

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

MultithreadEventLoopGroup的构造方法如下,这里给出了nThreads的默认值,为MultithreadEventLoopGroup的静态属性DEFAULT_EVENT_LOOP_THREADS。接着调用父类MultithreadEventExecutorGroup的构造方法。

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

DEFAULT_EVENT_LOOP_THREADS的赋值在MultithreadEventLoopGroup的静态代码块中,我们看到该值默认为cpu核数×2。

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    

咱们接着跟到MultithreadEventExecutorGroup的构造方法,这里继续调用本类的构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args),并且为executor赋默认值。

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}

executor的默认值为ThreadPerTaskExecutor,顾名思义就是为每一个任务创建一个线程,我们看一下它的实现,代码如下,execute方法中为每一个任务创建了一个线程。

public final class ThreadPerTaskExecutor implements Executor {

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

我们继续跟到MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args),这里接着调用另一个构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)并且为chooserFacotry赋默认值,chooserFactory咱们在“服务端的启动流程”和“客户端的启动流程”中均有涉及,作用是在为Channel绑定EventLoop时轮询地从EventLoopGroup里选择EventLoop,这里不再赘述。

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

接着看下一个构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)这里就是创建EventLoopGroup的核心逻辑了

首先创建了一个EventExecutor数组,并赋值给children属性,数据长度和nThreads相等,很容易想到,这里应该就是保存EventLoop的数组了。

紧接着循环调用newChild方法为数组的每一个元素赋值。

最后调用chooserFactory.newChooser(children)chooser赋值。

 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
              
            } finally {
            }
        }
        chooser = chooserFactory.newChooser(children);
}

我们跟着看一下newChild方法,该方法为抽象的,这里newChild方法的实现在NioEventLoopGroup中。好了,到这里咱们看到了NioEventLoop的构造方法,继续跟进去。

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}



-     NioEventLoop    -


来看NioEventLoop的构造方法,这里NioEventLoop保存了3个属性。

  • providerselector的工厂类,从provider可以得到selector
  • selector: 调用provider.openSelector()得到的selector,这里netty做了优化,咱们后边讲。
  • selectStrategy:这个的calculateStrategy方法返回值,决定了EventLoop的下一步动作,咱们也是后边讲。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    selector = openSelector();
    selectStrategy = strategy;
}

继续跟到父类SingleThreadEventLoop的构造方法,这里初始化了一个属性tailTasks,具体有什么用,咱们一会儿说,先记住这里有一个任务队列叫tailTasks

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    tailTasks = newTaskQueue(maxPendingTasks);
}

继续跟到父类SingleThreadEventExecutor的构造方法,这里有几个属性赋值。

  • addTaskWakesUp:这个是用来唤醒阻塞在taskQueue上的EventLoop线程的,在NioEventLoopEventLoop不会阻塞在taskQueue上,这里用处不是很大,可以先不研究它。
  • maxPendingTasks:后面紧接着就用到了,任务队列的最大长度。
  • executor:真正生成线程的类,默认是ThreadPerTaskExecutor
  • taskQueue任务队列,还记得上边已经有一个tailTask队列了吗,这里是另外一个队列
  • rejectedExecutionHandler:拒绝策略。
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

继续跟到父类AbstractScheduledEventExecutor的构造方法,这里什么也没干,又继续调用父类的构造方法,但是我们注意到该类中也有一个队列scheduledTaskQueue,顾名思义是定时任务队列,只是该队列没有在构造方法中初始化,等到有定时任务加入时才初始化。

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

    Queue<ScheduledFutureTask<?>> scheduledTaskQueue;

    protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
        super(parent);
    }
}

继续跟到父类AbstractEventExecutor的构造方法,这里为parent赋值,即该EventLoop所属的EventLoopGroup

protected AbstractEventExecutor(EventExecutorGroup parent) {
    this.parent = parent;
}



-     Netty 的优化    -


3.1 对selector的优化

我们回到NioEventLoop类中的openSelector方法,这个方法在干什么呢,它在通过反射替换sun.nio.ch.SelectorImpl类的两个属性selectedKeyspublicSelectedKeys,将这两个属性都替换成了SelectedSelectionKeySet类的实例。为什么要这么换呢,咱们接着往下看。

 private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
        }

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (ClassNotFoundException e) {
                } catch (SecurityException e) {
                }
            }
        });

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    selectedKeysField.setAccessible(true);
                    publicSelectedKeysField.setAccessible(true);

                    selectedKeysField.set(selector, selectedKeySet);
                    publicSelectedKeysField.set(selector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
 
                } catch (IllegalAccessException e) {

                } catch (RuntimeException e) {
                   
                }
            }
        });
        if (maybeException instanceof Exception) {

        } else {
            //将selectedKeySet保存到selectedKeys属性中
            selectedKeys = selectedKeySet;
        }

        return selector;
    }

sun.nio.ch.SelectorImpl类中的selectedKeyspublicSelectedKeys的作用是保存有兴趣事件发生的Selectionkey,其中publicSelectedKeysselectedKeys的包装类,Util.ungrowableSet(this.selectedKeys),看方法名ungrowableSet表示包装之后这个set就不能添加元素了,但是可以删除元素。也就是说jdk内部使用selectedKeys(注意这是个protected字段)进行添加和删除操作,而暴露给用户的是publicSelectedKeys只能进行删除操作(看selectedKeys方法)。

默认实现用的是HashSet

public abstract class SelectorImpl extends AbstractSelector {
    protected Set<SelectionKey> selectedKeys = new HashSet();
    protected HashSet<SelectionKey> keys = new HashSet();
    private Set<SelectionKey> publicKeys;
    private Set<SelectionKey> publicSelectedKeys;

    protected SelectorImpl(SelectorProvider var1) {
        super(var1);
        if (Util.atBugLevel("1.4")) {
            this.publicKeys = this.keys;
            this.publicSelectedKeys = this.selectedKeys;
        } else {
            this.publicKeys = Collections.unmodifiableSet(this.keys);
            this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
        }

    }
    
    public Set<SelectionKey> selectedKeys() {
        if (!this.isOpen() && !Util.atBugLevel("1.4")) {
            throw new ClosedSelectorException();
        } else {
            return this.publicSelectedKeys;
        }
    }
}

再来看看SelectedSelectionKeySet,这里用的数据结构是数组,很显然netty这里的优化是因为数组的元素添加和遍历操作比HashSet更快。有同学对这里有两个数组的存在表示疑问,大家不用操心这个问题了,因为我也没看懂为什么会有两个数组,还要交替使用。netty在后来的版本中做了优化,只用一个数组就实现了。所以这里只要知道netty用数组进行优化就可以了。

那么又一个问题来了,为什么jdk里边不用数组或者List,而用HashSet呢,那是因为selectorselect方法每次调用都会把已经准备好的SelectionKey放入selectedKeys中,如果用户在第一次调用select方法之后没有处理相应Channel的事件的,也没有删除selectedKeys中的元素,那么再次调用select之后,相同的SelectionKey会再次加入selectedKeys中,如果selectedKeys使用数组或者List实现将起不到去重的效果。

另一方面,如果使用List或者数组,删除成本也比较高。

而netty的实现中保证不会在连续调用两次select方法之间不删除selectedKeys中的元素,而且netty直接将selectedKeys暴露出来,在删除的时候可以直接将数据中对应索引的元素设置为null。

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    private SelectionKey[] keysA;
    private int keysASize;
    private SelectionKey[] keysB;
    private int keysBSize;
    private boolean isA = true;
}

3.2 对任务队列的优化

SingleThreadEventExecutor的构造方法中taskQueue属性通过调用newTaskQueue方法产生,newTaskQueue方法在NioEventLoop中被覆盖了,我们看一下。

这里被优化成了MpscQueue,全称是MultiProducerSingleConsumerQueue,即多生产者单消费者队列,感兴趣的同学自己去看一下,既然单独做一个这样的队列出来,在当前场景下自然是比BlockingQueue性能更好了。

为什么可以做这样的优化呢,很显然,这里的队列消费者只有一个那就是EventLoop,而生产者会有多个。并且,这里有一行注释This event loop never calls takeTask(),这就说明这个MultiProducerSingleConsumerQueue并没有阻塞的take方法,而正好NioEventLoop也不需要调用阻塞的take方法,正好适合。

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    // This event loop never calls takeTask()
    return PlatformDependent.newMpscQueue(maxPendingTasks);
}

同样的,tailTask这个队列也被优化成了MpscQueue。我们看一下scheduledTaskQueue是什么队列,答案在AbstractScheduledEventExecutor#scheduledTaskQueue方法中,我们看到scheduledTaskQueue仅仅是一个普通的优先级队列,甚至都不是一个线程安全的阻塞队列。

Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
    }
    return scheduledTaskQueue;
}

为什么呢,为什么tailTaskstaskQueueMpscQueue,而scheduledTaskQueue是非线程安全的队列呢?可能是因为没有的带优先级功能的MpsQueue吧。

scheduledTaskQueue是非线程安全的队列,会不会有多线程安全问题呢,答案是不会。我们看一下AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask<V>)方法,这里在添加定时任务时,如果是非EventLoop线程调用,则会发起一个异步调用,最终往scheduledTaskQueue添加定时任务的还是EventLoop线程。所以呢,这里又有另一个优化,那就是可以延迟初始化scheduledTaskQueue



-     总结    -


画重点来了,本文的重点就这两个。

  • 每个EventLoop中有3个队列,分别是tailTaskstaskQueuescheduledTaskQueue。而且tailTaskstaskQueue队列在NioEventLoop中的被优化为MultiProducerSingleConsumerQueue
  • netty对selector中的selectedKeys做了优化,从HashSet替换为SelectedSelectionKeySetSelectedSelectionKeySet是用数组实现的Set,添加元素和遍历效率更高。


作者:王建新,转转架构部资深Java工程师,主要负责服务治理、RPC框架、分布式调用跟踪、监控系统等。爱技术、爱学习,欢迎联系交流。

来源公众号:种代码

浏览 25
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报