零基础学习java------30---------wordCount案例(涉及到第三种多线程callable)

时间:2019-09-23
本文章向大家介绍零基础学习java------30---------wordCount案例(涉及到第三种多线程callable),主要包括零基础学习java------30---------wordCount案例(涉及到第三种多线程callable)使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

知识补充:多线程的第三种方式

来源:http://www.threadworld.cn/archives/39.html

创建线程的两种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

一、Runnable接口

先看一下java.lang.Runnable吧,它是一个接口,在它里面只声明了一个run()方法:

由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

二、Callable接口

Callable接口位于java.util.concurrent包下,在它里面也只声明了一个方法,只不过这个方法叫做call()。

可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。Callable接口可以看作是Runnable接口的补充,call方法带有返回值,并且可以抛出异常。

三、FutureTask类

如何获取Callable的返回结果呢?一般是通过FutureTask这个中间媒介来实现的。整体的流程是这样的:
把Callable实例当作参数,生成一个FutureTask的对象,然后把这个对象当作一个Runnable,作为参数另起线程。

3.1 FutureTask的结构

 3.2 FutureTask的启动
由于FutureTask实现了Runnable,因此它既可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行

四、Future接口

FutureTask继承体系中的核心接口是Future。Future的核心思想是:一个方法,计算过程可能非常耗时,等待方法返回,显然不明智。可以在调用方法的时候,立马返回一个Future,可以通过Future这个数据结构去控制方法f的计算过程。
这里的控制包括:
get方法:获取计算结果(如果还没计算完,也是必须等待的)
cancel方法:还没计算完,可以取消计算过程
isDone方法:判断是否计算完
isCancelled方法:判断计算是否被取消

例子

public class ThreadMain {
    public static void main(String[] args) throws Exception, ExecutionException {
        ThreadTest threadTest = new ThreadTest();//线程任务
        // FutureTask接收线程的返回数据
        FutureTask<String> f = new FutureTask<>(threadTest);
        Thread thread = new Thread(f);
        thread.start();
        String res = f.get();
        System.out.println(res);
    }
}

class ThreadTest implements Callable<String>{
    @Override
    public String call() throws Exception {
        return "hello";
    }
}
View Code

一. 单词统计案例

使用多线程,不同的线程统计不同的文件中的单词的个数,将统计的结果数据写入到指定的中间文件中进行单词个数的汇总统计 。

客户端:

public class MyClient implements Callable<String>{
    
    //不同线程处理不同的文件
    String localFilePath ;
    public MyClient(String localFilePath) {
        super();
        this.localFilePath = localFilePath;
    }
    
    public static final String KEY = "天王盖地虎";
    public static final String PATH = "E:/javafile/wc_server.jar";
    @SuppressWarnings("unused")
    private String task() throws Exception{
        String res = null;
        Socket socket = new Socket("127.0.0.1",8888);
        ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
        ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
        //1 发送身份校验
        oos.writeObject(KEY);
        //2 接收校验结果
        boolean b = (boolean)ois.readObject();
        if(b) {//校验成功
            System.out.println("校验成功");
            //3 发送服务端存储jar包的路径
            oos.writeObject(PATH);
            //4 往服务端发送jar包
            // 4.1 读取本地需要的jar包
            BufferedInputStream bis = new BufferedInputStream(new FileInputStream("E:/javafile/wc.jar"));
            byte[] bs = new byte[1024];
            int len;
            while((len = bis.read(bs))!= -1) {
                // 4.2 利用网络流发送此jar包
                oos.write(bs,0,len);
            }
            oos.flush();
            bis.close();
            // 往服务端发送执行命令
            String cmd = "java -jar "+PATH+" " +localFilePath;
            oos.writeObject(cmd);
            // 接收服务端jar程序执行的结果 
            res = (String)ois.readObject();
//            System.out.println(res);
        }else {
            System.out.println("校验失败");
        }
        socket.close();
        return res;
    }
    @Override
    public String call() throws Exception {
        String res = task();
        return res;
    }
}
View Code

服务端

public class MyServer {
    public static final String KEY = "天王盖地虎";
    public static void main(String[] args) throws Exception {
        ServerSocket ss = new ServerSocket(8888);
        while(true) {
            Socket socket = ss.accept();
            OutputStream os = socket.getOutputStream();
            InputStream in = socket.getInputStream();
            ObjectInputStream ois = new ObjectInputStream(in);
            ObjectOutputStream oos = new ObjectOutputStream(os);
            //接收客户端发送的身份校验
            String str = (String)ois.readObject();
            //校验成功
            if(KEY.equals(str)) {
                oos.writeObject(true);
                // 接收路径
                String path = (String)ois.readObject();
                // 接收jar包,并写入指定的路径
                BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(path));
                byte[] bs = new byte[1024];
                int len = 0;
                while((len = ois.read(bs)) != -1) {
                    bos.write(bs,0,len);
                }
                bos.flush();
                bos.close();
                // 接收执行命令
                String cmd = (String)ois.readObject();
                Runtime runtime = Runtime.getRuntime();
                Process p = runtime.exec(cmd);
                InputStream inputStream = p.getInputStream();
                BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
                String line = null;
                StringBuilder res = new StringBuilder();
                while((line = br.readLine()) != null) {
                    res.append(line+"\r\n");
                }
                // 将结果返回给客户端
                oos.writeObject(res.toString());
                oos.flush();
            // 校验失败
            }else {
                //将结果返回到客户端
                oos.writeObject(false);
            }
        }
    }
}
View Code

多线程统计结果

public class TestClient {
    public static void main(String[] args) throws Exception {
        // 创建第一个线程任务
        MyClient m1 = new MyClient("E:/javafile/wc1.txt");
        FutureTask<String> f1 = new FutureTask<>(m1);
        new Thread(f1).start();
        String res1 = f1.get();
//        System.out.println("线程1的结果:"+"/r/n"+ res1);
        
        // 创建第二个线程任务
        MyClient m2 = new MyClient("E:/javafile/wc2.txt");
        FutureTask<String> f2 = new FutureTask<>(m2);
        new Thread(f2).start();
        String res2 = f2.get();
//        System.out.println("线程2的结果:" +res2);
        //结果数据的汇总  --->  写到中间结果
                //将结果数据追加到指定的文件中
                BufferedWriter bw = new BufferedWriter(new FileWriter(new File("E:/javafile/res.res"), true));
                bw.write(res1);
                bw.write(res2, 0, res2.length()-2);
                //bw.write(res2);
                bw.close();
                //统计结果数据
                Map<String,Integer> map = new HashMap<>() ;
                BufferedReader br = new BufferedReader(new FileReader("E:/javafile/res.res"));
                String line = null ;
                while((line = br.readLine())!=null){
                    System.out.println(line);
                    String[] split = line.split(":");
                    String word = split[0];
                    int num = Integer.parseInt(split[1]);
                    Integer num2 = map.getOrDefault(word, 0);
                    num2+=num;
                    map.put(word, num2);
                }
                Set<Entry<String, Integer>> entrySet = map.entrySet();
                for (Entry<String, Integer> entry : entrySet) {
                    System.out.println(entry);
                }
    }
}
View Code

原文地址:https://www.cnblogs.com/jj1106/p/11575599.html