我为 Netty 贡献源码

共 140402字,需浏览 281分钟

 ·

2022-06-26 17:49

本系列Netty源码解析文章基于 4.1.56.Final版本

写在前面.....

本文是笔者肉眼盯 Bug 系列的第三弹,前两弹分别是:

而在本篇文章中笔者又用肉眼盯出了 Netty 在处理 TCP 连接半关闭时的一个 Bug。

image.png
image.png

那么在接下来的内容中,笔者会随着源码深入的解读慢慢的为大家一层一层地拨开迷雾,带大家来一步一步分析这个 Bug 产生的原因以及造成的影响,并逐步带大家把这个 Bug 修复掉。

下面就让我们一起带着怀疑,审视,欣赏,崇敬,敬畏的态度来一起品读世界顶级程序员编写出的代码。由衷的感谢他们在这一领域做出的贡献。

本文概要.png

在笔者前边关于 Netty Reactor 的系列文章中,我们详细的分析了 Reactor 的创建启动运行,以及接收网络连接接收网络数据,然后通过 pipeline 对 IO 事件的编排处理,最后到发送网络数据的一整套流程实现。相信大家通过对这一系列文章的阅读思考,已经对 Reactor 在 Netty 中的实现有了一个全面并且深刻的认识。

那么现在就到了关闭连接的时刻了,在本文中笔者将带大家一起剖析下关闭连接在 Netty 中的整个实现逻辑。

在 Netty 中对于用户关闭连接的处理分为三大模块:

  1. 处理正常的 TCP 连接关闭。

  2. 处理异常的 TCP 连接关闭。

  3. 处理 TCP 连接半关闭的场景。

接下来,笔者就带大家从这三个连接关闭场景来全面分析下 Netty 是如何处理连接关闭的。

首先我们来看下最简单的场景 --- 正常的TCP连接关闭。

1. 正常 TCP 连接关闭

在进入源码实现之前,我们先来回顾下 TCP 连接关闭的整个流程,其实 Netty 中针对连接关闭的整个源码实现流程也是按照图中 TCP 连接关闭的四次挥手步骤进行的。

TCP连接关闭.png
  1. 首先 Netty 客户端在对应的 ChannelHandler 中调用 ctx.channel().close() 方法主动关闭连接,内核会向服务端发送一个 FIN 包,随即客户端连接进入 FIN_WAIT1 状态。
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

   @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       // 客户端连接进入 FIN_WAIT1 状态
       ctx.channel().close();
    }
}
  1. 服务端内核协议栈在接收到客户端发送过来的 FIN 包后,会自动回复客户端一个 ACK 包,随后会将文件结束符 EOF 插入到 Socket 接收缓冲区中的末尾。服务端连接状态进入 CLOSE_WAIT ,客户端接收到 ACK 包后进入FIN_WAIT2 状态。

  2. 当服务端内核协议栈将 EOF 插入到 Socket 的接收缓冲区时,这时 OP_READ 事件活跃,Reactor 线程随即会处理 channel 上的 OP_READ 事件,只不过此时从 channel 中读取到的字节数为 -1 ,表示对端发起了 channel 关闭请求。服务端开始执行连接关闭流程。

  3. 由于客户端调用的是 ctx.channel().close() 方法来关闭连接,相当于将 TCP 连接的读写通道同时关闭,所以客户端在 FIN_WAIT2 状态下无法在接收服务端发送的数据,但此时服务端处于 CLOSE_WAIT 状态下仍可向客户端发送数据,只不过客户端在接收到数据后会丢弃并发送 RST 报文给服务端。

  4. 服务端在 CLOSE_WAIT 状态下,调用 ctx.channel().close() 向客户端发送 FIN 包,随即进入 LAST_ACK 状态。

  5. 客户端在收到来自服务端的 FIN 包后,回复 ACK 包给服务端,完成四次挥手,随即进入 TIME_WAIT 状态,服务端在收到客户端的 ACK 包后结束 LAST_ACK 状态进入 CLOSE 状态。

Netty 中对于连接关闭的处理主要在第 3 步和第 5 步,剩下的逻辑均由内核协议栈处理完成。

从上述 TCP 关闭连接的四次挥手步骤中,我们可以看出 Netty 对于关闭连接的响应是通过处理 OP_READ 事件来完成的,而对于 OP_READ 事件的处理,笔者已经在 Netty如何高效接收网络数据 一文中详细介绍过了,这里我们直接来到 OP_READ 事件的处理函数中,聚焦于连接关闭逻辑的处理。

Reactor线程运行时结构.png

当 Reactor 线程轮询到 Channel 上有 OP_READ 事件活跃时,就会来到 NioEventLoop#processSelectedKey 函数中去处理活跃的 IO 事件,在本文的语义中 OP_READ 事件就表示连接关闭事件。

public final class NioEventLoop extends SingleThreadEventLoop {

   private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
      
                  .................省略..............

        try {
            int readyOps = k.readyOps();

                  .................省略..............

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //处理 OP_READ 事件,本文中表示连接关闭事件
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
}

最终会在 AbstractNioByteChannel#read 方法中完成对 OP_READ 事件的处理,下图中置灰的逻辑处理模块即为 Netty 在整个 OP_READ 事件处理中关于连接关闭事件的处理位置。

Netty 中关于 OP_READ 事件的处理一共分为两大模块,一块是针对接收连接上网络数据的处理。另一块则是本文的主题,针对连接关闭事件的处理。

连接关闭事件处理.png
public abstract class AbstractNioByteChannel extends AbstractNioChannel {

        @Override
        public final void read() {
            final ChannelConfig config = config();

            ..........省略连接半关闭处理........

            ..........省略获取allocHandle过程.......

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    //记录本次读取了多少字节数
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //如果本次没有读取到任何字节,则退出循环 进行下一轮事件轮询
                    // -1 表示客户端主动关闭了连接close或者shutdownOutput 这里均会返回-1
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        //当客户端主动关闭连接时(客户端发送fin1),会触发read就绪事件,这里从channel读取的数据会是-1
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    .........省略.............

                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    //此时客户端发送fin1(fi_wait_1状态)主动关闭连接,服务端接收到fin,并回复ack进入close_wait状态
                    //在服务端进入close_wait状态 需要调用close 方法向客户端发送fin_ack,服务端才能结束close_wait状态
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                 ............省略...............
            } finally {
                 ............省略...............
            }
        }
    }

}

在前边 TCP 连接关闭的步骤 3 中我们提到,当服务端的内核协议栈接收到来自客户端的 FIN 包后,内核协议栈会向 Socket 的接收缓冲区插入文件结束符 EOF ,表示客户端已经主动发起了关闭连接流程,这时 NioSocketChannel 上的 OP_READ 事件活跃,随即 Reactor 线程会在 AbstractNioByteChannel#read 方法中处理 OP_READ 事件。

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        //读到EOF后,这里会返回-1
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

}

Reactor 线程会通过 ByteBuf#writeBytes 方法读取 NioSocketChannel 中的数据,由于此时底层 Socket 接收缓冲区中只有一个 EOF 并没有其他接收数据,所以这里的 ByteBuf#writeBytes 方法会返回 -1。表示客户端已经发起了连接关闭流程,此时服务端连接状态为 CLOSE_WAIT ,客户端连接状态为 FIN_WAIT2 。

Netty处理TCP关闭流程.png
     boolean close = false;
     close = allocHandle.lastBytesRead() < 0;
     if (close) {
           closeOnRead(pipeline);
     }

当本次 read loop 从 Channel 中读取到的字节数为 -1 时,则进入 closeOnRead 方法,服务端开始关闭连接流程。

从上述 Netty 处理 TCP 正常关闭流程( Socket 接收缓冲区中只有 EOF ,没有其他正常接收数据)可以看出,这种情况下只会触发 ChannelReadComplete 事件而不会触发 ChannelRead 事件。

2. Netty 对 TCP 连接正常关闭的处理

       private void closeOnRead(ChannelPipeline pipeline) {
           //判断服务端连接接收方向是否关闭,这里肯定是没有关闭的
           if (!isInputShutdown0()) {
                if (isAllowHalfClosure(config())) {
                      .....省略TCP连接半关闭处理逻辑.......
                } else {
                    //如果不支持半关闭,则服务端直接调用close方法向客户端发送fin,结束close_wait状态进如last_ack状态
                    close(voidPromise());
                }
            } else {
                    .....省略TCP连接半关闭处理逻辑.......
            }
        }

众所周知 TCP 是一个面向连接的、可靠的、基于字节流的全双工传输层通信协议,既然它是全双工的,那就意味着 TCP 连接同时有一个读通道和写通道。

image.png

这里的 isInputShutdown0 方法是用来判断 TCP 连接上的读通道是否关闭,那么在当前情况下,服务端的读通道肯定还没有关闭,因为目前 Netty 还没有调用任何关闭连接的系统调用。

    @Override
    protected boolean isInputShutdown0() {
        return isInputShutdown();
    }

    @Override
    public boolean isInputShutdown() {
        return javaChannel().socket().isInputShutdown() || !isActive();
    }

至于这里为什么要对读通道是否关闭进行判断,笔者会在本文 TCP  连接半关闭相关处理章节为大家详细解释。

由于本小节介绍的是 TCP 连接正常关闭的场景,并不是半关闭,所以这里的 isAllowHalfClosure = false 。Reactor 线程进入 close 方法,执行真正的关闭流程。

2.1 close 方法发起 TCP 连接关闭流程

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

      @Override
      public void close(final ChannelPromise promise) {
            assertEventLoop();

            ClosedChannelException closedChannelException =
                    StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");

            close(promise, closedChannelException, closedChannelException, false);
      }

      private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify)
 
{

                      .........省略...........

      }

}

这里正是 netty 关闭 channel 的核心逻辑所在,而关闭 channel 的行为又分为主动关闭和被动关闭,如本例中,客户端主动调用 ctx.channel().close() 发起关闭流程为主动关闭方,而服务端则是被动关闭方。

而主动关闭方和被动关闭方在这里的传参是不一样的,我们先来看被动关闭方也就是本例中服务端在调用 close 方法的传参。

        @Override
        public void close(final ChannelPromise promise) {
            assertEventLoop();

            ClosedChannelException closedChannelException =
                    StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
            close(promise, closedChannelException, closedChannelException, false);
        }
  • ChannelPromise promise:服务端作为被动关闭方,这里传入的 ChannelPromise 类型为 VoidChannelPromise ,表示调用方对处理结果并不关心,VoidChannelPromise 不可添加 Listener ,不可修改操作结果状态。
public final class VoidChannelPromise extends AbstractFuture<Voidimplements ChannelPromise {

