Selector.select()

共 6087字,需浏览 13分钟

 ·

2021-12-13 16:09

Netty的底层依然是依赖于JDK的NIO . 开发NIO服务端的代码如下所示


import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Iterator;import java.util.Set;
public class Server {

// 缓冲区的大小 private static final int BUFFER_SIZE = 1024;
// 缓冲区 private static ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
// 选择器 private static Selector selector = null;


public static void main(String[] args) {

ServerSocketChannel serverSocketChannel;
try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8080), 64); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
for (;;) {
int readyChannels = selector.select(); if (readyChannels == 0) { continue; }
Set selectedKeys = selector.selectedKeys();
Iterator iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = (SelectionKey) iterator.next(); handleKey(key); iterator.remove(); }
}
} catch (Exception ignored) {
}
}
// 处理SelectionKey private static void handleKey(SelectionKey key) throws IOException { // 是否有连接进来 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = server.accept(); // SocketChannel通道的可读事件注册到Selector中 registerChannel(selector, socketChannel, SelectionKey.OP_READ); // 连接成功 向Client打个招呼 if (socketChannel.isConnected()) { buffer.clear(); buffer.put("I am Server...".getBytes()); buffer.flip(); socketChannel.write(buffer);
}
} // 通道的可读事件就绪 if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); buffer.clear(); // 清空缓冲区 // 读取数据 int len = 0; while ((len = socketChannel.read(buffer)) > 0) { buffer.flip(); while (buffer.hasRemaining()) { System.out.println("Server读取的数据:" + new String(buffer.array(), 0, len)); } } if (len < 0) { // 非法的SelectionKey 关闭Channel socketChannel.close(); } // SocketChannel通道的可写事件注册到Selector中 registerChannel(selector, socketChannel, SelectionKey.OP_WRITE); } // 通道的可写事件就绪 if (key.isWritable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); buffer.clear(); // 清空缓冲区 // 准备发送的数据 String message_from_server = "Hello,Client... " + socketChannel.getLocalAddress(); buffer.put(message_from_server.getBytes()); buffer.flip(); socketChannel.write(buffer); System.out.println("Server发送的数据:" + message_from_server); // SocketChannel通道的可写事件注册到Selector中 registerChannel(selector, socketChannel, SelectionKey.OP_READ); } }

private static void registerChannel(Selector selector, SelectableChannel channel, int ops) throws IOException { if (channel == null) { return; } channel.configureBlocking(false); channel.register(selector, ops); }
}




本篇文章就来讲解下selector.select()的功能 .


个人认为, 好多功能都是按照三部曲来实现的  

1.生产一个冰箱 

2.把大象装进冰箱 

3.把大象从冰箱取出来


1.生产一个冰箱


在调用Selector.open()的时候, 底层会创建各种属性和数据结构,用于存储相关信息 .


// 源码位置 java.nio.channels.Selector#openpublic static Selector open() throws IOException {    return SelectorProvider.provider().openSelector();}


在Windows平台返回 WindowsSelectorImpl 对象 , 在Linux平台返回 EPollSelectorImpl 对象 . 这里以EPollSelectorImpl分析 .



在Windows平台 , 通过跟踪open()源码的方式, 看不到sun.nio.ch.EPollSelectorImpl 这个类, 可以在Linux平台 或者 直接下载JDK源码 或者互联网搜索EPollSelectorImpl都可以看到这个类 .


EPollSelectorImpl 继承 SelectorImpl , 在 EPollSelectorImpl 内部有个EPollArrayWrapper类 , EPollArrayWrapper内部就是关于epoll相关的操作 .



IO多路复用的实现方式有 select, poll, epoll .       

epoll 主要涉及三个方法: epoll_create,  epoll_ctl, epoll_wait


个人认为, 要想学好Java, 依然要对C语言, 包括一些系统调用了解或熟悉.








在实例化EPollSelectorImpl的时候, 创建了 

Set selectedKeys ,  

Map fdToKey,   

byte[] eventsLow 或 Map eventsHigh 等重要属性 .



我们不必在意这些属性'散落'在哪些类里, 我们更关注的是, 实例化EPollSelectorImpl的时候 会 创建一些集合等属性对象, 用于存储数据. 这就是在生产一个冰箱, 为后面存储数据使用.



而且还会创建一个堆外内存的pollArray对象, 这个对象用于接收内核返回的可读写的文件描述符. 因为在进行调用epoll_wait的时候, 需要给内核传递一个对象, 内核会将已经准备就绪的文件描述符填充到这个对象.


通过man epoll_wait查看



第二个参数 struct epoll_event *events 是一个传出参数, 而pollArray对象就会传到这个参数上 .


所有与内核交互的对象, 必须是堆外内存的对象 .


2.把大象装进冰箱


在我们自己的代码中 会通过 

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT) 注册感兴趣的事件 .


文章后面会有一个函数之间调用的总图


// 源码位置 EPollSelectorImpl#implRegisterprotected void implRegister(SelectionKeyImpl ski) {    if (closed)        throw new ClosedSelectorException();    SelChImpl ch = ski.channel;    int fd = Integer.valueOf(ch.getFDVal());    // 关系    fdToKey.put(fd, ski);    pollWrapper.add(fd);    keys.add(ski);}


会将的对应关系存储到 Map fdToKey 集合中, 假如我们有个6号文件描述符 fd=6, 把它存储到 fdToKey中,  即<6, SelectionKeyImpl>这样的关系 .    当6号文件描述符有数据进来的时候, 调用epoll_wait的时候, 内核就会把6号文件描述符返回给用户态, 我们再根据<6, SelectionKeyImpl>这个关系,就能找到这个SelectionKeyImpl 了 .


在调用register方法的时候, 不仅会存储的对应关系, 还会将所有的fd存储到 int[] updateDescriptors 中,  也会将 的关系存储到byte[] eventsLow 或 Map eventsHigh中.


一句话, 在上面我们已经生产了一个冰箱, 在这里, 我们把数据(也就是大象)放进这个冰箱里面.



3.把大象从冰箱取出来


selector.select()


关键代码最终会调用到EPollArrayWrapper 这个类里的方法.


会将之前上一步的文件描述符和对应的事件, 通过epoll_ctl系统调用, 放到epoll的红黑树上.


最终会调用到epoll_wait系统调用函数, 如果有文件描述符就绪, 就将对应的文件描述符放到堆外内存的pollArray对象上.


用户态拿到pollArray对象之后, 通过遍历, 根据fd从fdToKey中将SelectionKeyImpl 放到 Set selectedKeys集合中, 用户态在调用selector.selectedKeys()的时候, 就会将selectedKeys集合返回 . 这样我们的业务代码就拿到了 selectedKeys集合, 进行后续操作处理.


关于函数之间的调用如下图, 具体也可以查看

https://www.yuque.com/infuq/default/wy8fap#cGAYr






浏览 36
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报