深入剖析 Netty 源码,4 步全链路实现客户端启动

架构之美

共 48765字,需浏览 98分钟

 ·

2021-05-14 06:56


-     前言    -

本文源码地址:
https://gitee.com/wangjianxin199003/netty-source-code-analysis.git
本文所使用的netty版本4.1.6.Final。带注释的netty源码
https://gitee.com/wangjianxin199003/netty

在“BIO vs NIO”这篇文章中我们给出了使用jdk原生nio编写的客户端Hello World。还记得其中的关键步骤吗,咱们再来温习一下。

  1. 创建一个SocketChannel
  2. 连接到服务方端口
  3. 将SocketChannel设置为非阻塞的
  4. 将SocketChannel注册到selector上

今天我们就以这几个关键步骤为目标来看一下在netty中是怎么做的,以及在这几个步骤的中间netty又多做了哪些工作。


-     客户端引导代码    -


以下代码引导启动一个客户端,在本文以下内容中我们以“引导代码”指代这段程序。

  1. 创建一个EventLoopGroup,与服务端不同的是,这里不需要创建bossGroup,因为客户端不需要处理新连接的接入,所以这里的eventLoopGroup的作用相当于服务端的workerGroup。
  2. 创建一个BootStrap并将eventLoopGroup传入,设置channel为NioSocketChannel(对应jdk SocketChannel)。
  3. 设置一个Channel参数和一个Channel属性。
  4. 配置一个handler,这个handler里我们什么也没做,仅仅是打印一些事件日志。
  5. 调用bootstrap.connect方法连接到服务端。运行这段程序,将在控制台打印出如下结果:
HandlerAdded
ChannelRegistered
ChannelActive
/**
 * 欢迎关注公众号“种代码“,获取博主微信深入交流
 *
 * @author wangjianxin
 */
public class com.zhongdaima.netty.analysis.bootstrap.ClientBoot {
    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    //设置Channel参数
                    .option(ChannelOption.TCP_NODELAY, true)
                    //设置Channel属性
                    .attr(AttributeKey.valueOf("ChannelName"), "ClientChannel")
                    .handler(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                            System.out.println("ChannelRegistered");
                        }

                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            System.out.println("ChannelActive");
                        }

                        @Override
                        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                            System.out.println("HandlerAdded");
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8000);
            Channel channel = channelFuture.syncUninterruptibly().channel();
            channel.closeFuture().awaitUninterruptibly();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}


-     启动过程    -


我们从bootstrap.connect("127.0.0.1",8000)跟进去,这里的代码很简单,就是将host和port封装成InetSocketAddress,接着调用doResolveAndConnect方法。

public ChannelFuture connect(String inetHost, int inetPort) {
    return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}

public ChannelFuture connect(SocketAddress remoteAddress) {
    return doResolveAndConnect(remoteAddress, config.localAddress());
}

我们来看一下oResolveAndConnect方法,这里的关键逻辑是调用initAndRegister和doResolveAndConnect0方法。

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        //关键逻辑
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            //关键逻辑
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        //关键逻辑
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }


initAndRegister方法内有关键的3步。1是通过channelFactory.newChannel()创建一个Channel,此处的chnnelFactory的赋值咱们已经在“服务端启动流程”分析过,这里不再赘述;2是init(channel),这里init方法的实现在Boostrap类中;3是config().group().register(channel)。

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
       
    }
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}

接下来咱们重点分析initAndRegister方法内的这3个关键步骤和doResolveAndConnect0方法。

1、新创建一个NioSocketChannel

channelFactory.newChannel()调用Channel实现类的无参构造方法创建实例,此处的实现类为NioSocketChannel,咱们跟到该类的无参构造方法。这里与服务端启动时所使用的NioServerSocketChannel不同的是NioServerSocketChannel是调用provider.openServerSocketChannel()创建一个jdk的ServerSocketChannel,而客户端是调用provider.openSocketChannel()创建一个jdk的SocketChannel。到这里,我们看到了导读中提到的第1步“创建一个SocketChannel”。最后调用到父类AbstractNioByteChannel的构造方法。

public NioSocketChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}

public NioSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}

private static SocketChannel newSocket(SelectorProvider provider) {
    try {
       return provider.openSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a socket.", e);
    }
}

public NioSocketChannel(SocketChannel socket) {
    this(null, socket);
}

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

我们来看一下AbstractNioByteChannel的构造方法,很简单,继续调用父类AbstractNioChannel的构造方法。

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}


AbstractNioChannel的构造方法内调用了ch.configureBlocking(false)将Channel设置为非阻塞的,并继续调用了父类AbstractChannel的构造方法。到这里我们看到了导读中提到的第3步“将SocketChannel设置为非阻塞的”。


 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            //设置为非阻塞的
            ch.configureBlocking(false);
        } catch (IOException e) {
           
        }
    }

AbstractChannel的构造方法里为Channel分配了一个id,创建了一个Unsafe和一个PipeLine,Unsafe和PipeLine咱们后面再讲。

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}


2、初始化Channel

init方法的实现在Boostrap类中,这里将我们在引导代码所设置的handler添加到pipeline中,再为Channel设置一些参数(引导代码中的option(ChannelOption.TCP_NODELAY, true))和属性(引导代码中的attr(AttributeKey.valueOf("ChannelName"), "ClientChannel"))。

@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    //添加引导代码中所设置的handler
    p.addLast(config.handler());
    
    //设置Channel参数
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
            try {
                if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + channel, t);
            }
        }
    }
    //设置Channel属性
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    }
    

3、注册Channel

我们回到AbstractBootstrap的initAndRegister方法,接着往下看到ChannelFuture regFuture = config().group().register(channel),这里就是注册Channel的地方了,咱们跟进去看看。

config.group()的返回是我们在引导代码中所设置的eventLoopGroup。

跟到register(channel)方法里看看,这个register方法是抽象的,具体实现在MultithreadEventLoopGroup中,跟进去。

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

next()方法调用EventExecutorChooser的next()方法选择一个EventLoop。EventExecutorChooser有两个实现,分别是PowerOfTowEventExecutorChooser和GenericEventExecutorChooser,这两个Chooser用的都是轮询策略,只是轮询算法不一样。如果EventLoopGroup内的EventLoop个数是2的幂,则用PowerOfTowEventExecutorChooser,否则用GenericEventExecutorChooser。

PowerOfTowEventExecutorChooser使用位操作。

@Override
public EventExecutor next() {
    return executors[idx.getAndIncrement() & executors.length - 1];
}

从EventLoop的选择算法上我们可以看出,netty为了性能,无所不用其极。

chooser属性的赋值在MultithreadEventExecutorGroup的构造方法内通过chooserFactory创建的。

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    chooser = chooserFactory.newChooser(children);
}

而chooserFactory的赋值在MultithreadEventExecutorGroup的另一个构造方法内。当我们在引导代码中通过new NioEventLoopGroup(1)创建EventLoopGroup时最终会调用到这个构造方法内,默认值为DefaultEventExecutorChooserFactory.INSTANCE。

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

next()方法选出的EventLoop就是个SingleThreadEventLoop了,我们跟到SingleThreadEventLoop的register方法,最终调用的是unsafe的register方法。

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

unsafe.register方法在io.netty.channel.AbstractChannel.AbstractUnsafe内,我们跟下去看看。在register方法中最主要的有两件事,一是绑定eventloop,二是调用register0方法。此时的调用线程不是EventLoop线程,会发起一个异步任务。

 @Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    //绑定eventloop
    AbstractChannel.this.eventLoop = eventLoop;
    
    if (eventLoop.inEventLoop()) {
        register0(promise);
        //此时我们不在EventLoop内,也就是当前线程非EventLoop线程,会走到这个分支
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    //调用子类的register0方法
                    register0(promise);
                }
            });
        } catch (Throwable t) {
           
        }
    }
}
        

        
register0方法内主要有3步操作。

