JAVA NIO Socket通道

时间:2022-07-24
本文章向大家介绍JAVA NIO Socket通道,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

DatagramChannel和SocketChannel都实现定义读写功能,ServerSocketChannel不实现,只负责监听传入的连接,并建立新的SocketChannel,本身不传输数据。

Socket通道被实例化时都会创建一个对等的socket,通过此方式创建的socket都会有关联的通道,通过getChannel()获取。

继承于 SelectableChannel,所以socket可以在非阻塞模式下运行:

Readiness Selection:就绪选择,查询通道的机制,该机制可以判断通道是否准备好执行下一个目标操作(读,写...),其价值在于潜在的大量通道可以同时进行就绪检查,真正的就绪选择需要由操作系统来做,处理IO请求,并通知各个线程数据准备情况。

Selector选择器:提供了这种抽象(抽象接口),是的Java代码能够以可移植的方式,请求底层操作系统提供这种服务。

Selector选择器类:管理着一个被注册的通道集合的信息和他们的状态,通道和选择器是一起被注册的,并且使用选择器来更新通道状态。

一个通道可以被注册到多个选择器上,但在每个选择器上,只能注册一次。

SelectionKey选择键:封装了通道和选择器的注册关系,选择键被SelectableChannel.register()返回并提供标识这种注册关系的标记。

通道在被注册到选择器之前必须设置为noblocking模式,正常状态。

chanel.register(selector, keystate):通道注册选择器。

selector.select():阻塞操作,直到某一个channel的keystate发生。

selectionKey.cancel(),取消注册关系。

通道关闭,相关的注册键会自动取消,选择器关闭,则所有注册到该选择器的通道都将被注销,并且相关的键会立刻失效。

selectionkey包含两个以整数型式进行编码的比特掩码,一个用于指示那些通道和选择器组合所关心的操作,另一个表示通道准备好要执行的操作。当前的interest集合可以通过调用见对象的interestOps()方法来获取,且永远不会被选择器改变,但可以调用interestOps()方法,传入一个新的比特码来改变。

readyOpts()获取相关通道的已就绪的操作,ready集合是interest集合的子集,表示从上次调用select()之后已经就绪的操作。如下:

if((key.readOps() & SelctionKey.OP_READ) != 0){

buffer.clear();

key.channel().read(buffer);

do()....

}

附加参数:attach()

SelectionKey key = SelectableChannel.register(Selector, SelectionKey.OP_XXX, paramObj);

等价:

SelectionKey key = SelectableChannel.register(Selector, SelectionKey.OP_XXX);

key.attach(paramObj);

SelectionKey 多线程应用同步问题。

选择器:

Selector上的已注册键集合中,会存在失效键、null,keys()返回,不可修改。

已选择键集合,selectedKeys()返回,已经准备好的键集合,可能为空。

核心:选择过程,是对select(),poll(),epoll()等本地调用(native call)或者类似的操作系统的本地调用的包装(抽象),期间,将执行以下过程:

  1. 已取消的键的集合将会被检查,如果非空,则会被从其它两个集合(已注册、已选择)移除,相关通道将会被注销,键被清空。
  2. 已注册键的集合的键的interest集合将会被检查,就绪条件确定,底层操作系统对通道所关心的操作的就绪状态进行检查,如果没有,则阻塞当前(超时值)。 对于已经就绪的通道执行: a. 如果通道的键未在已选择的键集合中,那么键的reay集合将会被清空,然后设置当前准备好的比特掩码。 b. 如果通道的键已在已选择的键集合中,键的ready集合更新。不再就绪的状态不会被清除。
  3. select返回的是从上一次select()调用之后进入就绪状态的通道数量,之前的调用中已经就绪的,并且本次调用中仍然就绪的不会被计入。

使用内部已取消的键的集合来延迟注销,防止线程在取消键时阻塞及与正在进行的选择操作冲突的优化,

三种形式的select: select(), select(timeout),selectNow()(非阻塞,立刻返回当前状况)。

调用 Selector 对象的 wakeup( )方法将使得选择器上的第一个还没有返回的选择操作立即回。如果当前没有在进行中的选择,那么下一次对 select( )方法的一种形式的调用将立即返回。后续的选择操作将正常进行。在选择操作之间多次调用 wakeup( )方法与调用它一次没有什么不同。有时这种延迟的唤醒行为并不是您想要的。您可能只想唤醒一个睡眠中的线程,而使得后续的

选择继续正常地进行。您可以通过在调用 wakeup( )方法后调用 selectNow( )方法来绕过这个问题。

通常的做法是在选择器上调用一次 select 操作(这将更新已选择的键的集合),然后遍历 selectKeys( )方法返回的键的集合。在按顺序进行检查每个键的过程中,相关的通道也根据键的就绪集合进行处理。然后键将从已选择的键的集合中被移除(通过在 Iterator对象上调用 remove( )方法),然后检查下一个键。完成后,通过再次调用 select( )方法重复这个循环。如下:

package org.windwant.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by windwant on 2016/10/27.
 */
public class SocketChannelOpt {

    private static final String HOST = "localhost";
    private static final int PORT = 8888;

    private static ExecutorService read = Executors.newFixedThreadPool(5);
    private static ExecutorService write = Executors.newFixedThreadPool(5);