    @Override
    public VoidChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
        fail();
        return this;
    }

    @Override
    public boolean isDone() {
        return false;
    }

   @Override
    public boolean setUncancellable() {
        return true;
    }

    @Override
    public VoidChannelPromise setFailure(Throwable cause) {
        fireException0(cause);
        return this;
    }

    @Override
    public boolean trySuccess() {
        return false;
    }
   
}

而作为主动关闭方的客户端则需要监听 Channel 关闭的结果,所以这里传递的 ChannelPromise 参数为 DefaultChannelPromise 。

        ChannelFuture channelFuture = ctx.channel().close();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                  ...........省略.......
            }
        });
    @Override
    public ChannelFuture close() {
        return close(newPromise());
    }

    @Override
    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel(), executor());
    }
  • Throwable cause:当 Channel 关闭之后,需要清理 Channel 写入缓冲队列 ChannelOutboundBuffer 中的待发送数据,这里会将异常 cause 传递给用户的 writePromise ,通知用户 Channel 已经关闭,write 操作失败。这里传入的异常类型为 StacklessClosedChannelException 。
write事件传播流程.png

如图中所示,当用户调用 ctx.writeAndFlush(msg) 发送数据时,由于是异步发送 Netty 会在图中的第 2 步直接返回一个  ChannelFuture 给用户,发送成功或者发送失败都会通知这个 ChannelFuture 。如果在数据发送之前连接就关闭了,那么 Netty 就会把 StacklessClosedChannelException 异常通知给用户持有的这个 ChannelFuture。相关数据的发送细节,感兴趣的读者可以在回顾下笔者的 一文搞懂 Netty 发送数据全流程 这篇文章。

  • ClosedChannelException closeCause:这个参数和 Throwable cause 参数的作用差不多,都是用于在连接关闭的时候如果此时还有待发送数据未发送。就通知用户这里在参数中指定的异常。唯一不同的是 Throwable cause 负责通知给 Channel 发送数据缓冲队列 ChannelOutboundBuffer 中的 flushedEntry 队列。ClosedChannelException closeCause 负责通知给 ChannelOutboundBuffer 中的 unflushedEntry 队列。
ChannelOutboundBuffer结构.png

这里大家只需要理解个大概,稍微有个印象就行,笔者后面还会详细介绍。

  • boolean notify:由于在关闭 Channel 之后,会清理 Channel 对应的发送缓冲队列 ChannelOutboundBuffer 中存储的待发送数据,同时也会释放其中用于存储待发送数据用的 ByteBuffer ,当 ChannelOutboundBuffer 中的内存占用低于低水位线的时候,会触发 ChannelWritabilityChanged 事件。这里的参数 boolean notify 决定是否触发 ChannelWritabilityChanged 事件,由于当前是关闭操作,所以 notify = false ,不需要触发 ChannelWritabilityChanged 事件。

在介绍完 close 方法的各个参数之后,接下来我们来看一下具体的关闭逻辑:

2.1.1 连接关闭之前的校验工作

      // channel的关闭流程是否已经开始
      private boolean closeInitiated;

      // 关闭channel操作的指定future,来判断关闭流程进度 每个channel对应一个CloseFuture
      // 连接关闭之后,netty 会通知这个CloseFuture
      private final CloseFuture closeFuture = new CloseFuture(this);

      private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify)
 
{
            if (!promise.setUncancellable()) {
                //关闭操作如果被取消则直接返回
                return;
            }

            if (closeInitiated) {
                //如果此时channel已经开始关闭流程,则进入这里
                if (closeFuture.isDone()) {               
                    //如果channel已经关闭 则设置promise为success,如果promise是voidPromise类型则会跳过
                    safeSetSuccess(promise);
                } else if (!(promise instanceof VoidChannelPromise)) { 
                    //如果promise不是voidPromise,则会在关闭完成后 通过closeFuture设置promise success
                    closeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            promise.setSuccess();
                        }
                    });
                }
                // 直接返回,防止重复关闭
                return;
            }
  
            //当前channel现在开始进入正在关闭状态
            closeInitiated = true;

            .......关闭channel.........
        }

Netty 这里使用一个 boolean closeInitiated 变量来防止 Reactor 线程来重复执行关闭流程,因为 Channel 的关闭操作可以在多个业务线程中发起,这样就会导致多个业务线程向 Reactor 线程提交多个关闭 Channel 的任务。

除此之外,Netty 还为每一个 Channel 创建了一个 CloseFuture closeFuture,用来表示 Channel 关闭的相关进度状态。当 Channel 完成关闭后,Netty 会设置 closeFuture 为 success 状态,并通知 closeFuture 上注册的 listener 。

如果 closeInitiated == true 说明当前 Channel 的关闭操作已经开始,如果有多个业务线程先后提交过来多个关闭任务,Reactor 线程则会首先通过 closeFuture.isDone() 判断当前 Channel 是否已经完成关闭 ,如果 Channel 已经关闭,则会在 closeFuture 上注册的 listener 中设置关闭任务对应的 Promie 为 success ,进而通知到业务线程。

     protected final void safeSetSuccess(ChannelPromise promise) {
            if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
                logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
            }
    }

从这里也可以看出 VoidChannelPromise 表示一个空的 Promise ,不能对其设置 success 或者 fail , 更不能对其添加 listener 。一般用于不关心操作结果的场景。

如果此时 Channel 的关闭流程虽然已经开始但还未完成的情况下,则将关闭任务对应 Promise (在业务线程中持有)的通知动作封装成 ChannelFutureListener  并添加到 closeFuture 中。当 Channel 关闭后,closeFuture 会被设置为 success ,并通知其中注册的 ChannelFutureListener 。

image.png

2.1.2 Channel关闭前的准备工作

        private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify)
 
{
            
            ...........省略连接关闭之前的校验工作........

            //当前channel是否active,这里肯定是active的
            final boolean wasActive = isActive();
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            //将channel对应的写缓冲区channelOutboundBuffer设置为null 表示channel要关闭了,不允许继续发送数据
            //此时如果还在write数据,则直接释放bytebuffer,并立马 fail 相关writeFuture 并抛出newClosedChannelException异常
            //此时如果执行flush,则会直接返回
            this.outboundBuffer = null
            //如果开启了SO_LINGER,则需要先将channel从reactor中取消掉。避免reactor线程空转浪费cpu
            Executor closeExecutor = prepareToClose();

            .............省略关闭Channel逻辑流程.......
        }

通过 isActive() 获取 Channel 的状态 boolean wasActive ,由于此时我们还没有关闭 Channel,所以 Channel 现在的状态肯定是 active 的。之所以在关闭流程的一开始就获取 Channel 是否 active 的状态,是因为当我们关闭 Channel 之后,需要通过这个状态来判断 channel 是否是第一次从 active 变为 inactive ,如果是第一次,则会触发 ChannelInactive 事件在 Channel 对应的 pipeline 中传播。

在 Channel 关闭之前,还会将 Channel 对应的写入缓冲队列 ChannelOutboundBuffer 设置为 null ,表示 Channel 即将要关闭了,不允许业务线程在继续发送数据。

一文搞懂 Netty 发送数据全流程 一文中我们提到过,如果 Channel 准备关闭的时候,用户还在向 Channel 写入数据,则直接释放 bytebuffer ,并立马 fail 掉相关 ChannelPromise 并抛出 newClosedChannelException 异常。

        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();
            //获取当前channel对应的待写入数据缓冲队列(支持用户异步写入的核心关键)
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // outboundBuffer == null说明channel准备关闭了,直接标记发送失败。
            if (outboundBuffer == null) {
                try {
                    ReferenceCountUtil.release(msg);
                } finally {
                    safeSetFailure(promise,
                            newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
                }
                return;
            }

            .............省略.........
         }

如果此时用户还在执行 Channel 的 flush 操作发送数据,那么发送流程直接会 return 掉,停止发送数据。

        @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            //channel以关闭
            if (outboundBuffer == null) {
                return;
            }

            .........省略........
       }

2.1.3 针对 SO_LINGER 选项的处理

        @Override
        protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    //在设置SO_LINGER后,channel会延时关闭,在延时期间我们仍然可以进行读写,这样会导致io线程eventloop不断的循环浪费cpu资源
                    //所以需要在延时关闭期间 将channel注册的事件全部取消。
                    doDeregister();

                    /**
                     * 设置了SO_LINGER,不管是阻塞socket还是非阻塞socket,在关闭的时候都会发生阻塞,所以这里不能使用Reactor线程来
                     * 执行关闭任务,否则Reactor线程就会被阻塞。
                     * */

                    return GlobalEventExecutor.INSTANCE;
                }
            } catch (Throwable ignore) {
            }
            //在没有设置SO_LINGER的情况下,可以使用Reactor线程来执行关闭任务
            return null;
        }
    }

要理解这段逻辑,首先我们需要理解 SO_LINGER 这个 Socket 选项,他会影响 Socket 的关闭行为。

在默认情况下,当我们调用 Socket 的 close 方法后 ,close 方法会立即返回,剩下的事情会交给内核协议栈帮助我们处理,如果此时 Socket 对应的发送缓冲区还有数据待发送,接下来内核协议栈会将 Socket 发送缓冲区的数据发送出去,随后会向对端发送 FIN 包关闭连接。注意:此时应用程序是无法感知到这些数据是否已经发送到对端的,因为应用程序在调用 close 方法后就立马返回了,剩下的这些都是内核在替我们完成。接着主动关闭方就进入了 TCP 四次挥手的关闭流程最后进入TIME_WAIT状态。

TCP连接关闭.png

而 SO_LINGER 选项会控制调用 close 方法关闭 Socket 的行为。

  struct linger {
      int l_onoff;   // linger active
      int l_linger;  // how many seconds to linger for
  };
  • l_onoff :表示是否开启 SO_LINGER 选项。0 表示关闭。默认情况下是关闭的。

  • int l_linger:如果开启了 SO_LINGER 选项,则该参数表示应用程序调用 close 方法后需要阻塞等待多长时间。单位为秒。

这两个参数的不同组合会影响到 Socket 的关闭行为:

  • l_onoff = 0 时 l_linger 的值会被忽略,属于我们上边讲述的默认关闭行为。

  • l_onoff = 1,l_linger > 0:这种情况下,应用程序调用 close 方法后就不会立马返回,无论 Socket 是阻塞模式还是非阻塞模式,应用程序都会阻塞在这里。直到以下两个条件其中之一发生,才会解除阻塞返回。随后进行正常的四次挥手关闭流程。

    • 当 Socket 发送缓冲区的数据全部发送出去,并等到对端 ACK 后,close 方法返回。
    • 应用程序在 close 方法上的阻塞时间到达 l_linger 设置的值后,close 方法返回。
