「通信框架Netty4 源码解读(一)」起步,关于IO的简单总结,模拟一个redis客户端

时间:2022-07-24
本文章向大家介绍「通信框架Netty4 源码解读(一)」起步,关于IO的简单总结,模拟一个redis客户端,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Netty是一个高效稳定的NIO应用通信框架,笔者在本专题将带领大家分析Netty底层源码,彻底理解底层通信原理。

注意,本专题只适宜了解java多线程和java io知识的小伙伴阅读。

IO

在计算机系统中I/O就是输入(Input)和输出(Output)的意思,针对不同的操作对象,可以划分为磁盘I/O模型,网络I/O模型,内存映射I/O, Direct I/O、数据库I/O等,只要具有输入输出类型的交互系统都可以认为是I/O系统,也可以说I/O是整个操作系统数据交换与人机交互的通道,这个概念与选用的开发语言没有关系,是一个通用的概念。 在如今的系统中I/O却拥有很重要的位置,现在系统都有可能处理大量文件,大量数据库操作,而这些操作都依赖于系统的I/O性能,也就造成了现在系统的瓶颈往往都是由于I/O性能造成的。因此,为了解决磁盘I/O性能慢的问题,系统架构中添加了缓存来提高响应速度;或者有些高端服务器从硬件级入手,使用了固态硬盘(SSD)来替换传统机械硬盘;在大数据方面,Spark越来越多的承担了实时性计算任务,而传统的Hadoop体系则大多应用在了离线计算与大量数据存储的场景,这也是由于磁盘I/O性能远不如内存I/O性能而造成的格局(Spark更多的使用了内存,而MapReduece更多的使用了磁盘)。因此,一个系统的优化空间,往往都在低效率的I/O环节上,很少看到一个系统CPU、内存的性能是其整个系统的瓶颈。也正因为如此,Java在I/O上也一直在做持续的优化,从JDK 1.4开始便引入了NIO模型,大大的提高了以往BIO模型下的操作效率。

BIO、NIO、AIO

BIO (Blocking I/O):同步阻塞I/O模式,数据的读取写入必须阻塞在一个线程内等待其完成。这里使用那个经典的烧开水例子,这里假设一个烧开水的场景,有一排水壶在烧开水,BIO的工作模式就是, 叫一个线程停留在一个水壶那,直到这个水壶烧开,才去处理下一个水壶。但是实际上线程在等待水壶烧开的时间段什么都没有做。

NIO (New I/O):同时支持阻塞与非阻塞模式,但这里我们以其同步非阻塞I/O模式来说明,那么什么叫做同步非阻塞?如果还拿烧开水来说,NIO的做法是叫一个线程不断的轮询每个水壶的状态,看看是否有水壶的状态发生了改变,从而进行下一步的操作。

AIO ( Asynchronous I/O):异步非阻塞I/O模型。异步非阻塞与同步非阻塞的区别在哪里?异步非阻塞无需一个线程去轮询所有IO操作的状态改变,在相应的状态改变后,系统会通知对应的线程来处理。对应到烧开水中就是,为每个水壶上面装了一个开关,水烧开之后,水壶会自动通知我水烧开了。

进程中的IO调用

进程中的IO调用步骤大致可以分为以下四步: 1. 进程向操作系统请求数据 ; 2. 操作系统把外部数据加载到内核的缓冲区中; 3. 操作系统把内核的缓冲区拷贝到进程的缓冲区 ; 4. 进程获得数据完成自己的功能 ; 当操作系统在把外部数据放到进程缓冲区的这段时间(即上述的第二,三步),如果应用进程是挂起等待的,那么就是同步IO,反之,就是异步IO,也就是AIO 。

