Java NIO

时间:2022-07-24
本文章向大家介绍Java NIO,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Java NIO

一. BIO

  1. BIO中最为核心的概念为,面向BIO编程即为面向流的编程
  2. 流分为输入流和输出流。对于一个流来说,要么是输入流,要么是输出流,不可能二者兼而有之。
  3. 理解Decorator设计模式。

二. NIO概述

  1. NIO中核心概念:
    1. Selector:选择器
    2. Channel:通道
    3. Buffer:缓冲区
  2. NIO是面向块(Block)或缓冲区(Buffer)编程。

三. Buffer

  1. Buffer本身就是一块内存,底层实现上就是一个数组,数据的读、写都是通过Buffer来实现的。
  2. Buffer可以同时进行读、写操作。在Buffer进行读、写切换之前,一定要先调用flip()方法。
  3. 除了数组之外,Buffer还提供了对于数据的结构化访问的方式,并且可以追踪到系统的读写过程。
  4. Java 中的7种原生数据类型都有各自对应的Buffer类型,如IntBuffer、ByteBuffer等,boolean类型没有对应的Buffer。
  5. Buffer中的三个重要状态属性:
    1. position
    2. limit
    3. capacity

    0 <= mark <= position <= limit <= capacity

四. Channel

  1. 可以向其写入数据或者从中读取数据的对象,类似于BIO中的Stream。
  2. 所有数据的读、写都是通过Buffer来进行的,永远不会出现直接向Channel读写数据的情况。
  3. 与Stream不同的是,Channel是双向的,而一个Stream只能是InputStream或者OutputStream。由于Channel是双向的,因此可以更好地反映底层操作系统的真实情况:在Linux中,底层操作系统的通道就是双向的。

五. Selector

废话不说,直接上代码。下面使用NIO编写了一个简单的聊天程序,服务端将客户端发来的消息广播给所有客户端。

首先看服务端:

package william.netty.nio;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Auther: ZhangShenao
 * @Date: 2019/1/23 15:46
 * @Description:聊天程序服务端
 */
public class ChatServer {
    private static Map<String, SocketChannel> clients = new ConcurrentHashMap<>();
    private static Selector selector;

    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8080));
        System.err.println("Server 绑定端口 : 8080");
        selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            int select = selector.select();
            if (select <= 0) {
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();

                if (selectionKey.isAcceptable()) {
                    acceptClient((ServerSocketChannel) selectionKey.channel());
                } else if (selectionKey.isReadable()) {
                    broadcastMessage((SocketChannel) selectionKey.channel());
                }
            }
        }
    }

    private static void acceptClient(ServerSocketChannel serverSocketChannel) throws Exception {
        SocketChannel client = serverSocketChannel.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
        clients.putIfAbsent(UUID.randomUUID().toString(), client);
        System.err.println("客户端连接: " + client);
    }

    private static void broadcastMessage(SocketChannel channel) throws Exception {
        ByteBuffer readBuf = ByteBuffer.allocate(1024);
        int read = channel.read(readBuf);
        if (read <= 0) {
            return;
        }
        readBuf.flip();
        Charset charset = Charset.forName("UTF-8");
        String content = String.valueOf(charset.decode(readBuf).array());
        System.err.println("Client: " + channel + ", content: " + content);
        for (Map.Entry<String, SocketChannel> entry : clients.entrySet()) {
            String key = entry.getKey();
            SocketChannel client = entry.getValue();
            String msg;
            if (client == channel) {
                msg = "[self]: " + content;
            } else {
                msg = "[" + key + "]" + content;
            }
            ByteBuffer writeBuf = ByteBuffer.wrap(msg.getBytes(charset));
            //            writeBuf.flip();      使用wrap()方法创建的Buffer,不需要调用flip
            client.write(writeBuf);
        }
    }
}

下面是客户端:

package william.netty.nio;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @Auther: ZhangShenao
 * @Date: 2019/1/23 16:53
 * @Description:聊天程序客户端
 */
public class ChatClient {
    private static Selector selector;

    private static ExecutorService executor = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws Exception {
        selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));

        while (true) {
            int select = selector.select();
            if (select <= 0) {
                continue;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isConnectable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    if (channel.isConnectionPending()) {
                        channel.finishConnect();
                        /*ByteBuffer writeBuf = ByteBuffer.allocate(1024);
                        writeBuf.put((LocalDateTime.now() + "连接成功").getBytes());
                        writeBuf.flip();
                        channel.write(writeBuf);*/
                        send2Server(channel);
                    }
                    channel.register(selector,SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    showMessage((SocketChannel) selectionKey.channel());
                }
            }
            iterator.remove();
        }
    }

    private static void send2Server(SocketChannel channel) {
        executor.submit(() -> {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                buffer.clear();
                String msg = reader.readLine();
                buffer.put(msg.getBytes());
                buffer.flip();
                channel.write(buffer);
            }
        });
    }

    private static void showMessage(SocketChannel channel) throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer);
        buffer.flip();
        byte[] bytes = new byte[buffer.limit()];
        buffer.get(bytes);
        System.err.println(new String(bytes));
    }
}

六. 直接内存

  1. HeapByteBuffer VS DirectByteBuffer
    1. HeapByteBuffer:跟普通Java对象相同,存储在JVM堆上
    2. DirectByteBuffer:DirectByteBuffer对象本身在JVM堆上,实际分配的内存空间在堆外内存中。Buffer中存在一个address成员变量,为堆外内存的地址引用,DirectByteBuffer通过该变量操作堆外内存空间。
  2. 零拷贝:将内核态的数据直接映射到用户态,减少数据的拷贝。
  3. 为什么不让操作系统直接访问堆内存:如果操作系统直接访问JVM堆,当进行GC时,就无法进行标记-整理操作,很容易产生OOM。
  4. 内存映射文件:MappedByteBuffer,位于堆外内存中。