so_linger关闭.png
  • l_onoff = 1,l_linger = 0:这种情况下,当应用程序调用 close 方法后会立即返回,随后内核直接清空 Socket 的发送缓冲区,并向对端发送 RST 包,主动关闭方直接跳过四次挥手进入 CLOSE 状态,注意这种情况下是不会有 TIME_WAIT 状态的。
RST关闭连接.png

Netty 也提供了 SO_LINGER 选项的设置,由于一般关闭连接的行为都是由客户端发起,我们以 Netty 客户端代码为例说明:

public final class EchoClient {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.SO_LINGER, 2)
              ..........省略........
        }
}
public class DefaultSocketChannelConfig extends DefaultChannelConfig
                                        implements SocketChannelConfig 
{

    @Override
    public SocketChannelConfig setSoLinger(int soLinger) {
        try {
            if (soLinger < 0) {
                javaSocket.setSoLinger(false0);
            } else {
                javaSocket.setSoLinger(true, soLinger);
            }
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

}

默认情况下 SO_LINGER 选项是关闭的,在 JDK 底层设置 SO_LINGER 选项的方法 setSoLinger  中,参数 on 对应 l_onoff ,参数 linger 对应 l_linger ,单位为秒。

public void setSoLinger(boolean on, int linger) throws SocketException 

当我们理解了 SO_LINGER 选项的工作原理及其应用之后,现在回过头来在看 prepareToClose 方法的逻辑就很容易理解了。

        @Override
        protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    //在设置SO_LINGER后,channel会延时关闭,在延时期间我们仍然可以进行读写,这样会导致io线程eventloop不断的循环浪费cpu资源
                    //所以需要在延时关闭期间 将channel注册的事件全部取消。
                    doDeregister();

                    /**
                     * 设置了SO_LINGER,不管是阻塞socket还是非阻塞socket,在关闭的时候都会发生阻塞,所以这里不能使用Reactor线程来
                     * 执行关闭任务,否则Reactor线程就会被阻塞。
                     * */

                    return GlobalEventExecutor.INSTANCE;
                }
            } catch (Throwable ignore) {
            }
            //在没有设置SO_LINGER的情况下,可以使用Reactor线程来执行关闭任务
            return null;
        }

首先我们来关注下 prepareToClose 方法的返回值,它会返回一个 Executor  ,这个 Executor 用于执行真正的 Channel 关闭任务。

大家这里可能会有疑问,Channel 上的 IO 操作之前不都是由 Reactor 线程负责执行吗?为什么这里需要用一个单独的 Executor 来执行呢?

原因就是如果我们设置了 SO_LINGER 选项 config().getSoLinger() > 0 ,如果继续采用 Reactor 线程执行 Channel 关闭的动作,那么在这种情况下底层Socket 的 close 方法会阻塞 Reactor 线程,直到 Socket 发送缓冲区中的数据全部发送出去并收到对端 ACK ,或者 linger 指定的超时时间到达。

由于 Reactor 线程负责多个 Channel 上的 IO 处理,如果被阻塞在这里,就会影响其他 Channel 上的 IO 处理,降低吞吐。所以当我们设置了 SO_LINGER 选项时,就不能使用 Reactor 线程来执行 Channel 关闭的动作,而是用GlobalEventExecutor.INSTANCE来负责执行 Channel 的关闭动作。

如果我们没有设置 SO_LINGER 选项,底层 Socket 的 close 方法会立即返回并不会阻塞,所以这种情况下,依然会使用 Reactor 线程来执行 Channel 的关闭动作。

prepareToClose 方法这种情况下会返回 null ,表示默认采用 Reactor 线程来执行 Channel 的关闭。

这里还有一个重要的点需要和大家强调的是,当我们设置了 SO_LINGER 选项之后,Channel 的关闭动作会被阻塞并延时关闭,在延时关闭期间,Reactor 线程依然可以响应 OP_READ 事件和 OP_WRITE 事件,这可能会导致 Reactor 线程不断的自旋循环浪费 CPU 资源,所以基于这个原因,netty 这里需要将 Channel 从 Reactor 上注销掉。这样 Reactor 线程就不会在响应 Channel 上的 IO 事件了。

2.1.4 doDeregister 注销 Channel

public abstract class AbstractNioChannel extends AbstractChannel {

   //channel注册到Selector后获得的SelectKey
    volatile SelectionKey selectionKey;

    @Override
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey());
    }

    protected SelectionKey selectionKey() {
        assert selectionKey != null;
        return selectionKey;
    }
}
public final class NioEventLoop extends SingleThreadEventLoop {
    //记录socketChannel从Selector上注销的个数 达到256个 则需要将无效selectKey从SelectedKeys集合中清除掉
    private int cancelledKeys;

    private static final int CLEANUP_INTERVAL = 256;
    /**
     * 将socketChannel从selector中注销 取消监听IO事件
     * */

    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        // 当从selector中注销的socketChannel数量达到256个,设置needsToSelectAgain为true
        // 在io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain 中重新做一次轮询,将失效的selectKey移除,
        // 以保证selectKeySet的有效性
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }

}

Channel 在向 Reactor 中的 Selector 注册成功后,会得到一个 SelectionKey 。这个 SelectionKey 可以理解成 Channel 在 Selector 中的模型。

当 Channel 需要将自己从 Selector 中注销掉时,直接可以通过调用对应的 SelectionKey#cancel 方法。此时调用 SelectionKey#isValid 将会返回 false 。

SelectionKey#cancel 方法调用后,Selector 会将要取消的这个 SelectionKey 加入到 Selector 中的 cancelledKeys 集合中。

public abstract class AbstractSelector extends Selector {

    private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();

    void cancel(SelectionKey k) {                      
        synchronized (cancelledKeys) {
            cancelledKeys.add(k);
        }
    }
}

随后在 Selector 的下一次轮询过程中,会将 cancelledKeys 集合中的 SelectionKey 从 Selector 中所有的 KeySet 中移除。这里的 KeySet 包括Selector用于存放 IO 就绪 SelectionKey 的 selectedKeys 集合,以及用于存放所有在 Selector 上注册的 Channel 对应 SelectionKey 的 keys 集合。

public abstract class SelectorImpl extends AbstractSelector {

    protected Set<SelectionKey> selectedKeys = new HashSet();
    protected HashSet<SelectionKey> keys = new HashSet();
    
     .....................省略...............
}

这里需要注意的是当我们调用 SelectionKey#cancel 方法后,该 SelectionKey 并不会立马从 Selector 中删除,只不过此时调用 SelectionKey#isValid 方法会返回 false 。需要等到下次轮询 selector.selectNow() 的时候,被取消掉的 SelectionKey 才会从 Selector 中被删除掉。

当在本次轮询期间,假如有大量的 Channel 从 Selector 中注销,就绪集合 selectedKeys 中依然会保存这些 Channel 对应 SelectionKey 直到下次轮询。那么当然会影响本次轮询结果 selectedKeys 的有效性,增加了许多不必要的遍历开销。

所以 netty 在 NioEventLoop#cancel 方法中做了一个优化来保证 Selector 中的 IO 就绪集合 selectedKeys 的有效性,当 Selector 中注销的 Channel 数量 cancelledKeys 超过 CLEANUP_INTERVAL = 256 个时,就会将 needsToSelectAgain 标志设置为 true 。

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {

            ......循环处理Selector中的IO就绪集合selectedKeys.....

            if (needsToSelectAgain) {
                selectedKeys.reset(i + 1);
                selectAgain();
                i = -1;
            }
        }
    }

当 Reactor 线程在循环遍历处理 Selector 中的 IO 活跃 Channel 时,如果 needsToSelectAgain = true ,那么就会立马执行一次 selector.selectNow() ,目的就是为了清除 Selector 中已经注销的 Selectionkey ,从而保证IO就绪集合 selectedKeys 的有效性。

    private void selectAgain() {
        needsToSelectAgain = false;
        try {
            selector.selectNow();
        } catch (Throwable t) {
            logger.warn("Failed to update SelectionKeys.", t);
        }
    }

2.1.5 Channel 的关闭

image.png

prepareToClose 方法返回的 closeExecutor 是用来执行 Channel 关闭操作的,当我们开启了 SO_LINGER 选项时,closeExecutor = GlobalEventExecutor.INSTANCE ,避免了 Reactor 线程的阻塞。

由 GlobalEventExecutor 负责执行 doClose0 方法关闭 Channel 底层的 Socket,并通知 closeFuture 关闭结果。

        private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify)
 