    public static void main(String[] args){
        ServerSocketChannel serverSocketChannel = null;
        ServerSocket serverSocket = null;
        Selector selector = null;
        try {
            serverSocketChannel = ServerSocketChannel.open();//工厂方法创建ServerSocketChannel
            serverSocket = serverSocketChannel.socket(); //获取channel对应的ServerSocket
            serverSocket.bind(new InetSocketAddress(HOST, PORT)); //绑定地址
            serverSocketChannel.configureBlocking(false); //设置ServerSocketChannel非阻塞模式
            selector = Selector.open();//工厂方法创建Selector
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注册选择器,接受连接就绪状态。
            while (true){//循环检查
                if(selector.select() == 0){//阻塞检查,当有就绪状态发生,返回键集合
                    continue;
                }

                Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //获取就绪键遍历对象。
                while (it.hasNext()){
                    SelectionKey selectionKey = it.next();
                    //处理就绪状态
                    if (selectionKey.isAcceptable()){
                        ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只负责监听,阻塞,管理,不发送、接收数据
                        SocketChannel socketChannel = schannel.accept();//就绪后的操作,刚到达的socket句柄
                        if(null == socketChannel){
                            continue;
                        }
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ); //告知选择器关心的通道,准备好读数据
                    }else if(selectionKey.isReadable()){
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);

                        StringBuilder result = new StringBuilder();
                        while (socketChannel.read(byteBuffer) > 0){//确保读完
                            byteBuffer.flip();
                            result.append(new String(byteBuffer.array()));
                            byteBuffer.clear();//每次清空 对应上面flip()
                        }

                        System.out.println("server receive: " + result.toString());
                        socketChannel.register(selector, SelectionKey.OP_WRITE);

                    }else if(selectionKey.isWritable()){
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        String sendStr = "server send data: " + Math.random();
                        ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
                        while (send.hasRemaining()){
                            socketChannel.write(send);
                        }
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println(sendStr);
                    }
                    it.remove();
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Selector多线程执行,同步需求。

一个线程监控通道的就绪状态,一个线程池处理业务需求。线程池也可以扩展为不同的业务处理线程池,如日志、业务、心跳。

package org.windwant.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 线程处理读取,写出
 * Created by windwant on 2016/10/27.
 */
public class TSocketChannelOpt {

    private static final String HOST = "localhost";
    private static final int PORT = 8888;

    private static ExecutorService read = Executors.newFixedThreadPool(5);
    private static ExecutorService write = Executors.newFixedThreadPool(5);

    public static void main(String[] args){
        ServerSocketChannel serverSocketChannel = null;
        ServerSocket serverSocket = null;
        Selector selector = null;
        try {
            serverSocketChannel = ServerSocketChannel.open();//工厂方法创建ServerSocketChannel
            serverSocket = serverSocketChannel.socket(); //获取channel对应的ServerSocket
            serverSocket.bind(new InetSocketAddress(HOST, PORT)); //绑定地址
            serverSocketChannel.configureBlocking(false); //设置ServerSocketChannel非阻塞模式
            selector = Selector.open();//工厂方法创建Selector
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注册选择器,接受连接就绪状态。
            while (true){//循环检查
                if(selector.select() == 0){//阻塞检查,当有就绪状态发生,返回键集合
                    continue;
                }

                Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //获取就绪键遍历对象。
                while (it.hasNext()){
                    SelectionKey selectionKey = it.next();
                    it.remove();
                    //处理就绪状态
                    if (selectionKey.isAcceptable()){
                        ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只负责监听,阻塞,管理,不发送、接收数据
                        SocketChannel socketChannel = schannel.accept();//就绪后的操作,刚到达的socket句柄
                        if(null == socketChannel){
                            continue;
                        }
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ); //告知选择器关心的通道,准备好读数据
                    }else if(selectionKey.isReadable()){
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        read.execute(new MyReadRunnable(socketChannel));

//                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//                        ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);
//
//                        StringBuilder result = new StringBuilder();
//                        while (socketChannel.read(byteBuffer) > 0){//确保读完
//                            byteBuffer.flip();
//                            result.append(new String(byteBuffer.array()));
//                            byteBuffer.clear();//每次清空 对应上面flip()
//                        }
//
//                        System.out.println("server receive: " + result.toString());
                        socketChannel.register(selector, SelectionKey.OP_WRITE);

                    }else if(selectionKey.isWritable()){
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        write.execute(new MyWriteRunnable(socketChannel));
//                        String sendStr = "server send data: " + Math.random();
//                        ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
//                        while (send.hasRemaining()){
//                            socketChannel.write(send);
//                        }
//                        System.out.println(sendStr);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static class MyReadRunnable implements Runnable {

        private SocketChannel channel;

        public MyReadRunnable(SocketChannel channel){
            this.channel = channel;
        }

        @Override
        public synchronized void  run() {
            ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);

            StringBuilder result = new StringBuilder();
            try {
                while (channel.read(byteBuffer) > 0){//确保读完
                    byteBuffer.flip();
                    result.append(new String(byteBuffer.array()));
                    byteBuffer.clear();//每次清空 对应上面flip()
                }
                System.out.println("server receive: " + result.toString());
            } catch (IOException e) {
                e.printStackTrace();
            }


        }
    }

    static class MyWriteRunnable implements Runnable {

        private SocketChannel channel;

        public MyWriteRunnable(SocketChannel channel){
            this.channel = channel;
        }

        @Override
        public void run() {
            String sendStr = "server send data: " + Math.random();
            ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
            try {
                while (send.hasRemaining()) {
                    channel.write(send);
                }
                System.out.println(sendStr);
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }
}