选择器Selector于SelectableChannel进行联合使用,可以达到非阻塞效果
Selector实现了I/O通道多路复用的效果,节省CPU资源

选择器(Selector)与I/O多路复用

多路复用与非多路复用的直观区别如下图
image.png

非多路复用

  • 场景
    需要创建多个线程对象,每个线程对应一个通道,在对应的通道中进行处理
  • 缺陷
    在高并发情况下有N个连接,需要创建N个线程对象,增大内存开销,CPU频繁切换上下文也会增加CPU开销。1:1

I/O多路复用

  • 场景
    将N个通道注册进选择器Selector中,一个线程可以对选择器中已注册的N个通道进行选择,处理。1:N
  • 核心
    多路复用的核心是使用最少的线程去操作最多的通道
  • 线程数量
    根据通道的数量来决定的,JDK中每注册1023个通道就会创建一个新的线程
  • 如何选择
    操作系统底层进行通知,告诉JVM哪个channel的数据需要处理,效率大大提高,而不是线程通过死循环判断确定是否需要调用channel

多路复用的Selector、SelectableChannel、SelectionKey关系

  • Selector、SelectableChannel、SelectionKey均为abstract抽象类
  • 只有SelectableChannel的子类才可以注册到Selector的子类
  • SelectionKey为标志,代表SelectableChannel向Selector注册的关系
    image.png
    由于FileChannel不是SelectableChannel的子类,所以不能向选择器中注册

AsynchronousChannel异步、InterruptibleChannel可异步关闭

SelectableChannel继承了AbstractInterruptibleChannel类
image.png

  • 实现AsynchronousChannel
    一个线程在一个能被中断的通道上出现了阻塞,其他线程调用close方法时这个阻塞状态的线程会收到AsynchronousCloseException
  • 实现InterruptibleChannel
    一个线程在一个能被中断的通道上出现了阻塞,其他线程调用interrupt方法时,通道会被关闭,这个阻塞状态的线程会收到ClosedByInterruptException

AbstractInterruptibleChannel

封装了能实现异步关闭和中断所需要的最低级别的机制,在调用I/O操作之前和之后应分别调用begin和end方法
image.png
image.png
这2个方法为protect,供他的实现子类调用使用的

SelectableChannel

abstract抽象类,子类很多,只有该类才能在选择器中进行注册,常用的类
分别对应UDP、TCP的3个Socket类为DatagramChannelImpl、ServerSocketChannelImpl、SocketChannelImpl
image.png

  • 使用register方法向Selector中注册返回一个SelectionKey对象。
  • 只能在任意特定选择器上注册一次
  • 并发安全
  • 可设置阻塞还是非阻塞模式,非阻塞模式下I/O操作永远不会阻塞
  • 新创建的SelectableChannel处于阻塞模式,多路复用情况下必须将其模式置为非阻塞模式

NetworkChannel

和网络Socket有关的Channel需要实现此接口,例如DatagramChannelImpl、ServerSocketChannelImpl、SocketChannelImpl等
提供了Socket相关的bind、getLocalAddress等方法

ServerSocketChannel

ServerSocketChannel的statci方法open()会创建ServerSocketChannel实例,在调用accept之前还需要bind绑定Address,不然会抛出异常

Java代码实现多路复用

通过Selector、SelectableChannel、SelectionKey的联合使用达到I/O多路复用的目的

获得ServerSocketChannel

由于ServerSocketChannel需要注册,所以不能直接通过new ServerSocket得到server,这样无法实现多路复用

  • 实现过程
    1.ServerSocketChannel的open获得ServerSocketChannel实例
    2.通过实例获得SeverSocket
    3.获得SeverSocket实例后再进行绑定Address
