Netty之美--I/O模型

时间:2022-07-23
本文章向大家介绍Netty之美--I/O模型,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

什么是I/O

顾名思义,I/O系统是用于实现数据输入、输出及数据存储的系统。

相关概念

1. 同步、异步

「同步(synchronous)」是指当一个任务A的执行需要依赖于另外一个任务B的执行结果时,任务A必须等待任务B执行完成,才可以继续执行;

「异步(asynchronous)」是指发送一个请求,不需要等待返回,随时可以再发送下一个请求,即不需要等待;

2. 阻塞、非阻塞

阻塞和非阻塞,是从获取调用结果时的状态的角度来说的。

「阻塞」是指调用结果返回之前,当前线程会被挂起,一直处于等待消息通知,不能够执行其他业务。

「非阻塞」指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回

3. 同步阻塞、同步非阻塞

「同步阻塞」是指这个线程在等待当前函数返回时,没有执行其他消息处理,而是处于挂起等待状态;

「同步非阻塞」是指这个线程在等待当前函数返回时,仍在执行其他消息处理,Netty就是同步非阻塞的;同步非阻塞是指使用轮询的方式去获取结果;

4. 异步阻塞、异步非阻塞、

「异步阻塞」本来是异步的,却非要在获取结果时候阻塞当前线程;之前的tomcat就是这种情况;

「异步非阻塞」效率最高,其实就是用多线程去做;异步非阻塞是指通过通知回调的方式来获取调用结果

Unix与java的I/O模型

如图,Unix 下共有五种 I/O 模型:阻塞 I/O、非阻塞 I/O、I/O 多路复用(select、poll、epoll)信号驱动 I/O(SIGIO)和异步 I/O(Posix.1的aio_系列函数),而java除了其中的信号驱动式之外,其他均有支持;

「注:」理解I/O模型,首先要理解一个输入操作所必须包含的2个阶段:

  1. 等待数据准备好;
  2. 从内核向进程复制数据;

对于套接字上的输入操作,第一步通常涉及等待数据从网络中到达。当所等待的分组到达时,它被复制到内核中的某个缓冲区。第二步就是把数据从内核缓冲区复制到应用进程缓冲区。

1. 阻塞式IO模型

在图6-1中,进程调用recvfrom,其系统调用直到数据包到达且被复制到应用进程的缓冲中或者发生错误才返回。这就是阻塞式IO模型的微观图示。

针对阻塞IO模型的传统服务设计则如上图,服务器对每个client连接都会启动一个专门的线程去维护,服务器中的逻辑Handler需要在各自的线程中执行,这种模型对线程的需求较多,面对高并发的场景,会造成CPU资源浪费;原来的tomcat就是这种模式,只不过现在也支持NIO了。

常见写法(服务端):
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @program test
 * @description: bio
 * @author: cys
 * @create: 2020/06/30 16:20
 */
public class BIOServer {

    //线程池机制
    //1. 创建一个线程池
    //2. 如果有客户端连接了,创建一个线程与之通讯(单独写一个方法)

    public static void main(String[] args) throws IOException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        ServerSocket serverSocket = new ServerSocket(6666);
        System.out.println("服务器启动了");
        while (true) {
            //监听,等待客户端连接
            final Socket socket = serverSocket.accept();
            System.out.println("有客户端连接");
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    handler(socket);
                }
            });
        }

    }

    private static void handler(Socket socket) {
        byte[] bytes = new byte[1024];
        try (InputStream inputStream = socket.getInputStream();) {
            int read = 0;
            while ((read = inputStream.read(bytes)) != -1) {
                System.out.println(new String(bytes, 0, read));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

非阻塞IO

  • 前三次调用时,数据未准备好,内核就会立即返回一个错误码(EWOULDBLOCK),此时请求线程不会被阻塞;
  • 第四次时,数据准备好,它被复制到应用进程缓冲区,recvfrom成功返回。
  • 由此可见:请求线程将不断请求内核,查看数据是否准备好。这种轮询操作,一样会消耗大量的CPU资源,所以在java的实现中,会采用同时支持I/O复用的方式支持非阻塞。

I/O复用模型

如图I/O复用模型将阻塞点由真正的I/O系统调用转移到了对select、poll或者epoll系统函数的调用上。单纯看这个微观图,有可能还会觉得与阻塞I/O区别不大,甚至于我们多增加了I/O调用还增加了性能损耗。其实不然,使用select以后最大的优势是用户可以在一个线程内同时处理多个连接的I/O请求,我们也会在下一篇,详细解读它的优势;

java NIO实现一个聊天的功能(服务端)
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
 * @program test
 * @description:
 * @author: cys
 * @create: 2020/07/02 10:37
 */
public class GroupChatServer {

    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int port = 6667;

    public GroupChatServer() {
        try {
            selector = Selector.open();
            listenChannel = ServerSocketChannel.open();
            listenChannel.socket().bind(new InetSocketAddress(port));
            listenChannel.configureBlocking(false);
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void listen() {
        try {
            while (true) {
                if (selector.select(2000) > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        if (key.isAcceptable()) {//连接事件
                            SocketChannel socketChannel = listenChannel.accept();
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }
                        if (key.isReadable()) {//读取事件,即通道可读了
                            read(key);
                        }
                        iterator.remove();
                    }
                } else {
                    System.out.println("等待。。。。");
                }
            }
        } catch (IOException e) {

        } finally {

        }
    }

    //读取客户端消息
    private void read(SelectionKey selectionKey) {

        SocketChannel socketChannel = null;
        try {
            //定义一个SocketChannel
            socketChannel = (SocketChannel) selectionKey.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = socketChannel.read(buffer);
            if (count > 0) {//读取到了数据
                String str = new String(buffer.array());
                System.out.println("from 客户端" + str);
                //向其他的客户端转发消息
                sendInfoToOtherClients(str, socketChannel);
            }

        } catch (IOException e) {
            try {
                System.out.println(socketChannel.getRemoteAddress() + "离线了");
                selectionKey.cancel();
                socketChannel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        } finally {
        }
    }

    private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
        System.out.println("服务器转发消息中");
        //遍历所有注册到Selector上的channel,并排除self
        for (SelectionKey key : selector.keys()) {
            Channel targetChannel = key.channel();
            if (targetChannel instanceof SocketChannel && targetChannel != self) {
                SocketChannel desc = (SocketChannel) targetChannel;
                //将msg存储到buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                desc.write(buffer);
            }
        }
    }

    public static void main(String[] args) {
        new GroupChatServer().listen();
    }
}

信号驱动式I/O模型

如图,我们也可以用信号,让内核在描述符就绪时发送SIGIO信号通知我们。我们称这种模型为信号驱动式I/O(signal-driven I/O).

异步I/O模型

它由POSIX规范定义,工作机制是:告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到我们自己的缓冲区)完成后通知我们。与信号驱动式I/O不同的是:信号驱动式I/O是由内核通知我们何时可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。

区别

如图,前四种模型都属于同步I/O模型,因为其中真正的 I/O操作将阻塞进程。而异步I/O是完全不会阻塞请求I/O的。目前 Windows 下通过 IOCP 实现了真正的异步 I/O。而在 Linux 系统下,目前 AIO 并不完善,因此在 Linux 下实现高并发网络编程时都是以 IO 复用模型模式为主。

参考文献

  • [1]《UNIX网络编程卷1:套接字联网API(第3版)》
  • [2]“Scalable IO in JAVA”