异步、异步、阻塞、非阻塞

  1. 同步阻塞I/O(BIO): 同步阻塞I/O,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制来改善。BIO方式适用于连接数目比较小且固定的架构,这种方式对服务端资源要求比较高,并发局限于应用中,在jdk1.4以前是唯一的io现在,但程序直观简单易理解
  2. 同步非阻塞I/O(NIO): 同步非阻塞I/O,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求时才启动一个线程进行处理。NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,jdk1,4开始支持
  3. 异步非阻塞I/O(AIO): 异步非阻塞I/O,服务器实现模式为一个有效请求一个线程,客户端的IO请求都是由操作系统先完成了再通知服务器用其启动线程进行处理。AIO方式适用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,jdk1.7开始支持。
  4. IO与NIO区别:
  • IO面向流,NIO面向缓冲区
  • IO的各种流是阻塞的,NIO是非阻塞模式 Java NIO的选择允许一个单独的线程来监视多个输入通道,可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入或选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道
  1. 同步与异步的区别: 同步:发送一个请求,等待返回,再发送下一个请求,同步可以避免出现死锁,脏读的发生 异步:发送一个请求,不等待返回,随时可以再发送下一个请求,可以提高效率,保证并发
  • 同步异步关注点在于消息通信机制,阻塞与非阻塞关注的是程序在等待调用结果时(消息、返回值)的状态。阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程
  • 不同层次: CPU层次:操作系统进行IO或任务调度层次,现代操作系统通常使用异步非阻塞方式进行IO(有少部分IO可能会使用同步非阻塞),即发出IO请求后,并不等待IO操作完成,而是继续执行接下来的指令(非阻塞),IO操作和CPU指令互不干扰(异步),最后通过中断的方式通知IO操作的完成结果。 线程层次:操作系统调度单元的层次,操作系统为了减轻程序员的思考负担,将底层的异步非阻塞的IO方式进行封装,把相关系统调用(如read和write)以同步的方式展现出来,然而同步阻塞IO会使线程挂起,同步非阻塞IO会消耗CPU资源在轮询上,3个解决方法; 1. 多线程(同步阻塞) 2. IO多路复用(select、poll、epoll) 3. 直接暴露出异步的IO接口,kernel-aio和IOCP(异步非阻塞)

传统BIO创建服务

JNIO是jdk1.4以后才有的,之前JAVA IO一直是BIO,C、C++程序员为什么看不起java程序员?我想BIO的低性能就是其中一个重要的原因吧! Java BIO其实就是同步阻塞,高并发处理效率低,我们利用JAVA BIO开始一个服务端程序。

public class BioServer {
  public static void main(String[] args) throws IOException {
      //端口
      int port=8080;
      ServerSocket serverSocket=null;
      try {
          //绑定端口
          serverSocket=new ServerSocket(port);
          while (true){
              //主线程main会阻塞在这里,等待客户端链接
              Socket socket = serverSocket.accept();
              processClient(socket);
          }
      } catch (IOException | InterruptedException e) {
          e.printStackTrace();
      }finally {
          if(serverSocket!=null){
              serverSocket.close();
          }
      }
  }
  public static  void processClient(Socket socket) throws InterruptedException {
      //模拟处理socket
      Thread.sleep(1000);
  }
}

这段代始终在main线程中执行,就好比公司创建初期只有老板一个人,确实能完成客户端的链接及请求处理,运行程序代码会阻塞在serverSocket=new ServerSocket(port);,一直等到客户端链接成功后,才执行处理函数processClient(socket);,处理结束后,继续循环,此时程序继续阻塞在Socket socket = serverSocket.accept();等待新的客户端链接。processClient花了10秒钟才处理完毕,在此期间,如果其他客户端请链接服务器是不成功的,它必须等上一个客户端请求处理完成了才能继续。假如有1000个客户请求呢?10000个呢?想想你在浏览器页面等待一天才下单成功...于是,这家电商公司倒闭了! 客户端发链接请求,希望你服务器立马处理我的请求,而不是等你处理完毕了别人的事情再来搭理我!时间很宝贵好吗?服务器很委婉,表示人手不够,没办法处理别人事情的同时再处理你的事情,毕竟一心不可二用。 那就增加人手!于是线程临危受命(公司开始招人),服务器派主线程接收请求(相当于公司前台),然后将请求交给另一线程(相当于业务人员)处理,服务器继续等待连接,这样的话新的客户端能立马链接上服务器,而不用等待服务器处理完别人的事情再来接待我了,代码如下:

public class BioServer {
    public static void main(String[] args) throws IOException {
        //端口
        int port=8080;
        ServerSocket serverSocket=null;
        try {
            //绑定端口
            serverSocket=new ServerSocket(port);
            while (true){
                //主线程main会阻塞在这里,等待客户端链接
                Socket socket = serverSocket.accept();
              //请求处理交给别人,主线程继续接待客户端的请求
                new Thread(()->{
                        processClient(socket);
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(serverSocket!=null){
                serverSocket.close();
            }
        }
    }
      public static  void processClient(Socket socket)  {
        //模拟处理socket
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

客户端链接成功后,交给一个线程处理请求,主线程继续循环等待客户端的链接?如果,有10000个人来链接,那服务器就要开10000个线程,如果10万呢?你开10万个线程?哇,你服务器性能好高耶!线程的创建与销毁很耗资源的好吗?就好比你的公司,你招10000万个业务人员处理客户需求?正常的做法是,招10个业务人员,轮询处理客户请求,每一个业务人员处理完客户请求后等待服务器分给他下一单任务,于是,线程池登场了:

public class BioServer {
    public static void main(String[] args) throws IOException {
        //端口
        int port=8080;
        ServerSocket serverSocket=null;
        try {
            //绑定端口
            serverSocket=new ServerSocket(port);
            //创建一个线程池,相当于一个固定规模的业务团队
            TimeServerHandlerExecutorPool pool = new TimeServerHandlerExecutorPool(50, 1000);
            while (true){
                //主线程main会阻塞在这里,等待客户端链接
                Socket socket = serverSocket.accept();
                pool.execute(()->{processClient(socket);});
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(serverSocket!=null){
                serverSocket.close();
            }
        }
    }

    public static  void processClient(Socket socket)  {
        //模拟处理socket
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class TimeServerHandlerExecutorPool implements Executor{
     private ExecutorService executorService;
    public TimeServerHandlerExecutorPool(int maxPoolSize,int queueSize) {
        /**
         * @param corePoolSize 核心线程数量
         * @param maximumPoolSize 线程创建最大数量
         * @param keepAliveTime 当创建到了线程池最大数量时  多长时间线程没有处理任务,则线程销毁
         * @param unit keepAliveTime时间单位
         * @param workQueue 此线程池使用什么队列
         */
        System.out.println(Runtime.getRuntime().availableProcessors());
        this.executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                maxPoolSize,120L, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize));
    }
    @Override
    public void execute(Runnable command) {
        executorService.execute(command);
    }
}

OK,现在这个公司有模有样了,一个前台,N个业务人员,只要有订单,这N个业务人员可以不睡觉! 公司规模日益增长,用户量越来越大,N个业务人员已经加班加点累吐血了,公司到了一个瓶颈期,急需改变现状。 大家有没有发现,当一件事参与的人多了以后,沟通往往会成为事情发展的最大障碍,前台人员需要不断的与业务人员沟通,业务人员来回不断的找前台沟通,前台在多个业务人员之间不断的进行脑力切换。如果前台正在跟业务人员A沟通,这时业务人员B插进来了,前台转而去跟B沟通,沟通完后需要回忆刚才跟A沟通到哪里了,前台想,太累了,有跟业务人员解释的时间还不如我自己干。其实,这就是多线程的上下文切换,CPU通过时间片分配算法来循环执行任务,当前任务执行一个时间片后会切换到下一个任务。但是,在切换前会保存上一个任务的状态,以便下次切换回这个任务时,可以再次加载这个任务的状态,从任务保存到再加载的过程就是一次上下文切换。线程切换时需要知道在这之前当前线程已经执行到哪条指令了,所以需要记录程序计数器的值,另外比如说线程正在进行某个计算的时候被挂起了,那么下次继续执行的时候需要知道之前挂起时变量的值时多少,因此需要记录CPU寄存器的状态。所以一般来说,线程上下文切换过程中会记录程序计数器、CPU寄存器状态等数据。

image.png

公司不得不进行改革,对前台人员进行业务培训。前台记录多个用户需求,搜集到一定程度后,暂停收集,对这些需求进行筛选,大部分短期自己能做的任务自己做了,难度大且耗时的任务交给业务人员处理。随着规模的增大,可以分成多个组,每组一个前台和多个业务人员。这就是NIO单线程Reactor模型和多线程Reactor模型,上面的比喻可能不恰当,后面会通过代码的形式详细讲解NIO。

利用传统BIO手写一个Redis客户端

Redis作为高性能的缓存数据库,想必大家都用过,应用程序通过Jedis客户端来链接redies,我们就利用java BIO来模拟一个Jedis客户端来向redis发送请求数据。 通信需要双方定好通信协议和数据格式,这里通信协议就是TCP,我们主要关系数据格式,方法就是查看Jedis发送数据的格式。

  • 首先,准备服务程序,用于接收并查看Jedis发送来的数据:
public class BioServer {

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(9999);
            while (true) {
                Socket socket = serverSocket.accept();
                System.out.println("客户端" + socket.getRemoteSocketAddress().toString() + "来连接了");
                InputStream inputStream = null;
                OutputStream outputStream = null;
                try {
                    inputStream = socket.getInputStream();
                    outputStream = socket.getOutputStream();
                    int count = 0;
                    byte[] bytes = new byte[1024];
                    while ((count = inputStream.read(bytes)) != -1) {
                        String line = new String(bytes, 0, count, "utf-8");
                       //打印jedis发送过来的数据
                        System.out.println(line);
                        outputStream.write("ok".getBytes());
                        outputStream.flush();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    socket.close();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            serverSocket.close();
        }
    }
}
  • 准备Jedis链接程序:
    <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.0</version>
        </dependency>

//模拟jedis
public class RedisClient {

    public static void main(String[] args) {
        Jedis redisClient=new Jedis("localhost",9999);
        System.out.println(redisClient.set("yuanma", "123456"));
        redisClient.close();
    }
}

服务程序打印结果:

客户端/127.0.0.1:65257来连接了
*3
$3
SET
$6
yuanma
$6
123456

上面的服务程序不是真正的Redis服务器,我们只是为了查看Jedis发送的数据格式。Jedis的请求是set yuanma 123456,意思是设置键yuanma的值为123456,Jedis将这个请求封装成了上面的数据格式。我们根据这个数据格式,利用BIO模拟Jedis向真正的Redis服务器发请求,然后再根据键从redis服务器获取值,看是否能成功。 先分析下上面的数据格式,*3的意思是发送的参数有3个,即set、yuanma和123456,3表示第一个参数长度是3,SET表示的就是第一个参数;以此类推,6表示第二个参数长度是6,yuanma表示第二个参数;

*3
$3
SET
$4
name
$5
netty

如果,向Redis服务器发送 get name(即获取name的值),数据格式应该是这样的:

*2
$3
GET
$4
name

OK,数据格式我们研究清楚了,下面就是模拟Jedis向服务其发送请求并接收返回的数据。

模拟Jedis客户端来链接Redis服务器

第一步,定义向Redis发送请求的客户端API:

import redis.clients.jedis.Jedis;
//模拟jedis
public class RedisClient {
    //发送set key value命令
    public String set(String key, String value){
          reutrn null;
    }
    //发送get key命令
    public String get(String key){
        return null;
    }
    //发送incr key命令
    public String incr(String key){
        return  null;
    }
}

第二步,定义Socket通信层:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

//socket通信
public class LubanSocket {

    private Socket socket;
    private InputStream inputStream;
    private OutputStream outputStream;

//构造函数,链接Redis服务器,拿到输入流和输出流
    public LubanSocket(String ip,int prot) {
        try {
            if(!isCon()){
                socket=new Socket(ip,prot);
                inputStream=socket.getInputStream();
                outputStream=socket.getOutputStream();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

//发送请求
    public void send(String str){
        try {
            outputStream.write(str.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
//读取Redis返回的数据
    public String read(){
        byte[] b=new byte[1024];
        int count=0;
        try {
            count= inputStream.read(b);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new String(b,0,count);
    }

//判断链接是否断开
    public boolean isCon(){
        return socket!=null && !socket.isClosed() && socket.isConnected();
    }
//关闭连接
    public void close(){
        if(outputStream!=null){
            try {
                outputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if(inputStream!=null){
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if(socket!=null){
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

第三步,定义数据协议层

public class Resp {
    /**
     * redis网络通信协议,比如set("name","congzhizhi")
     * *3
     * $3
     * set
     * $4
     * name
     * $10
     * congzhizhi
     * 其中,*3表示发了3个参数,$3表示下面的参数3个字符,以此类推
     *
     *
     */
    public static final String star="*";
    public static final String crlf="rn";
    public static final String lengthStart="$";
//枚举类,定义指令,这里有SET指令、GET指令、INCR指令
    public static enum command{
        SET,GET,INCR
    }
}

下一步就是完善第一步的客户端,组装发送命令。代码也很简单,直接看:

//模拟jedis
public class RedisClient {

    private LubanSocket lubanSocket;
//构造函数,链接Redis服务器
    public RedisClient(String ip,int prot) {
        this.lubanSocket=new LubanSocket(ip,prot);
    }
//发送set命令
    public String set(String key, String value){
        lubanSocket.send(commandStrUtil(Resp.command.SET,key.getBytes(),value.getBytes()));
        return lubanSocket.read();
    }
//关闭链接
    public void close(){
        lubanSocket.close();
    }
//发送get命令
    public String get(String key){
        lubanSocket.send(commandStrUtil(Resp.command.GET,key.getBytes()));
        return lubanSocket.read();
    }
//发送incr命令
    public String incr(String key){
        lubanSocket.send(commandStrUtil(Resp.command.INCR,key.getBytes()));
        return lubanSocket.read();
    }

//组装命令
    public String commandStrUtil(Resp.command command, byte[]... bytes){
    StringBuilder stringBuilder=new StringBuilder();
        //拼接*3,set key value,总共3个,bytes代表键和值参数,注意拼接完要加回车换行
        stringBuilder.append(Resp.star).append(1+bytes.length).append(Resp.crlf);
        //拼接SET的长度,$3
        stringBuilder.append(Resp.lengthStart).append(command.toString().getBytes().length).append(Resp.crlf);
        //拼接SET字符串
        stringBuilder.append(command.toString()).append(Resp.crlf);
        //拼接键和值
        for (byte[] aByte : bytes) {
            stringBuilder.append(Resp.lengthStart).append(aByte.length).append(Resp.crlf);
            stringBuilder.append(new String(aByte)).append(Resp.crlf);
        }
        return stringBuilder.toString();
    }
}

上面的代码很简单,不细讲了,下面我们来做个测试:

  • 首先,我们先启动redis,小编为演示,在这里启动一个windows版本的redis,到安装目录下通过命令redis-server.exe "redis.windows.conf"即可启动,端口号默认为6379:
  • 编写测试
 public static void main(String[] args) {
        RedisClient redisClient=new RedisClient("localhost",6379);
        System.out.println(redisClient.set("yuanma", "123456"));
        System.out.println(redisClient.get("yuanma"));
        redisClient.close();
    }

打印结果:

这说明,我们成功向Redis发送的set和get命令,并成功接收了Redis返回的数据。 为进一步证明,我们有可视化客户端连接Redis,然后查看我们刚才set的数据

完美!其实,不光SET 和GET命令,Redis中大部分常用的命令都可以使用咱们这个手写的客户端都可以。Mysql驱动连接数据库也是这个原理啦,就是TCP通信,只不过数据协议和IO模型(以后详讲)不同而已。这就是传统的JAVA AIO编程,他是同步阻塞的,无法满足高并发链接,下一节我们就开始讲高并发网络通信基础NIO。