//初步实现ServerSocketChannel
class AA {
    private static void server() {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            ServerSocket serverSocket = serverSocketChannel.socket();
            serverSocket.bind(new InetSocketAddress(8080));
            Socket socket = serverSocket.accept();
            InputStream inputStream = socket.getInputStream();
            byte[] bytes = new byte[24];
            while (inputStream.read(bytes) != -1) {
                System.out.println("server收到client发送的消息:"+new String(bytes, StandardCharsets.UTF_8));
            }
            inputStream.close();
            socket.close();
            serverSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void client() {
        try {
            SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(8080));
            Socket socket = socketChannel.socket();
            OutputStream outputStream = socket.getOutputStream();
            outputStream.write("一二三四五!!!".getBytes(StandardCharsets.UTF_8));
            outputStream.close();
            socket.close();
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
    new Thread(AA::server).start();
    new Thread(AA::client).start();
    }
}

image.png

ServerSocketChannel的accept方法阻塞与非阻塞

ServerSocket的accept是阻塞的

  • ServerSocketChannel的accept可以设置阻塞与非阻塞。当设置为非阻塞时,如果不存在挂起的连接则直接返回null
  • 调用configureBlocking(boolean block)设置true为阻塞,false为非阻塞
  • 调用isBlocking() 返回是否为阻塞Channel
//阻塞及非阻塞accept
class AB{
    private static void serverA() {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));
            System.out.println("A"+serverSocketChannel.isBlocking());
            SocketChannel accept = serverSocketChannel.accept();
            System.out.println("A"+serverSocketChannel.isBlocking());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void serverB() {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(8081));
            serverSocketChannel.configureBlocking(false);
            System.out.println("B"+serverSocketChannel.isBlocking());
            SocketChannel accept = serverSocketChannel.accept();
            System.out.println(accept);
            System.out.println("B"+serverSocketChannel.isBlocking());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        new Thread(AB::serverA).start();
        new Thread(AB::serverB).start();
    }
}

结果如下,B为false,调用accept方法之后并没有阻塞,且返回的对象为null。而A一直在阻塞在accept,没有继续执行剩余方法
image.png

  • ServerSocketChannel类的accept()返回的是SocketChannel是SocketChannel为SelectableChannel子类可以注册到Selector
    image.png

获得Selector对象

通道Channel需要注册到选择器Selector上,使用Selector抽象类的的static方法open可以获得一个选择器

public static Selector open() throws IOException

注册register获得SelectionKey对象及判断是否注册

  • 注册
public final SelectionKey register(Selector sel, int ops)
        throws ClosedChannelException

用给定的选择器注册这个通道,返回一个选择键,ops为通道感兴趣的时间,即通道可以执行的操作集合
如果要将Channel注册到Selector中,必须先将Channel设置成非阻塞

//register
class BB{
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8080));
        Selector selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println(selector);
        System.out.println(selectionKey);
    }
}

image.png

  • 判断是否注册
    public abstract boolean isRegistered();

根据Selecotr获得SelectionKey

一个SelectableChannel可以注册多个Selecotr,返回多个SelectionKey
可以通过keyFor查找对应的SelectionKey

    public abstract SelectionKey keyFor(Selector sel);
//获得selectorkey
class BD{
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println(selectionKey);
        Selector selector1 = Selector.open();
        SelectionKey selectionKey1 = serverSocketChannel.register(selector1, SelectionKey.OP_ACCEPT);
        System.out.println(selectionKey1);
        System.out.println(serverSocketChannel.keyFor(selector)+"\n"+serverSocketChannel.keyFor(selector1));
    }
}

结果如下
image.png

SelectorProvider

选择器和可选通道的服务提供者类。默认为单例
image.png

//获得SelectorProvider
class DE{
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        SelectorProvider provider = SelectorProvider.provider();
        SelectorProvider provider1 = serverSocketChannel.provider();
        System.out.println(provider);
        System.out.println(provider1);
        System.out.println(provider==provider1);
    }
}

image.png

客户端执行SocketChannel执行阻塞与非阻塞Connect连接

  • 设置是否阻塞
SelectableChannel configureBlocking(boolean block)
  • 阻塞情况下
    1.调用connect方法后立即发起连接(阻塞之后的操作)
    2.根据连接的情况返回true或抛出异常

  • 非阻塞情况下
    1.调用connect方法后不立即发起连接,直接返回false(不阻塞之后的操作)
    2.调用finishConnect回调是否连接成功返回true,正在连接返回false,连接失败抛出异常

