Java NIO Selector 详解
Selector 允许一个单一的线程来操作多个 Channel,如果我们的应用程序中使用了多个 Channel,那么使用 Selector 很方便的实现这样的目的,但是因为在一个线程中使用了多个 Channel,因此也会造成了每个 Channel 传输效率的降低。
使用 Selector 的图解如下:
为了使用 Selector,我们首先需要将 Channel 注册到 Selector 中,随后调用 Selector 的 select()方法,这个方法会阻塞,直到注册在 Selector 中的 Channel 发送可读写事件。当这个方法返回后,当前的这个线程就可以处理 Channel 的事件了。
创建选择器
通过 Selector.open()方法, 我们可以创建一个选择器:
Selector selector = Selector.open();
将 Channel 注册到选择器中
为了使用选择器管理 Channel,我们需要将 Channel 注册到选择器中:
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
注意,如果一个 Channel 要注册到 Selector 中,那么这个 Channel 必须是非阻塞的,即channel.configureBlocking(false); 因为 Channel 必须要是非阻塞的,因此 FileChannel 是不能够使用选择器的,因为 FileChannel 都是阻塞的.
注意到,在使用 Channel.register()方法时,第二个参数指定了我们对 Channel 的什么类型的事件感兴趣,这些事件有:
- Connect,即连接事件(TCP 连接), 对应于SelectionKey.OP_CONNECT。
- Accept,即确认事件,对应于SelectionKey.OP_ACCEPT。
- Read,即读事件,对应于SelectionKey.OP_READ, 表示 buffer 可读。
- Write,即写事件,对应于SelectionKey.OP_WRITE, 表示 buffer 可写。
一个 Channel发出一个事件也可以称为对于某个事件, Channel 准备好了。因此一个 Channel 成功连接到了另一个服务器也可以被称为 connect ready。
我们可以使用或运算|来组合多个事件, 例如:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
注意:一个 Channel 仅仅可以被注册到一个 Selector 一次,如果将 Channel 注册到 Selector 多次,那么其实就是相当于更新 SelectionKey 的 interest set. 例如:
channel.register(selector, SelectionKey.OP_READ);
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
上面的 channel 注册到同一个 Selector 两次了,那么第二次的注册其实就是相当于更新这个 Channel 的 interest set 为 SelectionKey.OP_READ | SelectionKey.OP_WRITE。
关于 SelectionKey
如上所示, 当我们使用 register 注册一个 Channel 时, 会返回一个 SelectionKey 对象, 这个对象包含了如下内容:
interest set, 即我们感兴趣的事件集, 即在调用 register 注册 channel 时所设置的 interest set.ready set.channel.selector.attached object, 可选的附加对象
interest set
我们可以通过如下方式获取 interest set:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
ready set
代表了 Channel 所准备好了的操作.
我们可以像判断 interest set 一样操作 Ready set, 但是我们还可以使用如下方法进行判断:
int readySet = selectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
Channel 和 Selector
我们可以通过 SelectionKey 获取相对应的 Channel 和 Selector:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
Attaching Object
我们可以在selectionKey中附加一个对象:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
或者在注册时直接附加:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
通过 Selector 选择 Channel
我们可以通过 Selector.select()方法获取对某件事件准备好了的 Channel, 即如果我们在注册 Channel 时, 对其的可写事件感兴趣, 那么当 select()返回时, 我们就可以获取 Channel 了.
注意:select()方法返回的值表示有多少个 Channel 可操作.
获取可操作的 Channel
如果 select()方法返回值表示有多个 Channel 准备好了, 那么我们可以通过 Selected key set 访问这个 Channel:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
注意,在每次迭代时, 我们都调用 "keyIterator.remove()" 将这个 key 从迭代器中删除, 因为 select() 方法仅仅是简单地将就绪的 IO 操作放到 selectedKeys 集合中, 因此如果我们从 selectedKeys 获取到一个 key, 但是没有将它删除, 那么下一次 select 时, 这个 key 所对应的 IO 事件还在 selectedKeys 中。
例如此时我们收到 OP_ACCEPT 通知, 然后我们进行相关处理, 但是并没有将这个 Key 从 SelectedKeys 中删除, 那么下一次 select() 返回时 我们还可以在 SelectedKeys 中获取到 OP_ACCEPT 的 key。
注意,我们可以动态更改 SekectedKeys 中的 key 的 interest set. 例如在 OP_ACCEPT 中, 我们可以将 interest set 更新为 OP_READ, 这样 Selector 就会将这个 Channel 的 读 IO 就绪事件包含进来了。
Selector 的基本使用流程
- 通过 Selector.open() 打开一个 Selector.
- 将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set)
- 不断重复: *从 selected key 中获取 对应的 Channel 和附加信息(如果有的话)
*判断是哪些 IO 事件已经就绪了, 然后处理它们. 如果是 OP_ACCEPT 事件, 则调用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 获取 SocketChannel, 并将它设置为 非阻塞的, 然后将这个 Channel 注册到 Selector 中.
*根据需要更改 selected key 的监听事件.
*将已经处理过的 key 从 selected keys 集合中删除.
调用 select() 方法
调用 selector.selectedKeys() 获取 selected keys
迭代每个 selected key:
关闭 Selector
当调用了 Selector.close()方法时, 我们其实是关闭了 Selector 本身并且将所有的 SelectionKey 失效, 但是并不会关闭 Channel.
完整的 Selector 例子
public class NioEchoServer {
private static final int BUF_SIZE = 256;
private static final int TIMEOUT = 3000;
public static void main(String args[]) throws Exception {
// 打开服务端 Socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 打开 Selector
Selector selector = Selector.open();
// 服务端 Socket 监听8080端口, 并配置为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 将 channel 注册到 selector 中.
// 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
// 注册到 Selector 中.
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 通过调用 select 方法, 阻塞地等待 channel I/O 可操作
if (selector.select(TIMEOUT) == 0) {
System.out.print(".");
continue;
}
// 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
keyIterator.remove();
if (key.isAcceptable()) {
// 当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
// 代表客户端的连接
// 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
// 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
//在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
// 注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
}
if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = clientChannel.read(buf);
if (bytesRead == -1) {
clientChannel.close();
} else if (bytesRead > 0) {
key.interestOps(OP_READ | SelectionKey.OP_WRITE);
System.out.println("Get data length: " + bytesRead);
}
}
if (key.isValid() && key.isWritable()) {
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip();
SocketChannel clientChannel = (SocketChannel) key.channel();
clientChannel.write(buf);
if (!buf.hasRemaining()) {
key.interestOps(OP_READ);
}
buf.compact();
}
}
}
}
}
转载源:SegmentFault
- 3杂再破市场行情 6位数结拍
- 将永久存储添加到Red Hat CDK Kit 3.0
- ASP.NET MVC的Razor引擎:RazorView
- 三分钟学会 Java 单元测试
- 革了短信的命之后,微信开始把枪口对准了应用市场
- 建构微服务的第一步: 微服务哪里来?
- 最新机器学习必备十大入门算法!都在这里了
- ASP.NET MVC的Razor引擎:IoC在View激活过程中的应用
- 深度学习笔记:深度学习在计算机视觉的应用
- 快速添加永久存储到到Minishift / CDK 3
- 张小龙发布2018微信全新计划(内附演讲全文)
- 使用JClouds在Java中获取和发布云服务器
- 利用ASP.NET SiteMap生成与Bootstrap"兼容"菜单
- 埃隆·马斯克强烈推荐的5本书,看完之后他开始改变世界
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- Electron + Vue跨平台桌面应用开发实战教程(一)
- java_抽象类、接口、多态的使用
- Electron + Vue跨平台桌面应用开发实战教程(二)
- Oracle参数解析(nls_length_semantics)
- 创建单页
- Electron + Vue跨平台桌面应用开发实战教程(三)
- ES6箭头函数中this指向谁?
- 谈谈HTML中锚点及其使用
- java_数据类型转换、运算符
- JavaScript中字符串运算符是什么?有哪些?
- Docker日常使用方式
- 使用Python爬取动态网页-豆瓣电影(JSON)
- Linux安装配置PHPmyadmin
- Angular Service依赖注入的一个具体例子
- php学习day1