{
            
            ...........省略重进入关闭流程处理........

            ...........省略Channel关闭前的准备工作........

            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 在GlobalEventExecutor中执行channel的关闭任务,设置closeFuture,promise success
                            doClose0(promise);
                        } finally {
                            // reactor线程中执行
                            invokeLater(new Runnable() {
                                @Override
                                public void run() {
                                    if (outboundBuffer != null) {
                                        // cause = closeCause = ClosedChannelException, notify = false
                                        // 此时channel已经关闭,需要清理对应channelOutboundBuffer中的待发送数据flushedEntry
                                        outboundBuffer.failFlushed(cause, notify);
                                        //循环清理channelOutboundBuffer中的unflushedEntry
                                        outboundBuffer.close(closeCause);
                                    }
                                    //这里的active = true
                                    //关闭channel后,会将channel从reactor中注销,首先触发ChannelInactive事件,然后触发ChannelUnregistered
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                 ...........省略在Reactor中Channel关闭的逻辑........
            }
        }

当 Channel 的关闭操作在 closeExecutor 线程中执行完毕之后,此时 Channel 从物理上就已经关闭了,但是 Channel 中还有一些遗留的东西需要清理,比如 Channel 对应的写入缓冲队列 ChannelOutboundBuffer 中的待发送数据需要被清理掉,并通知用户线程由于 Channel 已经关闭,导致数据发送失败。

同时 Netty 也需要让用户感知到 Channel 已经关闭的事件,所以还需要在关闭 Channel 对应的 pipeline 中触发 ChannelInactive 事件和 ChannelUnregistered 事件。

而以上列举的这两点清理 Channel 的相关工作则需要在 Reactor 线程中完成,不能在 closeExecutor 线程中完成。这是处于线程安全的考虑,因为在 Channel 关闭之前,对于 ChannelOutboundBuffer 以及 pipeline 的操作均是由 Reactor 线程来执行的,Channel 关闭之后相关的清理工作理应继续由 Reactor 线程负责,避免多线程执行产生线程安全问题。

2.1.5.1 doClose0 关闭 Channel

        // 关闭channel操作的指定future,来判断关闭流程进度 每个channel一个
        private final CloseFuture closeFuture = new CloseFuture(this);

        private void doClose0(ChannelPromise promise) {
            try {
                // 关闭channel,此时服务端向客户端发送fin2,服务端进入last_ack状态,客户端收到fin2进入time_wait状态
                doClose();
                // 设置clostFuture的状态为success,表示channel已经关闭
                // 调用shutdownOutput则不会通知closeFuture
                closeFuture.setClosed();
                // 通知用户promise success,关闭操作已经完成
                safeSetSuccess(promise);
            } catch (Throwable t) {
                closeFuture.setClosed();
                // 通知用户线程关闭失败
                safeSetFailure(promise, t);
            }
        }

首先调用 doClose() 方法关闭底层 JDK 中的 SocketChannel 。

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    @Override
    protected void doClose() throws Exception {
        super.doClose();
        javaChannel().close();
    }

}

这里大家需要注意的一个点是,在 JDK 底层 SocketChannel 的关闭方法中,同样也会将该 Channel 关联的所有 SelectionKey 取消掉。因为在 prepareToClose 方法中我们提到,只有我们设置了 SO_LINGER 选项时,才会在 prepareToClose 方法中调用 doDeregister 方法将 Channel 关联的 SelectionKey 从 Selector 中取消掉。

而当我们没有设置 SO_LINGER 选项时,则不会提前调用 doDeregister 方法取消。所以需要在这里真正关闭 Channel 的地方,将其关联的所有 SelectionKey 取消掉。

    public final void close() throws IOException {
        synchronized (closeLock) {
            if (!open)
                return;
            open = false;
            implCloseChannel();
        }
    }

    protected final void implCloseChannel() throws IOException {
        implCloseSelectableChannel();
        synchronized (keyLock) {
            int count = (keys == null) ? 0 : keys.length;
            //关闭与该Channel相关的所有SelectionKey
            for (int i = 0; i < count; i++) {
                SelectionKey k = keys[i];
                if (k != null)
                    k.cancel();
            }
        }
    }

当我们调用了 doClose() 方法后,此时服务端的内核协议栈就会向客户端发出 FIN 包,服务端结束 CLOSE_WAIT 状态进入 LAST_ACK 状态。客户端收到服务端的 FIN 包后,向服务端回复 ACK 包,随后客户端进入 TIME_WAIT 状态。服务端收到客户端的 ACK 包后结束 LAST_ACK 状态进入 CLOSE 状态。

TCP连接关闭.png

当调用 doClose() 完成 Channel 的关闭后,就会调用 closeFuture.setClosed() 通知 Channel 的 closeFuture 关闭成功。

static final class CloseFuture extends DefaultChannelPromise {

        boolean setClosed() {
            return super.trySuccess();
        }

}

随后调用 safeSetSuccess(promise) 通知用户的 promise 关闭成功。

image.png

2.1.5.2 清理 ChannelOutboundBuffer

这里大家需要注意:清空 ChannelOutboundBuffer 的操作是在 Reactor 线程中执行的。

ChannelOutboundBuffer结构.png
       if (outboundBuffer != null) {
                // Fail all the queued messages
                // cause = closeCause = ClosedChannelException, notify = false
                // 此时channel已经关闭,需要清理对应channelOutboundBuffer中的待发送数据flushedEntry
                outboundBuffer.failFlushed(cause, notify);
                //循环清理channelOutboundBuffer中的unflushedEntry
                outboundBuffer.close(closeCause);
       }

当 Channel 关闭之后,此时 Channel 中的写入缓冲队列 ChannelOutboundBuffer 中可能会有一些待发送数据,这时就需要将这些待发送数据从 ChannelOutboundBuffer 中清除掉。

通过调用 ChannelOutboundBuffer#failFlushed 方法,循环遍历 flushedEntry 指针到 tailEntry 指针之间的 Entry 对象,将其从 ChannelOutboundBuffer 链表中删除,并释放 Entry 对象中封装的 byteBuffer ,通知用户的 promise 写入失败。并回收 Entry 对象实例。

public final class ChannelOutboundBuffer {

    void failFlushed(Throwable cause, boolean notify) {
        if (inFail) {
            return;
        }

        try {
            inFail = true;
            for (;;) {
                // 循环清除channelOutboundBuffer中的待发送数据
                // 将entry从buffer中删除,并释放entry中的bytebuffer,通知promise failed
                if (!remove0(cause, notify)) {
                    break;
                }
            }
        } finally {
            inFail = false;
        }
    }

    private boolean remove0(Throwable cause, boolean notifyWritability) {
        Entry e = flushedEntry;
        if (e == null) {
            //清空当前reactor线程缓存的所有待发送数据
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;
        //从channelOutboundBuffer中删除该Entry节点
        removeEntry(e);

        if (!e.cancelled) {
            // only release message, fail and decrement if it was not canceled before.
            //释放msg所占用的内存空间
            ReferenceCountUtil.safeRelease(msg);
            //编辑promise发送失败,并通知相应的Lisener
            safeFail(promise, cause);
            //由于msg得到释放,所以需要降低channelOutboundBuffer中的内存占用水位线,并根据notifyWritability决定是否触发ChannelWritabilityChanged事件
            decrementPendingOutboundBytes(size, false, notifyWritability);
        }

        // recycle the entry
        //回收Entry实例对象
        e.recycle();

        return true;
    }
}

在 remove0 方法中 netty 会将已经关闭的 Channel 对应的 ChannelOutboundBuffer 中还没来得及 flush 进 Socket 发送缓存区中的数据全部清除掉。这部分数据就是上图中 flushedEntry 指针到 tailEntry 指针之间的 Entry对象。

Entry 对象中封装了用户待发送数据的 ByteBuffer,以及用于通知用户发送结果的 promise 实例。

这里需要将这些还未来得及 flush 的 Entry 节点从 ChannelOutboundBuffer 中全部清除,并释放这些 Entry 节点中包裹的发送数据 msg 所占用的内存空间。并标记对应的 promise 为失败同时通知对应的用户 listener 。

以上的清理逻辑主要是应对在 Channel 即将关闭之前,用户极限调用 flush 操作想要发送数据的情况。

另外还有一种情况 Netty 这里需要考虑处理,由于在关闭 Channel 之前,用户可能还会向 ChannelOutboundBuffer 中 write 数据,但还未来得及调用 flush 操作,这就导致了 ChannelOutboundBuffer 中在 unflushedEntry 指针与 tailEntry 指针之间还可能会有数据。

之前我们清理的是 flushedEntry 指针与 tailEntry 指针之间的数据,这里大家需要注意区分。

所以还需要调用 ChannelOutboundBuffer#close 方法将这一部分数据全部清理掉。

public final class ChannelOutboundBuffer {

  void close(final Throwable cause, final boolean allowChannelOpen) {
        if (inFail) {
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    close(cause, allowChannelOpen);
                }
            });
            return;
        }

        inFail = true;

        if (!allowChannelOpen && channel.isOpen()) {
            throw new IllegalStateException("close() must be invoked after the channel is closed.");
        }

        if (!isEmpty()) {
            throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
        }

        // Release all unflushed messages.
        //循环清理channelOutboundBuffer中的unflushedEntry,因为在执行关闭之前有可能用户有一些数据write进来,需要清理掉
        try {
            Entry e = unflushedEntry;
            while (e != null) {
                // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
                int size = e.pendingSize;
                TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);

                if (!e.cancelled) {
                    //释放unflushedEntry中的bytebuffer
                    ReferenceCountUtil.safeRelease(e.msg);
                    //通知unflushedEntry中的promise failed
                    safeFail(e.promise, cause);
                }
                e = e.recycleAndGetNext();
            }
        } finally {
            inFail = false;
        }
        //清理channel用于缓存JDK nioBuffer的 threadLocal缓存NIO_BUFFERS
        clearNioBuffers();
    }

}

当我们清理完 ChannelOutboundBuffer 中的残留数据之后,ChannelOutboundBuffer 中的内存水位线就会下降,由于当前是关闭操作,所以这里的 notifyWritability = false ,不需要触发 ChannelWritabilityChanged 事件。

关于对 ChannelOutboundBuffer 的详细操作,笔者已经在 一文搞懂 Netty 发送数据全流程 一文中详细介绍过了,忘记的同学可以在回顾下这篇文章。

2.1.5.3  触发 ChannelInactive 事件和 ChannelUnregistered 事件

在 Channel 关闭之后并清理完 ChannelOutboundBuffer 中遗留的待发送数据,就该在 Channel 的 pipeline 中触发 ChannelInactive 事件和 ChannelUnregistered 事件了。同样以下的这些操作也都是在 Reactor 线程中执行的。

       private void fireChannelInactiveAndDeregister(final boolean wasActive) {
            //wasActive && !isActive() 条件表示 channel的状态第一次从active变为 inactive
            //这里的wasActive = true  isActive()= false
            deregister(voidPromise(), wasActive && !isActive());
        }

这里传递进来的参数 wasActive = true ,在我们关闭 Channel 之前会通过 isActive() 先获取一次,在该方法中通过 wasActive && !isActive() 判断 Channel 是否是第一次从 active 状态变为 inactive 状态。如果是,则触发后续的 ChannelInactive 事件。

        private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (!registered) {
                safeSetSuccess(promise);
                return;
            }

            invokeLater(new Runnable() {
                @Override
                public void run() {
                    try {
                        //将channel从reactor中注销,reactor不在监听channel上的事件
                        doDeregister();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                    } finally {
                        if (fireChannelInactive) {
                            //当channel被关闭后,触发ChannelInactive事件
                            pipeline.fireChannelInactive();
                        }

                        if (registered) {
                            //如果channel没有注册,则不需要触发ChannelUnregistered
                            registered = false;
                            //随后触发ChannelUnregistered
                            pipeline.fireChannelUnregistered();
                        }
                        //通知deRegisterPromise
                        safeSetSuccess(promise);
                    }
                }
            });
        }

注意这里又会调用 doDeregister() 方法将 Channel 从 Reactor 上注销,到目前为止,我们已经看到有三个地方执行注销 Channel 的操作了。

  • 第一次是在 prepareToClose() 方法中,当我们设置了 SO_LINGER 选项后,为了防止 Reactor 线程在延时关闭期间,还在不停的自旋循环响应 OP_READ 事件和 OP_WRITE 事件从而造成浪费 CPU 资源,我们需要 doDeregister() 方法将 Channel 从 Reactor 上取消。

  • 第二次是在真正的关闭 Channel 的时候,JDK 底层在关闭 SocketChannel 的时候又会将 Channel 从 Selector 上取消。应对关闭 SO_LINGER 选项的情况

  • 第三次就是在本小节中,触发 ChannelUnregistered 事件之前,又会调用 doDeregister() 方法将 Channel 从 Reactor 上取消。