boolean finishConnect() throws IOException
class CA {
    private static void server() {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8080));
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.close();
            serverSocketChannel.close();
            System.out.println("server");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //阻塞
    private static void blockClient() {
        long l = System.currentTimeMillis();
        try (SocketChannel socketChannel = SocketChannel.open()) {
            System.out.println("阻塞"+socketChannel.connect(new InetSocketAddress(8080)));
            System.out.println("阻塞无异常花费时间:" + (System.currentTimeMillis() - l));
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("阻塞异常花费时间:" + (System.currentTimeMillis() - l));
        }
    }
    //非阻塞
    private static void noBlockClient() {
        long l = System.currentTimeMillis();
        try (SocketChannel socketChannel = SocketChannel.open()) {
            socketChannel.configureBlocking(false);
            System.out.println("非阻塞第一次"+socketChannel.connect(new InetSocketAddress(8080)));
            System.out.println("非阻塞无异常花费时间:" + (System.currentTimeMillis() - l));
            System.out.println("非阻塞第二次"+socketChannel.finishConnect());
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("非阻塞异常花费时间:" + (System.currentTimeMillis() - l));
        }
    }
    public static void main(String[] args) throws IOException {
        new Thread(CA::server).start();
        new Thread(CA::blockClient).start();
        new Thread(CA::noBlockClient).start();
    }
}
  • 模拟失败,注释服务端,只保留客户端
    如下图
    1.阻塞直接抛出异常
    2.非阻塞首先返回flse,调用回调方法抛出异常
    image.png
  • 模拟成功
    1.阻塞直接返回true
    2.非阻塞直接返回false,调用回调方法返回true

image.png

boolean isConnectionPending()查看是否正在连接

当且仅当一个连接操作已经启动了这一通道上,但尚未通过调用完成finishConnect方法

//客户端阻塞与非阻塞connect
class CB {
    private static void server() {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8080));
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.close();
            serverSocketChannel.close();
            System.out.println("server");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //阻塞
    private static void blockClient() {
        long l = System.currentTimeMillis();
        try (SocketChannel socketChannel = SocketChannel.open()) {
            System.out.println("阻塞"+socketChannel.connect(new InetSocketAddress("www.baidu.com",8080)));
            System.out.println("阻塞是否正在连接"+socketChannel.isConnectionPending());
            System.out.println("阻塞无异常花费时间:" + (System.currentTimeMillis() - l));
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("阻塞异常花费时间:" + (System.currentTimeMillis() - l));
        }
    }
    //非阻塞
    private static void noBlockClient() {
        long l = System.currentTimeMillis();
        try (SocketChannel socketChannel = SocketChannel.open()) {
            socketChannel.configureBlocking(false);
            System.out.println("非阻塞第一次"+socketChannel.connect(new InetSocketAddress("www.baidu.com",8080)));
            System.out.println("非阻塞是否正在连接"+socketChannel.isConnectionPending());
            System.out.println("非阻塞无异常花费时间:" + (System.currentTimeMillis() - l));
            System.out.println("非阻塞第二次"+socketChannel.finishConnect());
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("非阻塞异常花费时间:" + (System.currentTimeMillis() - l));
        }
    }
    public static void main(String[] args) throws IOException {
//        new Thread(CB::server).start();
        new Thread(CB::blockClient).start();
        new Thread(CB::noBlockClient).start();
    }
}
  • 结果如下图
    1.阻塞的连接严重影响了代码的执行效率
    2.非阻塞调用后因为正在连接所以返回true
    image.png

boolean finishConnect() 连接socket

  • 连接失败直接抛出IOException
  • 如果此通道已连接,则此方法不会阻塞并立即返回true
  • 阻塞
    此方法将阻塞,直到连接完成或失败,并且将始终返回true或抛出描述失败的已检查异常
  • 非阻塞
    则如果连接过程尚未完成,则此方法将返回false 。 如果此通道处于阻塞模式

Selector类

Selector的实现根据不同的平台实现机制不同,由于不需要CPU主动轮询查询,所以效率较高
image.png

API及说明