第1步是doRegister(),这个咱们稍后说。

第2步是pipeline.invokeHandlerAddedIfNeeded()这一步是去完成那些在绑定EventLoop之前触发的添加handler操作,比如我们添加了一个ChannelInitializer,在ChannelInitalizer的initChannel方法中添加的Handler,而initChannel被channelAdded方法调用,channelAdded方法的调用必须在EventLoop内,未绑定EventLoop之前这个调用会被封装成异步任务。

这些操作被放在pipeline中的pendingHandlerCallbackHead中,是个双向链表,具体请参考DefaultChannelPipeLine的addLast(EventExecutorGroup group, String name, ChannelHandler handler)方法。

这一步调用了咱们的引导程序中的System.out.println("HandlerAdded"),在控制台打出"HandlerAdded"。

第3步触发ChannelRegistered事件。这一步调用了咱们的引导程序中的System.out.println("ChannelRegistered"),在控制台打出"ChannelRegistered"。

好了,到这里我们已经知道了,为什么我们的引导程会先打出"HandlerAdded"和"ChannelRegistered"。

接着往下isActive()最终调用是的jdk SocketChannel类的isOpen()方法和isConnected方法,咱们不再贴出代码,读者自行查看,很简单,显然这里我们还没有完成连接建立,所以这个if分支的代码并不会执行。

    private void register0(ChannelPromise promise) {
        try {
            //向Selector注册Channel
            doRegister();
           
            //去完成那些在绑定EventLoop之前触发的添加handler操作,这些操作被放在pipeline中的pendingHandlerCallbackHead中,是个链表,具体请参考`DefaultChannelPipeLine`的`addLast(EventExecutorGroup group, String name, ChannelHandler handler)`方法。
            pipeline.invokeHandlerAddedIfNeeded();
            
            //将promise设置为成功的
            safeSetSuccess(promise);

            //触发ChannelRegistered事件
            pipeline.fireChannelRegistered();
            
            //这里并没有Active,因为此时还没建立连接
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
        }
    }

接下来咱们跟进去doRegister方法,这是个抽象方法,本例中方法实现在AbstractNioChannel中。好了,到这里我们终于看到了导读中提到的第4步“向Selector注册Channel”的操作。

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0this);
                return;
            } catch (CancelledKeyException e) {
                
            }
        }
    }

到了这里,我们在导读中说的总共4步操作中,还有第2步没有看到,在哪里呢,接着往下看。

4、连接到服务端

我们回到Bootstrap类的doResolveAndConnect方法,我们已经分析完了initAndRegister(),因为initAndRegister是异步的,返回结果是Future,此时Future有可能已经完成,也可能没有完成,这里对结果做了判断。

如果regFuture已经完成,则直接调用doResolveAndConnect0,否则将doResolveAndConnect0方法的调用放在regFuture的Listener中,等regFuture操作完成成,由EventLoop线程来回调。

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                   
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

那么又有读者疑问了,在这个if判断完成之后到添加Listener之间的这个时间,promise有可能已经完成了,Listener可能不会回调了, 奥秘在DefaultPromise的addListener(GenericFutureListener<? extends Future<? super V>> listener)方法里,这里注册完Listener之后,如果发现promise已经完成了,那么将直接调用nofityListeners方法向EventLoop提交异步任务(此时已经完成绑定EventLoop),该异步任务即是回调刚刚注册的Listener。

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    synchronized (this) {
        addListener0(listener);
    }
    if (isDone()) {
        notifyListeners();
    }
    return this;
}



咱们回归正题,去看BootStrap类里的doResolveAndConnect0方法,这里首先用AddressResolver去解析SocketAddress,这里的AddressResolver默认值为io.netty.resolver.DefaultAddressResolverGroup#INSTANCE,同样这里是异步的,和initAndRegister那里是一样的,咱们不多讨论。我们把重点聚集在doConnect方法上。

 private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise)
 