这里大家可能会有疑问,这第三次注销操作是应对哪种情况呢?

首先 JDK NIO 底层在将 Channel 从 Selector 上注销的时候做了防重处理,多次调用注销操作是没有影响的。

另外这个方法可能会在用户的 ChannelHandler 中被调用,因为用户的行为我们无法预知,用户可能在 Channel 关闭前调用,所以这里还是需要调用一次 doDeregister() 方法。为的就是应对用户在 ChannelHandler 中主动注销 Channel 同时不希望 Channel 关闭的场景。

        // 仅仅是注销 Channel,但是 Channel 不会关闭
        ctx.deregister();
        ctx.channel().deregister();

在调用完 doDeregister() 方法之后,netty 紧接着就会在 Channel 的 pipeline 中触发 ChannelInactive 事件以及 ChannelUnregistered 事件,并且这两个事件只会被触发一次。

在接收连接的时候,当 Channel 向 Reactor 注册成功之后,是先触发 ChannelRegistered 事件后触发 ChannelActive 事件。

在关闭连接的时候,当 Channel 从 Reactor 中取消注册之后,是先触发 ChannelInactive 事件后触发 ChannelUnregistered 事件

这里大家还需要注意的一个点是,以上的逻辑会封装在 Runnable 中被提交到 Reactor 的任务队列中延迟执行那么这里为什么要延迟执行呢

这里延后 deRegister 操作的原因是用于处理一种极端的异常情况,前边我们提到 Channel 的 deregister() 操作是可以在用户的 ChannelHandler 中执行的,用户行为是不可预知的。

我们想象一下这样的一个场景:假如当前 pipeline 中还有事件传播(比如正在处理编码解码),与此同时 deregister() 方法可能会在某个事件回调中被用户调用,导致其它事件在传播的过程中,Channel 被从 Reactor 上注销掉了。

并且同时 channel 又注册到新的 Reactor 上。如果此时旧的 Reactor 正在处理 pipeline 上的事件而旧 Reactor 还未处理完的数据理应继续在旧的 Reactor 中处理,如果此时我们立马执行 deRegister ,未处理完的数据就会在新的 Reactor 上处理,这样就会导致一个 handler 被多个 Reactor 线程处理导致线程安全问题。所以需要延后 deRegister 的操作。


到这里呢,关于 netty 如何处理 TCP 连接正常关闭的逻辑,笔者就为大家全部介绍完了,不过还留了一个小小的尾巴,就是当我们未设置 SO_LINGER 选项时,Channel 的关闭操作会直接在 Reactor 线程中执行。closeExecutor 这种情况下会是 null 。

        private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify)
 
{
            
            ...........省略重进入关闭流程处理........

            ...........省略Channel关闭前的准备工作........

            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                ...........省略在closeExecutor中Channel关闭的逻辑........
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                } finally {
                    if (outboundBuffer != null) {
                        // Fail all the queued messages.
                        outboundBuffer.failFlushed(cause, notify);
                        outboundBuffer.close(closeCause);
                    }
                }

                // 此时 Channel 已经关闭,如果此时用户还在执行 flush 操作
                // netty 则会在 flush 方法的处理中处理 Channel 关闭的情况
                // 所以这里 deRegister 操作需要延后到 flush 方法处理完之后
                if (inFlush0) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    fireChannelInactiveAndDeregister(wasActive);
                }                 
            }
        }

这里可以看到其实逻辑都是一样的。都是先调用 doClose0 关闭 JDK NIO 底层的 SocketChannel ,然后清理 ChannelOutboundBuffer 中遗留的待发送数据,最后触发 ChannelInactive 事件和 ChannelUnregistered 事件。

image.png

3. TCP 连接的异常关闭

image.png

在本文前边的内容中,我们介绍了 TCP 数据包中的 SYN 包,FIN 包,ACK 包的使用场景,它们都是通过 TCP 首部协议中的 8 位控制位来标识,不同的控制位代表不同的含义。

第二小节介绍的内容均属于 TCP 在正常情况下进行的连接的建立,发送数据,关闭连接。

而现实中情况往往是复杂的,TCP 连接不可能总是处于正常的状态,那么当 TCP 连接出现异常时,就需要有一种机制让我们来强制关闭连接,这个就是本小节要介绍的 RST 包用于异常情况下强制关闭 TCP 连接。

由于 RST 包是用来处理 TCP 连接的异常情况的,所以当本端发送一个 RST 包给对端之后,并不需要对端回复 ACK 确认包。

通讯方不管是发出或者是收到一个 RST 包 ,都会导致内存,端口等连接资源被释放,并且跳过正常的 TCP 四次挥手关闭流程直接强制关闭,Socket 缓冲区的数据来不及处理直接被丢弃。

当通讯端收到一个 RST 包后,如果仍然对 Socket 进行读取,那么就会抛出 connection has been reset by the peer 异常,如果仍然对 Socket 进行写入,就会抛出 broken pipe 异常。应用程序通过这样的方式来感知内核是否收到 RST 包。

发送 RST 强制关闭连接,这将导致之前已经发送但尚未送达的、或是已经进入对端 Socket 接收缓冲区但还未被对端应用程序处理的数据被无条件丢弃,导致对端应用程序可能会出现异常

说了这么多,那么究竟会有哪些场景导致需要发送 RST 来强制关闭连接呢?下面笔者就来为大家一一梳理下:

3.1 TCP 连接队列已满

TCP连接过程.png

我们先根据上面这副图来看一下一个正常的 TCP 连接建立的过程:

  1. 客户端向服务端发送 SYN 包请求建立 TCP 连接。客户端连接状态变为 SYN_SENT 状态。

  2. 服务端收到 SYN 包之后,服务端连接状态变为 SYN_RECV 状态。随后会创建轻量级 request_sock 结构来表示连接信息(里面能唯一确定某个客户端发来的 SYN 的信息),并将这个 request_sock 结构放入 TCP 的半连接队列 SYN_Queue 中,TCP 内核协议栈发送 SYN+ACK 包给客户端。

  3. 客户端的 TCP 内核协议栈收到服务端发送过来的 SYN+ACK 后,随即回复 ACK 包给服务端。此时客户端连接状态变为 ESTANLISHED 状态。

  4. 服务端收到客户端的 ACK 包之后,从半连接队列中查找是否有代表该客户端连接的轻量级 request_sock 结构,如果有,连接状态变为 ESTABLISHED 状态,随后会从半连接队列 SYN-Queue 中将 request_socket 结构取出移动到全连接队列 ACCEPT-Queue 中。

  5. 用户进程的 accpet 系统调用根据监听 Socket 克隆出一个真正的连接 Socket 然后返回。

从 TCP 建立连接的过程我们看到,这里涉及到两个重要的队列,一个存放客户端 SYN 信息的半连接队列 SYN-Queue ,另一个是存放完成三次握手的客户端连接信息的全连接队列 ACCEPT-Queue 。

那么只要是队列它就会有长度的限制,就可能会满。那么在这两个连接队列已满的状况下,又会发生什么情况呢?

3.1.1 半连接队列 SYN-Queue 已满

SYN FLOOD.png

假设现在有大量的客户端在向服务端发送 SYN 包请求建立连接,但是这些客户端比较坏,在收到服务端的 SYN+ACK 包之后就是不回复 ACK 包给服务端,而服务端一直收不到客户端的 ACK 包,所以就会重传 SYN+ACK 包给客户端,重传次数由内核参数 tcp_synack_retries 限制,默认为 5 次。

$ cat /proc/sys/net/ipv4/tcp_synack_retries
5

这 5 次的重传时间间隔为 1s , 2s , 4s , 8s , 16s ,总共 31s ,而第 5 次重传的 SYN+ACK 包发出后还要等 32s 才能知道第 5 次也超时了,所以,总共需要 1s + 2s + 4s+ 8s+ 16s + 32s = 63s ,TCP 才会把断开这个连接,并从半连接队列中移除对应的 request_sock 。

我们可以看到 TCP 内核协议栈需要等待 63s 的时间才能断开这个半连接,假设这 63s 内又有大量的客户端这样子搞事情,那么很快服务端的半连接队列 SYN-Queue 堆积的 request_sock 就会越来越多最终溢出。

当半连接队列溢出之后,再有正常的客户端连接进来之后,内核协议栈默认情况下就会直接丢弃 SYN 包,导致服务端无法处理正常客户端的请求,这就叫做 SYN Flood 攻击。

有一个内核参数 net.ipv4.tcp_syncookies 可以影响内核处理半连接队列溢出时的行为:

  • net.ipv4.tcp_syncookies = 0 :服务端直接丢弃客户端发来的 SYN 包。

  • net.ipv4.tcp_syncookies = 1 :如果此时全连接队列 ACEPT-Queue 也满了,并且 qlen_young 的值大于 1 ,那么直接丢弃 SYN 包,否则就生成 syncookie(一个特别的 sequence number )然后作为 SYN + ACK 包中的序列号返回给客户端。并输出 "possible SYN flooding on port . Sending cookies."。

qlen_young 表示目前半连接队列中,没有进行 SYN+ACK 包重传的连接数量。

随后客户端会在 ACK 包中将这个 syncookie 带上回复给服务端,服务端校验 syncookie ,并根据 syncookie 的信息构造 request_sock 结构放入全连接队列中。

从以上过程我们可以看出在开启 tcp_syncookies 的情况下,服务端利用 syncookie 可以绕过半连接队列从而完成建立连接的过程。我们可以利用这种方式来防御 SYN Flood 攻击。

但是 tcp_syncookies 不适合用在服务端负载很高的场景,因为在启用 tcp_syncookies 的时候,服务端在发送 SYN+ACK 包之前,会要求客户端在短时间内回复一个序号,这个序号包含客户端之前发送 SYN 包内的信息,比如 IP 和端口。

如果客户端回复的这个序号是正确的,那么服务端就认为这个客户端是正常的,随后就会发送携带 syncookie 的 SYN+ACK 包给客户端。如果客户端不回复这个序号或者序号不正确,那么服务端就认为这个客户端是不正常的,直接丢弃连接不理会。

从这个过程中,我们可以看出当启用 tcp_syncookies 的时候,这个建立连接的过程并不是一个正常的 TCP 三次握手的过程,因为服务端在发送 SYN+ACK 包之前还需要等待客户端回复一个序号,这就产生了一定的延迟,所以 tcp_syncookies 不适合用在服务端负载很高的场景,但是一般的负载情况还是比较有效防御 SYN Flood 攻击的方式。

