选择器Selector于SelectableChannel进行联合使用,可以达到非阻塞效果
Selector实现了I/O通道多路复用的效果,节省CPU资源
选择器(Selector)与I/O多路复用
多路复用与非多路复用的直观区别如下图
非多路复用
- 场景
需要创建多个线程对象,每个线程对应一个通道,在对应的通道中进行处理 - 缺陷
在高并发情况下有N个连接,需要创建N个线程对象,增大内存开销,CPU频繁切换上下文也会增加CPU开销。1:1
I/O多路复用
- 场景
将N个通道注册进选择器Selector中,一个线程可以对选择器中已注册的N个通道进行选择,处理。1:N - 核心
多路复用的核心是使用最少的线程去操作最多的通道 - 线程数量
根据通道的数量来决定的,JDK中每注册1023个通道就会创建一个新的线程 - 如何选择
操作系统底层进行通知,告诉JVM哪个channel的数据需要处理,效率大大提高,而不是线程通过死循环判断确定是否需要调用channel
多路复用的Selector、SelectableChannel、SelectionKey关系
- Selector、SelectableChannel、SelectionKey均为abstract抽象类
- 只有SelectableChannel的子类才可以注册到Selector的子类
- SelectionKey为标志,代表SelectableChannel向Selector注册的关系
由于FileChannel不是SelectableChannel的子类,所以不能向选择器中注册
AsynchronousChannel异步、InterruptibleChannel可异步关闭
SelectableChannel继承了AbstractInterruptibleChannel类
- 实现AsynchronousChannel
一个线程在一个能被中断的通道上出现了阻塞,其他线程调用close方法时这个阻塞状态的线程会收到AsynchronousCloseException - 实现InterruptibleChannel
一个线程在一个能被中断的通道上出现了阻塞,其他线程调用interrupt方法时,通道会被关闭,这个阻塞状态的线程会收到ClosedByInterruptException
AbstractInterruptibleChannel
封装了能实现异步关闭和中断所需要的最低级别的机制,在调用I/O操作之前和之后应分别调用begin和end方法
这2个方法为protect,供他的实现子类调用使用的
SelectableChannel
abstract抽象类,子类很多,只有该类才能在选择器中进行注册,常用的类
分别对应UDP、TCP的3个Socket类为DatagramChannelImpl、ServerSocketChannelImpl、SocketChannelImpl
- 使用register方法向Selector中注册返回一个SelectionKey对象。
- 只能在任意特定选择器上注册一次
- 并发安全
- 可设置阻塞还是非阻塞模式,非阻塞模式下I/O操作永远不会阻塞
- 新创建的SelectableChannel处于阻塞模式,多路复用情况下必须将其模式置为非阻塞模式
NetworkChannel
和网络Socket有关的Channel需要实现此接口,例如DatagramChannelImpl、ServerSocketChannelImpl、SocketChannelImpl等
提供了Socket相关的bind、getLocalAddress等方法
ServerSocketChannel
ServerSocketChannel的statci方法open()会创建ServerSocketChannel实例,在调用accept之前还需要bind绑定Address,不然会抛出异常
Java代码实现多路复用
通过Selector、SelectableChannel、SelectionKey的联合使用达到I/O多路复用的目的
获得ServerSocketChannel
由于ServerSocketChannel需要注册,所以不能直接通过new ServerSocket得到server,这样无法实现多路复用
- 实现过程
1.ServerSocketChannel的open获得ServerSocketChannel实例
2.通过实例获得SeverSocket
3.获得SeverSocket实例后再进行绑定Address
//初步实现ServerSocketChannel
class AA {
private static void server() {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8080));
Socket socket = serverSocket.accept();
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[24];
while (inputStream.read(bytes) != -1) {
System.out.println("server收到client发送的消息:"+new String(bytes, StandardCharsets.UTF_8));
}
inputStream.close();
socket.close();
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void client() {
try {
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(8080));
Socket socket = socketChannel.socket();
OutputStream outputStream = socket.getOutputStream();
outputStream.write("一二三四五!!!".getBytes(StandardCharsets.UTF_8));
outputStream.close();
socket.close();
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(AA::server).start();
new Thread(AA::client).start();
}
}
ServerSocketChannel的accept方法阻塞与非阻塞
ServerSocket的accept是阻塞的
- ServerSocketChannel的accept可以设置阻塞与非阻塞。当设置为非阻塞时,如果不存在挂起的连接则直接返回null
- 调用configureBlocking(boolean block)设置true为阻塞,false为非阻塞
- 调用isBlocking() 返回是否为阻塞Channel
//阻塞及非阻塞accept
class AB{
private static void serverA() {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("A"+serverSocketChannel.isBlocking());
SocketChannel accept = serverSocketChannel.accept();
System.out.println("A"+serverSocketChannel.isBlocking());
} catch (IOException e) {
e.printStackTrace();
}
}
private static void serverB() {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8081));
serverSocketChannel.configureBlocking(false);
System.out.println("B"+serverSocketChannel.isBlocking());
SocketChannel accept = serverSocketChannel.accept();
System.out.println(accept);
System.out.println("B"+serverSocketChannel.isBlocking());
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(AB::serverA).start();
new Thread(AB::serverB).start();
}
}
结果如下,B为false,调用accept方法之后并没有阻塞,且返回的对象为null。而A一直在阻塞在accept,没有继续执行剩余方法
- ServerSocketChannel类的accept()返回的是SocketChannel是SocketChannel为SelectableChannel子类可以注册到Selector
获得Selector对象
通道Channel需要注册到选择器Selector上,使用Selector抽象类的的static方法open可以获得一个选择器
public static Selector open() throws IOException
注册register获得SelectionKey对象及判断是否注册
- 注册
public final SelectionKey register(Selector sel, int ops)
throws ClosedChannelException
用给定的选择器注册这个通道,返回一个选择键,ops为通道感兴趣的时间,即通道可以执行的操作集合
如果要将Channel注册到Selector中,必须先将Channel设置成非阻塞
//register
class BB{
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println(selector);
System.out.println(selectionKey);
}
}
- 判断是否注册
public abstract boolean isRegistered();
根据Selecotr获得SelectionKey
一个SelectableChannel可以注册多个Selecotr,返回多个SelectionKey
可以通过keyFor查找对应的SelectionKey
public abstract SelectionKey keyFor(Selector sel);
//获得selectorkey
class BD{
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println(selectionKey);
Selector selector1 = Selector.open();
SelectionKey selectionKey1 = serverSocketChannel.register(selector1, SelectionKey.OP_ACCEPT);
System.out.println(selectionKey1);
System.out.println(serverSocketChannel.keyFor(selector)+"\n"+serverSocketChannel.keyFor(selector1));
}
}
结果如下
SelectorProvider
选择器和可选通道的服务提供者类。默认为单例
//获得SelectorProvider
class DE{
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
SelectorProvider provider = SelectorProvider.provider();
SelectorProvider provider1 = serverSocketChannel.provider();
System.out.println(provider);
System.out.println(provider1);
System.out.println(provider==provider1);
}
}
客户端执行SocketChannel执行阻塞与非阻塞Connect连接
- 设置是否阻塞
SelectableChannel configureBlocking(boolean block)
-
阻塞情况下
1.调用connect方法后立即发起连接(阻塞之后的操作)
2.根据连接的情况返回true或抛出异常 -
非阻塞情况下
1.调用connect方法后不立即发起连接,直接返回false(不阻塞之后的操作)
2.调用finishConnect回调是否连接成功返回true,正在连接返回false,连接失败抛出异常
boolean finishConnect() throws IOException
class CA {
private static void server() {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.close();
serverSocketChannel.close();
System.out.println("server");
} catch (IOException e) {
e.printStackTrace();
}
}
//阻塞
private static void blockClient() {
long l = System.currentTimeMillis();
try (SocketChannel socketChannel = SocketChannel.open()) {
System.out.println("阻塞"+socketChannel.connect(new InetSocketAddress(8080)));
System.out.println("阻塞无异常花费时间:" + (System.currentTimeMillis() - l));
} catch (IOException e) {
e.printStackTrace();
System.out.println("阻塞异常花费时间:" + (System.currentTimeMillis() - l));
}
}
//非阻塞
private static void noBlockClient() {
long l = System.currentTimeMillis();
try (SocketChannel socketChannel = SocketChannel.open()) {
socketChannel.configureBlocking(false);
System.out.println("非阻塞第一次"+socketChannel.connect(new InetSocketAddress(8080)));
System.out.println("非阻塞无异常花费时间:" + (System.currentTimeMillis() - l));
System.out.println("非阻塞第二次"+socketChannel.finishConnect());
} catch (IOException e) {
e.printStackTrace();
System.out.println("非阻塞异常花费时间:" + (System.currentTimeMillis() - l));
}
}
public static void main(String[] args) throws IOException {
new Thread(CA::server).start();
new Thread(CA::blockClient).start();
new Thread(CA::noBlockClient).start();
}
}
- 模拟失败,注释服务端,只保留客户端
如下图
1.阻塞直接抛出异常
2.非阻塞首先返回flse,调用回调方法抛出异常
- 模拟成功
1.阻塞直接返回true
2.非阻塞直接返回false,调用回调方法返回true
boolean isConnectionPending()查看是否正在连接
当且仅当一个连接操作已经启动了这一通道上,但尚未通过调用完成finishConnect方法
//客户端阻塞与非阻塞connect
class CB {
private static void server() {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.close();
serverSocketChannel.close();
System.out.println("server");
} catch (IOException e) {
e.printStackTrace();
}
}
//阻塞
private static void blockClient() {
long l = System.currentTimeMillis();
try (SocketChannel socketChannel = SocketChannel.open()) {
System.out.println("阻塞"+socketChannel.connect(new InetSocketAddress("www.baidu.com",8080)));
System.out.println("阻塞是否正在连接"+socketChannel.isConnectionPending());
System.out.println("阻塞无异常花费时间:" + (System.currentTimeMillis() - l));
} catch (IOException e) {
e.printStackTrace();
System.out.println("阻塞异常花费时间:" + (System.currentTimeMillis() - l));
}
}
//非阻塞
private static void noBlockClient() {
long l = System.currentTimeMillis();
try (SocketChannel socketChannel = SocketChannel.open()) {
socketChannel.configureBlocking(false);
System.out.println("非阻塞第一次"+socketChannel.connect(new InetSocketAddress("www.baidu.com",8080)));
System.out.println("非阻塞是否正在连接"+socketChannel.isConnectionPending());
System.out.println("非阻塞无异常花费时间:" + (System.currentTimeMillis() - l));
System.out.println("非阻塞第二次"+socketChannel.finishConnect());
} catch (IOException e) {
e.printStackTrace();
System.out.println("非阻塞异常花费时间:" + (System.currentTimeMillis() - l));
}
}
public static void main(String[] args) throws IOException {
// new Thread(CB::server).start();
new Thread(CB::blockClient).start();
new Thread(CB::noBlockClient).start();
}
}
- 结果如下图
1.阻塞的连接严重影响了代码的执行效率
2.非阻塞调用后因为正在连接所以返回true
boolean finishConnect() 连接socket
- 连接失败直接抛出IOException
- 如果此通道已连接,则此方法不会阻塞并立即返回true
- 阻塞
此方法将阻塞,直到连接完成或失败,并且将始终返回true或抛出描述失败的已检查异常 - 非阻塞
则如果连接过程尚未完成,则此方法将返回false 。 如果此通道处于阻塞模式
Selector类
Selector的实现根据不同的平台实现机制不同,由于不需要CPU主动轮询查询,所以效率较高
API及说明
- 打开一个选择器
根据不同的平台有不同的实现
Selector open() throws IOException
- 判断selector是否open
boolean isOpen()
- 返回provider实现
SelectorProvider provider()
- 返回此选择器的键集SelectionKey
SelectionKey即channel注册到此select是添加的key
Set<SelectionKey> keys()
- 返回此选择器已经select方法之后的键集
Set<SelectionKey> selectedKeys()
- 选择一组键,其对应的通道已准备好进行 I/O 操作。非阻塞,如果不存在立即返回0
int selectNow() throws IOException
- 选择一组键,其对应的通道已准备好进行 I/O 操作。阻塞选择操作。
它仅在至少选择了一个通道、调用此选择器的wakeup方法、当前线程被中断或给定的超时时间到期后才返回,以先到者为准。
int select(long timeout) throws IOException
- 选择一组键,其对应的通道已准备好进行 I/O 操作。执行阻塞选择操作。
只有在至少选择了一个通道、调用此选择器的wakeup方法或当前线程被中断(以先到者为准)后才返回。
简而言之,返回已经更新的key的个数
abstract int select() throws IOException
- 使尚未返回的第一个选择操作立即返回。
Selector wakeup()
- 关闭此选择器
仍然与此选择器关联的任何未取消的键都将失效,它们的通道被取消注册,并且与此选择器关联的任何其他资源都将被释放。
void close() throws IOException
SelectionKey
表示SelectableChannel注册到Selector令牌。
其内部有常量
这些常量进行了移位运算。用于设置注册时的监听事件
- OP_ACCEPT 套接字接受操作的操作设置位
- OP_CONNECT 套接字连接操作的操作设置位
- OP_READ 读取操作的操作设置位
- OP_WRITE 写操作的操作设置位
select方法具有阻塞特性及其他特性
- 验证阻塞
//select阻塞
class DA {
private static void server() {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
//设置监听感兴趣的事件为连接
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server1 before" + System.currentTimeMillis());
selector.select();
System.out.println("Server1 after" + System.currentTimeMillis());
} catch (IOException e) {
e.printStackTrace();
}
}
private static void client() {
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
System.out.println("client " + System.currentTimeMillis());
Thread.sleep(10000);
System.out.println("client开始连接 " + System.currentTimeMillis());
socketChannel.connect(new InetSocketAddress(8080));
System.out.println("client " + socketChannel.finishConnect());
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(DA::server).start();
new Thread(DA::client).start();
}
}
如下图,只有当client发送请求之后才会继续
- 接收到的selector,需要accept,否则不会阻塞,一直返回
//accept处理
class DB {
//不进行处理
private static void server1() {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
//设置监听感兴趣的事件为连接
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("Server1 before" + System.currentTimeMillis());
selector.select();
System.out.println("Server1 after" + System.currentTimeMillis());
}
} catch (IOException e) {
e.printStackTrace();
}
}
//进行处理
private static void server2() {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8081));
Selector selector = Selector.open();
//设置监听感兴趣的事件为连接
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("Server2 before" + System.currentTimeMillis());
int select = selector.select();
System.out.println("Server2 after" + System.currentTimeMillis());
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
ServerSocketChannel channel = (ServerSocketChannel) next.channel();
channel.accept();
channel.close();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private static void client(int port) {
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
System.out.println("client " + System.currentTimeMillis());
Thread.sleep(10000);
System.out.println("client开始连接 " + System.currentTimeMillis());
socketChannel.connect(new InetSocketAddress(port));
System.out.println("client " + socketChannel.finishConnect());
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(DB::server1).start();
new Thread(DB::server2).start();
new Thread(() -> client(8080)).start();
new Thread(() -> client(8081)).start();
}
}
如下图server1一直在循环打印,二server已经阻塞
Comments | 0 条评论