深入剖析 Netty 源码,4 步全链路实现客户端启动
- 前言 -
创建一个SocketChannel 连接到服务方端口 将SocketChannel设置为非阻塞的 将SocketChannel注册到selector上
- 客户端引导代码 -
创建一个EventLoopGroup,与服务端不同的是,这里不需要创建bossGroup,因为客户端不需要处理新连接的接入,所以这里的eventLoopGroup的作用相当于服务端的workerGroup。 创建一个BootStrap并将eventLoopGroup传入,设置channel为NioSocketChannel(对应jdk SocketChannel)。 设置一个Channel参数和一个Channel属性。 配置一个handler,这个handler里我们什么也没做,仅仅是打印一些事件日志。 调用bootstrap.connect方法连接到服务端。运行这段程序,将在控制台打印出如下结果:
HandlerAdded
ChannelRegistered
ChannelActive
/**
* 欢迎关注公众号“种代码“,获取博主微信深入交流
*
* @author wangjianxin
*/
public class com.zhongdaima.netty.analysis.bootstrap.ClientBoot {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
//设置Channel参数
.option(ChannelOption.TCP_NODELAY, true)
//设置Channel属性
.attr(AttributeKey.valueOf("ChannelName"), "ClientChannel")
.handler(new ChannelInboundHandlerAdapter(){
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("ChannelRegistered");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ChannelActive");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("HandlerAdded");
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8000);
Channel channel = channelFuture.syncUninterruptibly().channel();
channel.closeFuture().awaitUninterruptibly();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
- 启动过程 -
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {
return doResolveAndConnect(remoteAddress, config.localAddress());
}
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//关键逻辑
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
//关键逻辑
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
//关键逻辑
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
}
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
1、新创建一个NioSocketChannel
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
private static SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//设置为非阻塞的
ch.configureBlocking(false);
} catch (IOException e) {
}
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
2、初始化Channel
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
//添加引导代码中所设置的handler
p.addLast(config.handler());
//设置Channel参数
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
}
//设置Channel属性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
3、注册Channel
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
chooser = chooserFactory.newChooser(children);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//绑定eventloop
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
//此时我们不在EventLoop内,也就是当前线程非EventLoop线程,会走到这个分支
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
//调用子类的register0方法
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
private void register0(ChannelPromise promise) {
try {
//向Selector注册Channel
doRegister();
//去完成那些在绑定EventLoop之前触发的添加handler操作,这些操作被放在pipeline中的pendingHandlerCallbackHead中,是个链表,具体请参考`DefaultChannelPipeLine`的`addLast(EventExecutorGroup group, String name, ChannelHandler handler)`方法。
pipeline.invokeHandlerAddedIfNeeded();
//将promise设置为成功的
safeSetSuccess(promise);
//触发ChannelRegistered事件
pipeline.fireChannelRegistered();
//这里并没有Active,因为此时还没建立连接
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
}
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
4、连接到服务端
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
doConnect(remoteAddress, localAddress, promise);
return promise;
}
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) {
final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) {
promise.setFailure(resolveFailureCause);
} else {
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
promise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, promise);
}
}
});
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
try {
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation. If the connection is established immediately, as can happen with a local connection, then this method returns true. Otherwise this method returns false and the connection operation must later be completed by invoking the finishConnect method.
如果该channel在非阻塞模式下,调用该方法将初始化一个非阻塞的连接操作。如果连接立即完成了,例如可能发生在一个本地连接的情况下,该方法返回true。否则该方法将返回false,随后必须调用finishConnect方法完成连接。
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = javaChannel().connect(remoteAddress);
//多数情况下connected为false
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
try {
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
boolean active = isActive();
boolean promiseSet = promise.trySuccess();
if (!wasActive && active) {
pipeline().fireChannelActive();
}
if (!promiseSet) {
close(voidPromise());
}
}
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
try {
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
@Override
public final void finishConnect() {
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
} finally {
}
}
@Override
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
- 总结 -
创建一个Channel实例,这个过程中将Channel设置为非阻塞的,为Channel创建了PipeLine和Unsafe。 初始化Channel,为Channel添加引导代码中所设置的handler,并设置参数和属性。 注册Channel,为Channel绑定一个EventLoop并向Selector注册Channel。 调用connect方法连接到服务端,如果connect返回false将在Channel发生OP_CONNECT事件时调用finishiConnect方法完成连接。
作者:王建新,转转架构部资深Java工程师,主要负责服务治理、RPC框架、分布式调用跟踪、监控系统等。爱技术、爱学习,欢迎联系交流。
来源公众号:种代码