{
    try {
        final EventLoop eventLoop = channel.eventLoop();
        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {

            doConnect(remoteAddress, localAddress, promise);
            return promise;
        }

        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

        if (resolveFuture.isDone()) {
            final Throwable resolveFailureCause = resolveFuture.cause();

            if (resolveFailureCause != null) {

                promise.setFailure(resolveFailureCause);
            } else {

                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }

        resolveFuture.addListener(new FutureListener<SocketAddress>() {
            @Override
            public void operationComplete(Future<SocketAddress> future) throws Exception {
                if (future.cause() != null) {
                    channel.close();
                    promise.setFailure(future.cause());
                } else {
                    doConnect(future.getNow(), localAddress, promise);
                }
            }
        });
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}

我们来看一下doConnect方法,一般情况下,我们不指定客户端的localAddress,所以这里localAddress一般为null,我们跟进去channel.connect(remoteAddress, connectPromise)方法。

private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}


再次放出这张netty整体架构图。


channel.connect(remoteAddress, connectPromise)方法的实现在AbstractChannel类中,这里调用了pipeline.connect(remoteAddress, promise),又调用到了tail.connect(remoteAddress, promise),这个调用最终会从tail传递到head(参考上边的netty整体架构图),具体怎么传递的,等咱们研究pipeLine的时候再讲。接下来咱们直接到HeadContext中去。

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}


HeadContext的connect方法如下,又委托给了unsafe,调用unsafe.connect(remoteAddress, localAddress, promise)方法,这个方法的实现在AbstractNioUnsafe类中。

@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise)
 throws Exception 
{
    unsafe.connect(remoteAddress, localAddress, promise);
}

我们来到AbstractNioUnsafe的connect方法,这里调用了doConnect(remoteAddress, localAddress)方法。

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise)
 
