Java AIO 异步IO应用实例

时间:2022-07-24
本文章向大家介绍Java AIO 异步IO应用实例,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

项目地址:https://github.com/windwant/windwant-demo/tree/master/io-service

Server:

package org.windwant.io.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

/**
 * AsynchronousServerSocketChannel
 */
public class AIOServer implements Runnable{

    private int port = 8889;
    private int threadSize = 10;
    protected AsynchronousChannelGroup asynchronousChannelGroup;

    protected AsynchronousServerSocketChannel serverChannel;

    public AIOServer(int port, int threadSize) {
        this.port = port;
        this.threadSize = threadSize;
        init();
    }

    private void init(){
        try {
            asynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10);
            serverChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup);
            serverChannel.bind(new InetSocketAddress(port));
            System.out.println("listening on port: " + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        try{
           if(serverChannel == null) return;
            serverChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AIOServer>() {
                final ByteBuffer echoBuffer = ByteBuffer.allocateDirect(1024);

                public void completed(AsynchronousSocketChannel result, AIOServer attachment) {
                    System.out.println("==============================================================");
                    System.out.println("server process begin ...");
                    try {
                        System.out.println("client host: " + result.getRemoteAddress());
                        echoBuffer.clear();
                        result.read(echoBuffer).get();
                        echoBuffer.flip();
                        System.out.println("received : " + Charset.defaultCharset().decode(echoBuffer));

                        int random = ThreadLocalRandom.current().nextInt(5);
                        printProcess(random);
                        System.out.println("server deal request execute: " + random + "s");

                        String msg = "server test msg-" + Math.random();
                        System.out.println("server send data: " + msg);
                        result.write(ByteBuffer.wrap(msg.getBytes()));
                        System.out.println("server process end ...");
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } finally {
                        attachment.serverChannel.accept(attachment, this);// 监听新的请求,递归调用。
                    }

                }

                public void failed(Throwable exc, AIOServer attachment) {
                    System.out.println("received failed");
                    exc.printStackTrace();
                    attachment.serverChannel.accept(attachment, this);
                }
            });
            System.in.read();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    private void printProcess(int s) throws InterruptedException {
        String dot = "";
        for (int i = 0; i < s; i++) {
            Thread.sleep(1000);
            dot += ".";
            System.out.println(dot);

        }
    }

    public static void main(String[] args) throws IOException {
        new Thread(new AIOServer(8989, 19)).start();
    }
}

Client:

package org.windwant.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * AsynchronousSocketChannel
 */
public class AIOClient implements Runnable{

    private AsynchronousSocketChannel client;
    private String host;
    private int port;
    public AIOClient(String host, int port) throws IOException {
        this.client = AsynchronousSocketChannel.open();
        this.host = host;
        this.port = port;
    }

    public static void main(String[] args) {
        try {
            new Thread(new AIOClient("127.0.0.1", 8989)).start();
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void run() {
        client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
            public void completed(Void result, Object attachment) {
                    String msg = "client test msg-" + Math.random();
                    client.write(ByteBuffer.wrap(msg.getBytes()));
                    System.out.println("client send data:" + msg);
            }

            public void failed(Throwable exc, Object attachment) {
                System.out.println("client send field...");
            }
        });

        final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        client.read(byteBuffer, this, new CompletionHandler<Integer, Object>() {
            public void completed(Integer result, Object attachment) {
                System.out.println(result);
                System.out.println("client read data: " + new String(byteBuffer.array()));
            }

            public void failed(Throwable exc, Object attachment) {
                System.out.println("read faield");
            }
        });
    }
}

2017-12-11 改造client: AsynchronousChannelGroup

package org.windwant.io.aio;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
 
/**
 * AsynchronousSocketChannel
 */
public class AIOClient implements Runnable{
 
    private AsynchronousChannelGroup group;   //异步通道组 封装处理异步通道的网络IO操作
    private String host;
    private int port;
    public AIOClient(String host, int port) {
        this.host = host;
        this.port = port;
        initGroup();
    }
 
    private void initGroup(){
        if(group == null) {
            try {
                group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newFixedThreadPool(5), 5); //使用固定线程池实例化组
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
 
 
    private void send(){
        try {
            //异步流式socket通道 open方法创建 并绑定到组 group
            final AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group);
            //连接
            client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
                public void completed(Void result, Object attachment) {
                    String msg = "client test msg-" + Math.random();
                    client.write(ByteBuffer.wrap(msg.getBytes()));
                    System.out.println(Thread.currentThread().getName() + " client send data:" + msg);
 
                    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    client.read(byteBuffer, this, new CompletionHandler<Integer, Object>() {
                        public void completed(Integer result, Object attachment) {
                            System.out.println(Thread.currentThread().getName() + " client read data: " + new String(byteBuffer.array()));
                            try {
                                byteBuffer.clear();
                                if (client != null) client.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
 
                        public void failed(Throwable exc, Object attachment) {
                            System.out.println("read faield");
                        }
                    });
                }
 
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("client send field...");
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    public void run() {
        for (int i = 0; i < 100; i++) {
            send();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        group.awaitTermination(10000, TimeUnit.SECONDS);
    }
 
    public static void main(String[] args) {
        try {
            new Thread(new AIOClient("127.0.0.1", 8989)).start();
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
 
    }
}