image.png

  • 打开一个选择器
    根据不同的平台有不同的实现
Selector open() throws IOException 
  • 判断selector是否open
boolean isOpen()
  • 返回provider实现
SelectorProvider provider()
  • 返回此选择器的键集SelectionKey
    SelectionKey即channel注册到此select是添加的key
Set<SelectionKey> keys()
  • 返回此选择器已经select方法之后的键集
Set<SelectionKey> selectedKeys()
  • 选择一组键,其对应的通道已准备好进行 I/O 操作。非阻塞,如果不存在立即返回0
int selectNow() throws IOException
  • 选择一组键,其对应的通道已准备好进行 I/O 操作。阻塞选择操作。
    它仅在至少选择了一个通道、调用此选择器的wakeup方法、当前线程被中断或给定的超时时间到期后才返回,以先到者为准。
int select(long timeout)  throws IOException
  • 选择一组键,其对应的通道已准备好进行 I/O 操作。执行阻塞选择操作。
    只有在至少选择了一个通道、调用此选择器的wakeup方法或当前线程被中断(以先到者为准)后才返回。
    简而言之,返回已经更新的key的个数
abstract int select() throws IOException
  • 使尚未返回的第一个选择操作立即返回。
Selector wakeup()
  • 关闭此选择器
    仍然与此选择器关联的任何未取消的键都将失效,它们的通道被取消注册,并且与此选择器关联的任何其他资源都将被释放。
void close() throws IOException

SelectionKey

表示SelectableChannel注册到Selector令牌。
其内部有常量
image.png
这些常量进行了移位运算。用于设置注册时的监听事件

  • OP_ACCEPT 套接字接受操作的操作设置位
  • OP_CONNECT 套接字连接操作的操作设置位
  • OP_READ 读取操作的操作设置位
  • OP_WRITE 写操作的操作设置位

select方法具有阻塞特性及其他特性

  • 验证阻塞
//select阻塞
class DA {
    private static void server() {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(8080));
            Selector selector = Selector.open();
            //设置监听感兴趣的事件为连接
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server1 before" + System.currentTimeMillis());
            selector.select();
            System.out.println("Server1 after" + System.currentTimeMillis());
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    private static void client() {
        try {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            System.out.println("client " + System.currentTimeMillis());
            Thread.sleep(10000);
            System.out.println("client开始连接 " + System.currentTimeMillis());
            socketChannel.connect(new InetSocketAddress(8080));
            System.out.println("client " + socketChannel.finishConnect());
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        new Thread(DA::server).start();
        new Thread(DA::client).start();
    }
}

如下图,只有当client发送请求之后才会继续
image.png

  • 接收到的selector,需要accept,否则不会阻塞,一直返回
//accept处理
class DB {
    //不进行处理
    private static void server1() {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(8080));
            Selector selector = Selector.open();
            //设置监听感兴趣的事件为连接
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                System.out.println("Server1 before" + System.currentTimeMillis());
                selector.select();
                System.out.println("Server1 after" + System.currentTimeMillis());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    //进行处理
    private static void server2() {
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(8081));
            Selector selector = Selector.open();
            //设置监听感兴趣的事件为连接
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                System.out.println("Server2 before" + System.currentTimeMillis());
                int select = selector.select();
                System.out.println("Server2 after" + System.currentTimeMillis());
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey next = iterator.next();
                    ServerSocketChannel channel = (ServerSocketChannel) next.channel();
                    channel.accept();
                    channel.close();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    private static void client(int port) {
        try {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            System.out.println("client " + System.currentTimeMillis());
            Thread.sleep(10000);
            System.out.println("client开始连接 " + System.currentTimeMillis());
            socketChannel.connect(new InetSocketAddress(port));
            System.out.println("client " + socketChannel.finishConnect());
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }

    }
    public static void main(String[] args) {
        new Thread(DB::server1).start();
        new Thread(DB::server2).start();
        new Thread(() -> client(8080)).start();
        new Thread(() -> client(8081)).start();
    }
}

如下图server1一直在循环打印,二server已经阻塞
image.png







这个家伙很懒,啥也没有留下😋