除此之外,我们还可以调整以下内核参数来防御 SYN Flood 攻击

  • 增大半连接队列容量 tcp_max_syn_backlog 。设置比默认 256 更大的一个数值。

  • 减少 SYN+ACK 重试次数 tcp_synack_retries 。

3.1.2 全连接队列 ACCEPT-Queue 已满

当服务端的负载比较大并且从全连接队列中 accept 连接处理的比较慢,同时又有大量新的客户端连接上来的时候,就会导致 TCP 全连接队列溢出。

内核参数 net.ipv4.tcp_abort_on_overflow 会影响内核协议栈处理全连接队列溢出的行为。

当客户端发来三次握手最后一个 ACK 包时,但此时服务端全连接队列已满:

  • 当 tcp_abort_on_overflow = 0 时,服务端内核协议栈会将该连接标记为 acked 状态,但仍保留在 SYN-Queue 中,并开启 SYN+ACK 重传机制。当 SYN+ACK 包的重传次数超过 net.ipv4.tcp_synack_retries 设置的值时,再将该连接从 SYN queue 中删除。但是此时在客户端的视角来说,连接已经建立成功了。客户端并不知道此时 ACK 包已经被服务端所忽略,如果此时向服务端发送数据的话,服务端会回复 RST 给客户端。
全连接队列满0.png
  • 当 tcp_abort_on_overflow = 1 时, 服务端TCP 协议栈直接回复 RST 包,并直接从 SYN-Queue 中删除该连接信息。
全连接队列已满1.png

面对全连接队列溢出的情况,我们需要及时增大全连接队列的长度,而全连接队列的长度由两个参数控制:

  • 内核参数 net.core.somaxconn,默认 128 。

  • listen 系统调用方法参数 backlog 。

int listen(int sockfd, int backlog)

在 Netty 中我们可以通过如下配置指定:

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 全连接队列长度)

全连接队列 ACCEPT-Queue 的长度由 min(backlog, somaxconn) 决定,所以当全连接队列满时,我们需要检查如下设置:

  • 调整内核参数 net.core.somaxconn。
  • 检查应用程序中的 backlog 参数。
  • 设置 tcp_abort_on_overflow = 1 。

3.2 连接未被监听的端口

连接未LISTEN端口RST.png

当客户端 Connect 一个未被监听的远端服务端口,则会收到对端发来的一个 RST 包。

客户端要连接的端口未被监听,有两种情况:

  • 该端口在服务端从来没有应用程序监听过。

  • 服务端监听该端口的应用程序崩溃挂掉了。

3.3 服务端程序崩溃

服务端程序崩溃RST.png

TCP 连接正常的状态下,无论是连接时发送的 SYN ,还是连接建立成功后发送的正常数据包,以及最后关闭连接时发送的 FIN ,都会收到对端的 ACK 确认。

当服务端因为某种原因导致崩溃之后,客户端再次向服务端发送数据,就会收到 RST 。

3.4 开启 SO_LINGER 选项设置 l_linger = 0

RST关闭连接.png

在前边《2.1.3 针对 SO_LINGER 选项的处理》小节我们介绍 SO_LINGER 选项的时候提到过,当我们将选项参数设置为 l_onoff = 1,l_linger = 0 时,当客户端调用 close 方法关闭连接的时候,这时内核协议栈会发出 RST 而不是 FIN 。跳过正常的四次挥手关闭流程直接强制关闭,Socket 缓冲区的数据来不及处理直接丢弃。

3.5 主动关闭方在关闭时 Socket 接收缓冲区还有未处理数据

客户端提前关闭Socket RST.png
  • 主动关闭方在调用 close() 系统调用关闭 Socket 时,内核会检查 Socket 接收缓冲区中是否还有数据未被读取处理,如果有,则直接清空 Socket 接收缓冲区中的未处理数据,并向对端发送 RST 。

  • 如果此时 Socket 接收缓冲区中没有未被处理的数据,内核才会走正常的关闭流程,尝试将 Socket 发送缓冲区中的数据发送出去,然后向对端发送 FIN ,走正常的四次挥手关闭流程。

3.6 主动关闭方 close 关闭但在 FIN_WAIT2 状态接收数据

close关闭但FIN_WAIT2接收数据 RST.png

TCP是一个面向连接的、可靠的、基于字节流的全双工传输层通信协议,既然它是全双工的,那就意味着TCP连接同时有一个读通道和写通道。

image.png

而调用 close() 来关闭连接,意味着会将读写通道同时关闭,之后不能再读取数据。

如果客户端调用 close() 方法关闭连接,而服务端在 CLOSE_WAIT 状态下继续向客户端发送数据,客户端在 FIN_WAIT2 状态下直接会丢弃数据,并发送 RST 给服务端,直接强制关闭连接,也是个暴脾气,哈哈。

4. Netty 对 RST 包的处理

同 TCP 正常关闭收到 FIN 包一样,当服务端收到 RST 包后,OP_READ 事件活跃,Reactor 线程再次来到了 AbstractNioByteChannel#read 方法处理 OP_READ 事件。

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

        @Override
        public final void read() {
            final ChannelConfig config = config();

            ..........省略连接半关闭处理........

            ..........省略获取allocHandle过程.......

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    //在读取Channel中的数据时会抛出IOExcetion异常             
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    .........省略.............

                } while (allocHandle.continueReading());

                .........省略.............
    
            } catch (Throwable t) {
                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                 ............省略...............
            }
        }
    }

}

这里和 TCP 正常关闭不同的是,在调用 doReadBytes 方法从 Channel 中读取数据的时候会抛出 IOException 异常。这里会有两种情况抛出异常:

  • 此时Socket接收缓冲区中只有 RST 包,并没有其他正常数据。

  • Socket 接收缓冲区有正常的数据,OP_READ 事件活跃,当调用 doReadBytes 方法从 Channel 中读取数据的过程中,对端发送 RST 强制关闭连接,这时会在读取的过程中抛出 IOException 异常。

当 doReadBytes 方法抛出 IOException 异常后,会被 catch(){...} 语句捕获到,随后在 handleReadException 方法中处理 TCP 异常关闭的情况。

4.1 handleReadException

image.png
        private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                RecvByteBufAllocator.Handle allocHandle)
 
{

            if (byteBuf != null) {
                if (byteBuf.isReadable()) {
                    readPending = false;
                    //如果发生异常时,已经读取到了部分数据,则触发ChannelRead事件
                    pipeline.fireChannelRead(byteBuf);
                } else {
                    byteBuf.release();
                }
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            pipeline.fireExceptionCaught(cause);

            if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
                closeOnRead(pipeline);
            }
        }

这里可以看出,当服务端接收到 RST 强制关闭连接时,首先会触发 ExceptionCaught 事件在 pipeline 中传播,最终还是会调用到 closeOnRead 方法关闭连接,取消 Channel 注册,并触发 ChannelInactive 事件和 ChannelUnregistered 事件。

当发生异常时,如果已经从 Channel 中读取到了数据,那么也会触发 ChannelRead 事件,随后触发 ChannelReadComplete 事件和 ExceptionCaught 事件。

如果这里大家已经忘记了相关事件的传播处理流程,可以在回顾下这篇文章 一文聊透 Netty IO 事件的编排利器 pipeline

5. TCP 连接半关闭 HalfClosure

TCP 是一个全双工的传输层通信协议,那么我们在关闭 TCP 连接的时候就需要考虑读写这两个通道的关闭。

image.png

之前介绍的关闭流程是主动关闭方调用 close 方法也就是 JDK NIO 中 SocketChannel#Close 方法来发送 FIN 关闭连接。但是 close 方法是同时将读写两个通道全部关闭,也就是说主动关闭方在调用 close 方法以后既不能接收对端的数据也不能向对端发送数据了。

close关闭但FIN_WAIT2接收数据 RST.png

比如:主动关闭方调用 close 方法发出 FIN 开始关闭流程之后,如果在 FIN_WAIT2 状态下收到对端发送过来的数据,那么就会直接丢弃,并发送 RST 给对端强制关闭连接。

那么有没有一种更优雅的关闭方式就是只关闭读写通道其中一个,关闭了写通道就不能发送数据给对端,但是还可以接受对端发送过来的数据。

关闭了读通道,就不能读取对端发送过来的数据,但是还可以向对端写数据。当连接上遗留的数据全部处理完毕后,主动关闭方和被动关闭方在先后调用 close 方法关闭连接释放资源。

这种更加优雅的关闭方式就是本小节我们要讨论的 TCP 连接的半关闭 HalfClosure 。

操作系统内核为我们提供了 shutdown 这样一个系统调用来实现 TCP 连接的半关闭,shutdown 函数可以控制只关闭连接的某一个方向,或者全部关闭。

int shutdown(int sockfd, int how)

参数 sockfd 为将要关闭 Socket 的文件描述符,参数 how 表示关闭连接的哪个方向 ( 关闭读 or 关闭写 or 全部关闭 )。

  • SHUT_RD:表示关闭读通道,如果此时 Socket 接收缓冲区有已接收的数据,则会将这些数据统统丢弃。如果后续再收到新的数据,虽然也会对这些数据进行 ACK 确认,但是会悄悄丢弃掉。所以在这种情况下,对端虽然收到了 ACK 确认,但是这些以发送的数据可能已经被悄悄丢弃了。

关闭读通道的方法在 JDK NIO 中对应于 SocketChannel#shutdownInput() ,这里需要注意的是此方法并不会发送 FIN。

  • SHUT_WR:关闭写通道,这就是本小节的重点,调用该方法发起 TCP 连接的半关闭流程。此时如果 Socket 发送缓冲区还有未发送的数据,则会立即发送出去,并发送一个 FIN 给对端。关闭写通道的方法在 JDK NIO 中对应于 SocketChannel#shutdownOutput()。

  • SHUTRDWR : 同时关闭连接读写两个通道。

在介绍完了 TCP 连接半关闭的系统调用之后,我们接下来看下 TCP 连接半关闭的流程:

TCP连接半关闭.png
  • 首先客户端会调用 shutdownOutput 方法发起半关闭流程,关闭客户端连接的写通道,然后发送 FIN 给服务端。

  • 和我们在《1. 正常 TCP 连接关闭》小节里介绍的流程一样,服务端的内核协议栈在接收到客户端发来的 FIN 后,会自动向客户端回复 ACK 确认,随后内核会将文件结束符 EOF 插入到 Socket 的接收缓冲区中,此时 OP_READ 事件活跃,再一次进入到 AbstractNioByteChannel.NioByteUnsafe#read 方法处理 OP_READ 事件,此时客户端的连接状态为 FIN_WAIT2 ,服务端的连接状态为 CLOSE_WAIT 。

  • 服务端在收到连接半关闭请求后,会立马调用 shutdownInput 关闭自己的读通道。随后在 pipeline 中触发 ChannelInputShutdownEvent 事件,用户可以在该事件中处理遗留的数据,处于 CLOSE_WAIT 状态的服务端可以继续向处于 FIN_WAIT2 状态的客户端继续发送数据。

  • 当 TCP 连接处于半关闭状态的时候,JDK NIO Selector 会不断的通知 OP_READ 事件活跃直到 TCP 连接真正的关闭,所以用户在处理完 ChannelInputShutdownEvent 事件之后,又会立马收到处理 OP_READ 事件的通知,在这次通知中触发 ChannelInputShutdownReadComplete 事件,表示遗留数据已经处理完毕,用户可以在这个事件响应中调用 close 来彻底关闭连接。 此后服务端结束 CLOSE_WAIT 状态进入 LAST_ACK 状态。

  • 客户端收到服务端发送过来的 FIN 后,调用 close 方法注销 Channel ,关闭连接。结束 FIN_WAIT2 状态进入 TIME_WAIT 状态。

6. 主动关闭方发起 TCP 半关闭流程

在 TCP 半关闭的场景下,主动关闭方需要调用 shutdownOutput 方法向被动关闭方发送 FIN 开始 TCP 半关闭流程。

在本小节的示例中,客户端可以在自己的 ChannelHandler 中调用 Channel 的 shutdownOutput 方法来发起 TCP 半关闭流程。

        SocketChannel sc = (SocketChannel) ctx.channel();     
        sc.shutdownOutput();

下面我们就来分析下在 netty 中对于 shutdownOutput 的实现。

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    @Override
    public ChannelFuture shutdownOutput() {
        return shutdownOutput(newPromise());
    }

    @Override
    public ChannelFuture shutdownOutput(final ChannelPromise promise) {
        final EventLoop loop = eventLoop();
        if (loop.inEventLoop()) {
            ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
        } else {
            loop.execute(new Runnable() {
                @Override
                public void run() {
                    ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
                }
            });
        }
        return promise;
    }

}

从如上代码中,我们可以看出对于 shutdownOutput 的操作也是必须在 Reactor 线程中完成。

这里大家可以发现 shutdownOutput 半关闭的流程其实和 close 的流程非常的相似。

      private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
            if (!promise.setUncancellable()) {
                return;
            }

            //如果Channel已经close了,直接返回
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                promise.setFailure(new ClosedChannelException());
                return;
            }
            
            //半关闭状态下,不允许继续写入数据到Socket
            this.outboundBuffer = null

            final Throwable shutdownCause = cause == null ?
                    new ChannelOutputShutdownException("Channel output shutdown") :
                    new ChannelOutputShutdownException("Channel output shutdown", cause);

            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {           
                            // 将jdk nio 底层的Socket shutdown
                            doShutdownOutput();
                            promise.setSuccess();
                        } catch (Throwable err) {
                            promise.setFailure(err);
                        } finally {
                            // Dispatch to the EventLoop
                            eventLoop().execute(new Runnable() {
                                @Override
                                public void run() {
                                    //清理ChannelOutboundBuffer,并触发ChannelOutputShutdownEvent事件
                                    closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // 在 Reactor 线程中执行
                    doShutdownOutput();
                    promise.setSuccess();
                } catch (Throwable err) {
                    promise.setFailure(err);
                } finally {
                    closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                }
            }
        }

一开始都需要通过 ChannelOutboundBuffer 是否为 null 来判断当前 Channel 是否已经关闭了,如果已经关闭,则停止执行后续半关闭流程。

当 shutdownOutput 方法调用之后,主动关闭方连接的写通道就被关闭了,所以在这个状态下是不允许用户继续向 Channel 写入数据的, 所以这里会将 Channel 对应的写入缓冲队列 ChannelOutboundBuffer 设置为 null 。

和前边我们介绍调用 close 方法发起 TCP 连接的正常关闭流程一样,这里也会调用 prepareToClose() 方法来处理设置 SO_LINGER 选项的情况。

     @Override
        protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    doDeregister();
                    return GlobalEventExecutor.INSTANCE;
                }
            } catch (Throwable ignore) {

            }
            return null;
        }

如果 Socket 设置了 SO_LINGER 选项则需要首先将 Channel 注销,后续的半关闭流程需要在 GlobalEventExecutor 线程中执行。否则继续在 Reactor 线程中执行。

关于 prepareToClose() 方法的详细介绍,大家可以回看本文中的《 2.1.3 针对 SO_LINGER 选项的处理》小节

接下来就会调用 doShutdownOutput() 方法关闭底层 JDK NIO SocketChannel 的写通道。此时内核协议栈会向对端发送 FIN 发起 TCP 半关闭流程。

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    protected final void doShutdownOutput() throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().shutdownOutput();
        } else {
            javaChannel().socket().shutdownOutput();
        }
    }

}
TCP连接半关闭.png

当半关闭流程发起之后,ShutdownOutput 的核心任务就算结束了,此时就需要设置用户持有的 shutdownOutputPromise 成功,随后用户就会得到通知。

最后在 Reactor 线程中清理 ChannelOutboundBuffer 中的待发送数据,并在 pipeline 中传播 ChannelOutputShutdownEvent 事件。相关的清理细节笔者已经在本文前边相关的章节中详细介绍过了,这里不在重复。

    private void closeOutboundBufferForShutdown(
                ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause)
 
{
            //shutdownOutput半关闭后需要清理channelOutboundBuffer中的待发送数据flushedEntry
            buffer.failFlushed(cause, false);
            //循环清理channelOutboundBuffer中的unflushedEntry
            buffer.close(cause, true);
            pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
        }

ChannelOutputShutdownEvent 是一种 UserEventTriggered 事件,它是 netty  提供的一种事件扩展机制可以允许用户自定义异步事件,这样可以使得用户能够灵活的定义各种复杂场景的处理机制。

UserEventTriggered 也是一种 Inbound 类事件,在 pipeline 中的传播反向也是从前向后传播。

image.png

我们可以在 ChannelHandler 中这样捕获 ChannelOutputShutdownEvent 写通道关闭事件:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (ChannelOutputShutdownEvent.INSTANCE == evt) {
              .......处理写通道关闭事件.........
        }
    }

}

此时主动关闭方已经关闭了写通道,进入 FIN_WAIT2 状态。因为现在读通道还没有关闭,所以在 FIN_WAIT2 状态下还是可以继续接受并处理对端发来的数据的。

理想很美好,现实却很骨感,在本小节中主动关闭方在 FIN_WAIT2 状态下真的可以接收来自对端的数据吗??

大家先可以结合笔者在 《 2.1.3 针对 SO_LINGER 选项的处理》小节中介绍的内容以及本小节介绍的 TCP 写通道关闭流程,对照下面这副图认真思考下这个问题。

TCP连接半关闭.png

7. 啊哈!!Bug !!

在为大家解释这个 Bug 之前,笔者先再次带大家回顾下本文《 2.1.3 针对 SO_LINGER 选项的处理》小节中 prepareToClose 方法的逻辑,它有两个关键点:

  • 当使用了 SO_LINGER 选项后,调用 Socket 的 close 方法会阻塞关闭流程,所以需要将 Socket 的关闭动作放在 GlobalEventExecutor 中执行。

  • 当使用了 SO_LINGER 选项后,为了防止在延迟关闭期间继续处理读写事件,产生不必要的 CPU 资源浪费,所以需要调用 doDeregister() 方法将 Channel 从 Reactor 中注销掉

     @Override
        protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    doDeregister();
                    return GlobalEventExecutor.INSTANCE;
                }
            } catch (Throwable ignore) {

            }
            return null;
        }

这些逻辑在 close 的关闭场景中是合理的,但是在 shutdownOutput 半关闭场景就出问题了。

假设用户在开启了 SO_LINGER 选项的情况下,调用 shutdownOutput 半关闭 TCP 连接,那么用户的本意是只关闭写通道,但是仍然希望在 FIN_WAIT2 状态下接收来自服务端发送过来的数据,实现优雅关闭。

但实际上 netty 在 shutdownOutput 方法中调用了 prepareToClose() 方法从而间接导致了 doDeregister() 方法的调用,Channel 从 Reactor 中注销掉,也就是说从此以后无法在产生 OP_READ 活跃事件无法接收并且处理服务端发送过来的数据。

由于以上原因,如下如图所示,主动关闭方在 FIN_WAIT2 状态下是无法接收到数据的,因为此时 Channel 已经从 Reactor 上注销了。

SO_LINGERHalfClosureBug.png

另外还有一点就是,无论 SO_LINGER 选项是否设置,shutdown 系统调用函数均不会阻塞,这里和 close 系统调用不同。所以这里也并不需要用一个 GlobalEventExecutor 去执行 shutdownOutput 任务,直接在 Reactor 线程中执行即可。

所以综合以上两点原因,在 shutdownOutput 中是不需要调用 prepareToClose() 方法的。

现在我们知道了 Bug 产生的原因,那么修复过程就变的非常简单了~~~

8. 提交 PR ,修复 Bug

笔者首先向 Netty  社区提交了一个 Issue,在 Issue 中详细为社区人员描述了这个 Bug 产生的原因。也就是上一小节中的内容。

Issue : https://github.com/netty/netty/issues/11981

image.png
SO_LINGERHalfClosureBug.png

随后笔者按照《7. 啊哈!!Bug !!》小节中介绍的修复思路为这个 Issue 提交了修复 PR ,

PR  :https://github.com/netty/netty/pull/11982

PR.png

笔者修复后的 ShutdownOutput 流程逻辑如下:

image.png

编写单元测试,然后信心满满地等待 PR 被 Merged。

public class SocketHalfClosedTest extends AbstractSocketTest {

    @Test
    @Timeout(value = 5000, unit = MILLISECONDS)
    public void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(TestInfo testInfo) throws Throwable {
        run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
            @Override
            public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
                testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(serverBootstrap, bootstrap);
            }
        });
    }

    private void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(ServerBootstrap sb, Bootstrap cb)
            throws Throwable 
{
        Channel serverChannel = null;
        Channel clientChannel = null;

        final CountDownLatch waitHalfClosureDone = new CountDownLatch(1);
        try {
            sb.childOption(ChannelOption.SO_LINGER, 1)
              .childHandler(new ChannelInitializer<Channel>() {

                  @Override
                  protected void initChannel(Channel ch) throws Exception {
                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {

                            @Override
                            public void channelActive(final ChannelHandlerContext ctx) {
                                SocketChannel channel = (SocketChannel) ctx.channel();
                                channel.shutdownOutput();
                            }

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ReferenceCountUtil.release(msg);
                                waitHalfClosureDone.countDown();
                            }
                        });
                  }
              });

            cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
              .handler(new ChannelInitializer<Channel>() {
                  @Override
                  protected void initChannel(Channel ch) throws Exception {
                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {

                            @Override
                            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                                if (ChannelInputShutdownEvent.INSTANCE == evt) {
                                    ctx.writeAndFlush(ctx.alloc().buffer().writeZero(16));
                                }

                                if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
                                    ctx.close();
                                }
                            }
                        });
                  }
              });

            serverChannel = sb.bind().sync().channel();
            clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
            waitHalfClosureDone.await();
        } finally {
            if (clientChannel != null) {
                clientChannel.close().sync();
            }

            if (serverChannel != null) {
                serverChannel.close().sync();
            }
        }
    }
}