{
    try {
        boolean wasActive = isActive();
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

doConnect方法这里的实现在NioSocketChannel中,前面我们说过主动连接时一般localAddress是null,所以这里我们不再讨论doBind0方法了,感兴趣的同学可以回到“服务端启动流程”这篇文章去看doBind0方法。

接下来调用了javaChannel().connect(remoteAddress),由于此前已经把该Channel设置为非阻塞的了,这个connect操作是异步的,是由操作系统来进行异步完成的。所以这里的connect方法有可能返回true也有可能返回false。至此,我们已经看到了导读中提到的第2步连接到服务方端口。到这里,导读中提到的所有操作我们都已经在netty中找到了。

先看一下jdk里面关于connect这个方法的注释:
If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation. If the connection is established immediately, as can happen with a local connection, then this method returns true. Otherwise this method returns false and the connection operation must later be completed by invoking the finishConnect method.

翻译一下:
如果该channel在非阻塞模式下,调用该方法将初始化一个非阻塞的连接操作。如果连接立即完成了,例如可能发生在一个本地连接的情况下,该方法返回true。否则该方法将返回false,随后必须调用finishConnect方法完成连接。

也就是说如果connect返回true的话,表明连接已经完成了。如果返回false的话还需要调用一下finishConnect方法才能最终完成连接。

如果connect方法返回false还需要调用一下finishConnect方法才能完成连接,这个调用在哪里呢?奥秘就在下一行,如果返回false就在selectionKey中加入SelectionKey.OP_CONNECT兴趣事件,等到所绑定的EventLoop发现这个Channel上有SelectionKey.OP_CONNECT事件发生时去调用finishConnect方法,这个咱们一会儿分析。如果返回true,表明连接已经成功,咱们继续回到到AbstractNioUnsafe的connect方法。

@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        boolean connected = javaChannel().connect(remoteAddress);
        //多数情况下connected为false
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

如果doConnect方法返回true,则调用fulfillConnectPromise方法,顾名思义应该是将promise设置成完成状态,咱们跟进去看看,doConnect返回false的情况咱们一会再分析。

public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise)
 
{
    try {
        boolean wasActive = isActive();
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

在fulfillConnectPromise方法中,首先调用了promise.trySuccess()方法将promise设置为完成的,又调用了pipeline().fireChannelActive()方法,这里最终会调用到咱们引导代码中的System.out.println("ChannelActive")在控制台打印出ChannelActive。

 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {

    boolean active = isActive();

    boolean promiseSet = promise.trySuccess();

    if (!wasActive && active) {
        pipeline().fireChannelActive();
    }

    if (!promiseSet) {
        close(voidPromise());
    }
}

如果doConnect方法返回false呢,首先将AbstractNioChannel类中的connectPromise赋值为参数中所传来的promise,为什么要赋这个值呢,咱们一会儿揭秘。又继续添加了一个定时任务,这个定时任务将在连接超时时间到来时将connectPromise设置为失败的。

public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise)
 
{
    try {
        boolean wasActive = isActive();
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

好了,我们前面说过doConnect方法极大概率返回false,连接还未完成,那么真正完成连接在哪里呢。其实咱们前面已经说过了,如果connect方法返回false接下来有这么一行代码selectionKey().interestOps(SelectionKey.OP_CONNECT);添加一个SelectionKey.OP_CONNECT兴趣事件。

这部分内容咱们前边还没介绍,我就直接给出了,至于怎么执行到这里的,咱们以后讲。直接到NioEventLoop的processSelectedKey方法里看,这个方法很长,我们只截取其中的一小段。我们看到这里判断了一下是否有SelectionKey.OP_CONNECT事件发生,如果发生了SelectionKey.OP_CONNECT事件,则将SelectionKey.OP_CONNECT从兴趣事件中删除,随后调用unsafe.finishConnect()方法。

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

咱们来看一下unsafe.finishConnect()方法,这个实现在AbstractNioUnsafe内,调用doFinishFinishConnect()就是本方法的关键了,fulfillConnectPromise(connectPromise, wasActive)咱们前面已经说过了,不再分析。但是咱们要提一点,还记得前面咱们说过在AbstractNioUnsafe的connect方法中把参数中传来的promise赋值给AbstractNioChannel中的connectPromise属性吗?这里就是原因了,如果不在属性中保存这个promise,那么这里就无法将connectPromise传递给fulfillConnectPromise(connectPromise, wasActive)方法。

@Override
public final void finishConnect() {
    assert eventLoop().inEventLoop();

    try {
        boolean wasActive = isActive();
        doFinishConnect();
        fulfillConnectPromise(connectPromise, wasActive);
    } catch (Throwable t) {
      
    } finally {
       
    }
}


doFinishFinishConnect()的实现在NioSocketChannel类中,非常简单,调用jdk的NioSocketChannel类的finishConnect方法。还记得咱们前面留下的问题吗?finishiConnect()方法在哪里调用的,答案已经有了,就在这里。

@Override
protected void doFinishConnect() throws Exception {
    if (!javaChannel().finishConnect()) {
        throw new Error();
    }
}

至此,客户端启动流程我们已经分析完毕。


-     总结    -


netty客户端启动流程:

  1. 创建一个Channel实例,这个过程中将Channel设置为非阻塞的,为Channel创建了PipeLine和Unsafe。
  2. 初始化Channel,为Channel添加引导代码中所设置的handler,并设置参数和属性。
  3. 注册Channel,为Channel绑定一个EventLoop并向Selector注册Channel。
  4. 调用connect方法连接到服务端,如果connect返回false将在Channel发生OP_CONNECT事件时调用finishiConnect方法完成连接。


作者:王建新,转转架构部资深Java工程师,主要负责服务治理、RPC框架、分布式调用跟踪、监控系统等。爱技术、爱学习,欢迎联系交流。

来源公众号:种代码

浏览 34
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报