还是那句话 “理想很丰满,现实很骨感”,Netty 作为一个世界知名的高性能开源框架,必定有着非常严格的代码规范。比如:

  • 代码书写规范:函数与函数之间的空行个数,单行代码的长度,函数命名的长度, .... 等。

  • 注释的规范:单行注释的长度,字符与字符之间的空格,...... 等。

  • 单元测试规范。

PR 提交过去也是出现了很多规范类的 CheckStyle 错误,也是经过了多轮 Review 和多轮修改最终通过了 Netty 的 CI 流程被 Merged 进主干分支。并在 Netty 的 4.1.73.Final 中发布。

image.png
image.png

在  4.1.73.Final  版本发布之后,笔者第一时间拉下来最新的代码,看到 Git 记录中出现了自己的名字,想象着自己的代码跑在了各大知名框架中,还是很有成就感的一件事。

文章封面.png

9. 被动关闭方处理TCP半关闭流程

TCP连接半关闭.png

当主动关闭方调用 shutdownOutput 后,内核会检查此时 Socket 发送缓冲区是否还有数据,如果有就将数据发送出去,并关闭 Socket 的写通道,随后发送 FIN 给对端。

接下来的流程和《1. 正常 TCP 连接关闭》小节中的流程一样,服务端 OP_READ 事件活跃,Reactor 线程开始处理 OP_READ 事件。

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

        @Override
        public final void read() {
            final ChannelConfig config = config();

            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }

            ..........省略获取allocHandle过程.......

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    //记录本次读取了多少字节数
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    //如果本次没有读取到任何字节,则退出循环 进行下一轮事件轮询
                    // -1 表示客户端主动关闭了连接close或者shutdownOutput 这里均会返回-1
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        //当客户端主动关闭连接时(客户端发送fin1),会触发read就绪事件,这里从channel读取的数据会是-1
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    .........省略.............

                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    //此时客户端发送fin1(fi_wait_1状态)主动关闭连接,服务端接收到fin,并回复ack进入close_wait状态                    
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                 ............省略...............
            } finally {
                 ............省略...............
            }
        }
    }

}

这里通过 doReadBytes 方法从 Channel 中读取数据依然返回 -1 。随后又会进入 closeOnRead 方法处理半关闭逻辑。

9.1 closeOnRead

        private void closeOnRead(ChannelPipeline pipeline) {    
            if (!isInputShutdown0()) {
                if (isAllowHalfClosure(config())) {             
                    shutdownInput();
                    pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
                } else {
                       .....省略正常关闭....
                }
            } else {
                .....省略....
            }
        }

首先会调用 isInputShutdown0 方法判断服务端 Channel 的读通道是否已经关闭,现在客户端 Channel 的写通道已经关闭,但此时服务端才刚开始处理半关闭,所以现在服务端 Channel 读写通道都还没有关闭。

    @Override
    public boolean isInputShutdown() {
        return javaChannel().socket().isInputShutdown() || !isActive();
    }

随后判断服务端是否支持半关闭 isAllowHalfClosure。

   private static boolean isAllowHalfClosure(ChannelConfig config) {
        return config instanceof SocketChannelConfig &&
                ((SocketChannelConfig) config).isAllowHalfClosure();
    }

可通过如下配置开启半关闭的支持:

    ServerBootstrap sb = new ServerBootstrap();
    sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, true)                        

如果服务端开启了半关闭的支持 isAllowHalfClosure == true ,下面就正式进入了半关闭的处理流程:

  1. 调用 shutdownInput 方法关闭服务端 Channel 的读通道,如果此时 Socket 接收缓冲区还有数据,则会将这些数据统统丢弃。注意关闭读通道并不会向对端发送 FIN ,此时服务端连接依然处于 CLOSE_WAIT 状态。
    private void shutdownInput0() throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            //调用底层JDK socketChannel关闭接收方向的通道
            javaChannel().shutdownInput();
        } else {
            javaChannel().socket().shutdownInput();
        }
    }
  1. 在 pipeline 中触发 ChannelInputShutdownEvent 事件,我们可以在 ChannelInputShutdownEvent 事件的回调方法中,向客户端发送遗留的数据,做到真正的优雅关闭。这里就是图中处于 CLOSE_WAIT 状态下的服务端在半关闭场景下可以继续向处于 FIN_WAIT2 状态下的客户端发送数据的地方
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (ChannelInputShutdownEvent.INSTANCE == evt) {
            //在close_wait状态下,发送数据给对端
            ctx.writeAndFlush(message);
        }
    }

}

在连接半关闭的情况下,JDK NIO Selector 会不停的通知 OP_READ 事件活跃,所以 read loop 会一直不停的执行,当 Reactor 处理完 ChannelInputShutdownEvent 之后,由于 Selector 又会通知 OP_READ 事件活跃,所以半关闭流程再一次来到了 closeOnRead 方法。

        //表示Input已经shutdown了,再次对channel进行读取返回-1  设置该标志
        private boolean inputClosedSeenErrorOnRead;

        private void closeOnRead(ChannelPipeline pipeline) {    
            if (!isInputShutdown0()) {
                if (isAllowHalfClosure(config())) {             
                       .....省略半关闭.....
                } else {
                       .....省略正常关闭....
                }
            } else {
                inputClosedSeenErrorOnRead = true;
                pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
            }
        }

那么此时服务端的读通道已经关闭了 isInputShutdown0 == true 。所以流程来到 else 分支。

  • 设置 inputClosedSeenErrorOnRead = true 表示此时 Channel 的读通道已经关闭了,不能再继续响应 OP_READ 事件,因为半关闭状态下,Selector 会不停的通知 OP_READ 事件,如果不停无脑响应的话,会造成极大的 CPU 资源的浪费。

不过 JDK 这样处理也是合理的,毕竟半关闭状态连接并没有完全关闭,只要连接没有完全关闭,就不停的通知你,直到关闭连接为止。

  • 在 pipeline 中触发 ChannelInputShutdownReadComplete 事件,此事件的触发标志着服务端在 CLOSE_WAIT 状态下已经将所有遗留的数据发送给了客户端,服务端可以在该事件的回调中关闭 Channel ,结束 CLOSE_WAIT 进入 LAST_ACK 状态。
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (ChannelInputShutdownReadComplete.INSTANCE == evt) {      
            ctx.close();
        }
    }

因为半关闭的状态下,在没有调用 close 方法关闭 Channel 之前,JDK NIO Selector 会一直不停的通知 OP_READ 事件,所以流程马上又会回到 OP_READ 事件的处理方法中。

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

        @Override
        public final void read() {
            final ChannelConfig config = config();

            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }

            ..........省略获取allocHandle过程.......

            try {
                do {
                          .........省略.............

                } while (allocHandle.continueReading());

               .........省略.............
            } catch (Throwable t) {
                 ............省略...............
            } finally {
                 ............省略...............
            }
        }
    }

}

那么这次我们就不能在响应 OP_READ 事件了,需要调用 clearReadPending 方法将读事件从 Reactor 中取消掉,停止对 OP_READ 事件的监听。否则 Reactor 线程就会在半关闭期间内一直在这里空转,导致 CPU 100%。

这里的 shouldBreakReadReady 方法就是用来判断在半关闭期间是否取消 OP_READ 事件的监听。这里的 inputClosedSeenErrorOnRead 已经设置为 true 了。

   final boolean shouldBreakReadReady(ChannelConfig config) {
        return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
    }

到这里为止,netty 关于连接关闭所要面对的所有处理场景,笔者就为大家一一介绍完了。


总结

本文我们介绍了 netty 在面对 TCP 连接关闭时的三种处理场景时的处理逻辑和过程。

这三种处理场景分别是:TCP 连接的正常关闭,TCP 连接的异常关闭,以及用于优雅关闭的 TCP 连接的半关闭。同时我们也发现了 netty 关于半关闭处理时的一个 BUG 。

BUG :https://github.com/netty/netty/issues/11981

这个 Bug 导致主动关闭方在 FIN_WAIT2 状态下无法接受到来自被动关闭方在 CLOSE_WAIT 状态下发送的数据。随后又详细分析了这个 Bug 的整个修复过程。

其中我们还穿插介绍了 SO_LINGER 选项对于 TCP 连接关闭行为的影响,以及 netty 针对 SO_LINGER 选项的处理过程。

同时笔者还为大家列举了关于导致 TCP 连接异常关闭的 7 种场景:

  1. 半连接队列 SYN-Queue 已满

  2. 全连接队列 ACCEPT-Queue 已满

  3. 连接未被监听的端口

  4. 服务端程序崩溃

  5. 开启 SO_LINGER 选项设置 l_linger = 0

  6. 主动关闭方在关闭时 Socket 接收缓冲区还有未处理数据

  7. 主动关闭方 close 关闭但在 FIN_WAIT2 状态接收数据

以及 Netty 对 RST 包的处理流程。最后笔者还介绍了用于连接半关闭的系统调用 shutdown 的使用方法,以及 netty 对连接半关闭的流程处理逻辑。

其中笔者还详细对比了 shutdown 系统调用和 close 系统调用的区别与联系。它们在调用之后都会向对端发送 FIN 包。但是在设置 SO_LINGER 选项的时候 close 系统调用会阻塞,shutdown 系统调用则不会阻塞。

TCP连接半关闭.png

最后笔者需要特别强调的是在我们使用 shutdown 进行 TCP 连接的半关闭时,作为连接的被动关闭方,在最后一定要记得调用 close 方法来彻底关闭连接,并释放连接相关资源。否则被动关闭方就会一直停留在 CLOSE_WAIT 状态。

而作为主动关闭方在 FIN_WAIT2 状态下接收到来自被动关闭方在 CLOSE_WAIT 状态下发送的 FIN 之后,记得要释放客户端的资源。

好了,本文的内容就到这里,感谢大家收看到这里,我们下篇文章见~~~

浏览 32
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报