第4章 Java并发编程的基础

时间:2019-10-25
本文章向大家介绍第4章 Java并发编程的基础,主要包括第4章 Java并发编程的基础使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
  本章将着重介绍Java并发编程的基础知识,从启动一个线程到线程间不同的通信方式,最后通过简单的线程池示例以及应用(简单的Web服务器)来串联本章所介绍的内容
  4.1线程简介
    4.1.1什么是线程
      现代操作系统调度的最小单元
    4.1.2为什么要使用线程
      更多的处理器核心
      更快的响应速度
      更好的编程模型
    4.1.3线程优先级
      程序正确性不能依赖线程的优先级高低
    4.1.4线程的状态
      

    更详细的了解线程的状态

       

package com.example.demo.test;

public class ThreadState {
    // 该线程不断地进行睡眠
    static class TimeWaiting implements Runnable{
        @Override
        public void run() {
            while(true) {
                SleepUtils.second(100);  //TimeWaiting 到时间后自动返回运行
            }
        }
    }
    // 该线程在Waiting.class 实例上等待
    static class Waiting implements Runnable{
        @Override
        public void run() {
            while(true) {
                synchronized (Waiting.class) {
                    try {
                        Waiting.class.wait();  //waiting等待notify
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        
    }
    // 该线程在Blocked.class实例上加锁后,不会释放锁
    static class Blocked implements Runnable{
        public void run() {
            synchronized (Blocked.class) {  //block等待获得锁
                while(true) {
                    SleepUtils.second(100);
                }
            }
        }
    }
    
    public static void main(String[] args) {
        new Thread(new TimeWaiting(),"TimeWaitingThread").start();
        new Thread(new Waiting(),"WaitingThread").start();
        new Thread(new Blocked(),"BlockedThread-1").start();
        new Thread(new Blocked(),"BlockedThread-2").start();

    }
}

    线程状态的转换

          

  4.1.5Daemon线程(后台守护线程)

4.2启动和终止线程

  4.2.2启动线程 start()方法

  4.2.3理解中断

  4.2.4过期的suspend()\resume()\stop()

  4.2.5安全的终止线程(终端或者提供一个方法改变volatile变量的值)

4.3线程间通信

  4.3.1volatile关键字保证可见性,synchronized保证可见性和排他性

  4.3.2等待通知机制

    

    

   4.3.4管道的输入/输出流

    

package com.example.demo.test;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.nio.channels.Pipe;

public class Piped {
    static class Print implements Runnable{
        private PipedReader in;
        
        public Print(PipedReader in) {
            this.in = in;
        }

        @Override
        public void run() {
            int recevie = 0;
            try {
                while((recevie = in.read())!=-1) {
                    System.out.print((char) recevie);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
    }
    
    public static void main(String[] args)throws Exception {
        PipedWriter out = new PipedWriter();
        PipedReader in = new PipedReader();
        out.connect(in);
        new Thread(new Print(in),"printThread").start();
        int receive = 0;
        try {
            while((receive=System.in.read())!=-1) {
                out.write(receive);
            }
        } finally {
            out.close();
        }
        
    }
}

  4.3.5 Thread.join()的使用

package com.example.demo.test;

public class Join {
    
    public static void main(String[] args) {
        Thread previous = Thread.currentThread();
        for(int i=0;i<10;i++) {
            //每一个线程拥有钱一个线程的引用,需要前一个执行完成后后一个才能执行
            Thread thread = new Thread(new Domino(previous),String.valueOf(i));
            thread.start();
            previous = thread;
        }
    }
    
    static class Domino implements Runnable{
        private Thread thread;
        
        public Domino(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+" temriminate.");
        }
        
    }

}

4.3.6 ThreadLocal的使用

package com.example.demo.test;

import java.util.concurrent.TimeUnit;

public class Profiler {
    private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>() {
        @Override
        protected Long initialValue() {
            return System.currentTimeMillis();
        }
    };
    
    public static final void begin() {
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }
    
    public static final long end() {
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }
    
    public static void main(String[] args)throws Exception {
        Profiler.begin();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Cost: "+ Profiler.end()+"mills");
    }
}

  4.4.1 等待超时模式

  

 4.4.2 一个简单的数据库连接池示例

  

package com.example.demo.test;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;
/**
 * 简单创建connection
 * @author helloworld
 *
 */
public class ConnectionDriver {
    
    static class ConnectionHandler implements InvocationHandler{

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if(method.getName().equals("commit")) {
                TimeUnit.MICROSECONDS.sleep(100);
            }
            return null;
        }
        
    }
    
    public static final Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(), 

      new Class<?>[] {Connection.class}, new ConnectionHandler()); } }
package com.example.demo.test;

import java.sql.Connection;
import java.util.LinkedList;
/**
 * 数据库连接池
 * @author helloworld
 *
 */
public class ConnectionPool {
    private LinkedList<Connection> pool = new LinkedList<Connection>();
    
    public ConnectionPool(int initialSize) {
        if(initialSize>0) {
            for(int i = 0;i< initialSize;i++) {
                pool.addLast(ConnectionDriver.createConnection());
            }
        }
    }
    
    public void releaseConnection(Connection connection) {
        if(connection!=null) {
            synchronized (pool) {
                pool.addLast(connection);
                pool.notifyAll();    //需要通知其它消费者
            }
        }
    }
    
    public Connection fetchConnection(long mills) throws Exception {
        synchronized (pool) {
            if(mills<=0) {
                while(pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            }else {
                long future = System.currentTimeMillis()+ mills;
                long remining = mills;
                while(pool.isEmpty() && remining>0) {
                    pool.wait(remining);
                    remining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if(!pool.isEmpty()) {
                    result = pool.removeFirst();
                }
                return result;
            }
        }    
    }
}
package com.example.demo.test;

import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 测试数据库连接池
 * @author helloworld
 *
 */
public class ConnectionPoolTest {
    static ConnectionPool pool = new ConnectionPool(10);    //初始化10个connection
    static int threadCount = 1024;    //启动线程的数量
    static int count = 20;    //每个线程获取connection的次数
    
    static CountDownLatch start = new CountDownLatch(1);
    static CountDownLatch end = new CountDownLatch(threadCount);
    

    static class ConnectionRunner implements Runnable{
        int count;
        AtomicInteger got;
        AtomicInteger notGot;
        
        public ConnectionRunner(int count,AtomicInteger got,AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }
        
        @Override
        public void run() {
            try {
                start.await();
            }catch (Exception e) {
            }
            
            while(count>0) {
                try {
                    Connection connection = pool.fetchConnection(1000);
                    if(connection!=null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    }else {
                        notGot.incrementAndGet();
                    }
                }catch (Exception e) {
                    // TODO: handle exception
                }finally {
                    count--;
                }
            }
            end.countDown();
        }
        
    }
    
    
    public static void main(String[] args) throws Exception {
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for(int i=0;i<threadCount;i++) {
            Thread thread = new Thread(new ConnectionRunner(count, got, notGot));
            thread.start();
        }
        start.countDown();
        end.await();
        System.out.println("total invoke; "+ (threadCount*count));
        System.out.println("got connection: "+got);
        System.out.println("not got connection: "+notGot);
    }
}

4.4.3 线程池技术及其示例

public interface ThreadPool<Job extends Runnable> {
    // 执行一个Job,这个Job需要实现Runnable
    void execute(Job job);
    // 关闭线程池
    void shutdown();
    // 添加工作者线程
    void addWorkers(int num);
    // 减少工作者线程
    void removeWorker(int num);
    // 得到正在执行的任务数量
    int getJobSize();
}


package com.example.demo.test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import com.example.demo.test.SimpleHttpServer.HttpRequestHandler;


public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    // 线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    // 线程池默认数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    // 线程池最小的数量
    private static final int MIN_WORKER_NUMBERS = 1;
    // 存储所要执行的工作列表
    private final LinkedList<Job> Jobs = new LinkedList<>();
    // 工作者列表(其实本质上对应着一个线程)
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
    
    //工作线程的数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    //线程编号生成
    private AtomicLong threadNum = new AtomicLong();
    
    public DefaultThreadPool() {
        initializeWokers(DEFAULT_WORKER_NUMBERS);
    }
    
    public DefaultThreadPool(int num) {
        workerNum = num>MAX_WORKER_NUMBERS? MAX_WORKER_NUMBERS:num<MIN_WORKER_NUMBERS?MIN_WORKER_NUMBERS:num;
        initializeWokers(workerNum);
    }

    

    @Override
    public void execute(Job job) {
        if(job!=null) {
            synchronized (Jobs) {
                Jobs.add(job);
                Jobs.notify();
            }
        }
    }

    @Override
    public void shutdown() {
        for(Worker worker:workers) {
            worker.shutdown();
        }
    }

    @Override
    public void addWorkers(int num) {
        synchronized (Jobs) {
            if(num+this.workerNum>MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWokers(num);
            this.workerNum +=num;
        }
    }

    @Override
    public void removeWorker(int num) {
        synchronized (Jobs) {
            if(num>this.workerNum) {
                throw new RuntimeException("beyond workNum");
            }
            
            int count = 0;
            while(count<num) {
                Worker worker = workers.get(count);
                if(workers.remove(worker)) {
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }

    @Override
    public int getJobSize() {
        // TODO Auto-generated method stub
        return Jobs.size();
    }
    
    private void initializeWokers(int num) {
        for(int i=0;i<num;i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker,"Thread-worker-"+threadNum.incrementAndGet());
            thread.start();
        }
    }
    
    class Worker implements Runnable{
        // 是否工作
        private volatile boolean running = true;

        @Override
        public void run() {
            while(running) {
                Job job = null;
                synchronized (Jobs) {
                    while(Jobs.isEmpty()) {
                        try {
                            Jobs.wait();
                        }catch (Exception e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    job = Jobs.removeFirst();
                }
                if(job!=null) {
                    try {
                        job.run();
                    }catch (Exception e) {
                    }
                }
            }
        }
        
        
        public void shutdown() {
            running = false;
        }
    }
    
    
    public static void main(String[] args) {
        ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(11);
        threadPool.removeWorker(7);
    }

}

4.4.4 一个基于线程池技术的简单Web服务器

package com.example.demo.test;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SimpleHttpServer {
    // 线程池
    static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(11);
    // 根路径
    static String basePath;
    // 端口号
    static int port = 8080;
    // serverSocket
    static ServerSocket serverSocket;

    public static void setPort(int port) {
        if (port > 0) {
            SimpleHttpServer.port = port;
        }
    }

    public static void setBasePath(String basePath) {
        if (basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()) {
            SimpleHttpServer.basePath = basePath;
        }
    }

    public static void start() throws Exception {
        serverSocket = new ServerSocket(port);
        Socket socket = null;
        while ((socket = serverSocket.accept()) != null) {
            threadPool.execute(new HttpRequestHandler(socket));
        }
        serverSocket.close();
    }

    public static void main(String[] args) {
        setPort(8090);
        setBasePath("C:\\Users\\helloworld\\Downloads\\html");
        try {
            start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    
    
    
    
    static class HttpRequestHandler implements Runnable{
        private Socket socket;
        public HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            String line = null;
            BufferedReader br = null;
            BufferedReader reader = null;
            PrintWriter out = null;
            InputStream in = null;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String header = reader.readLine();
                String filePath = basePath + header.split(" ")[1];
                out = new PrintWriter(socket.getOutputStream());
                if(filePath.endsWith("jpg") || filePath.endsWith("ico")) {
                    in = new FileInputStream(filePath);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    int i= 0;
                    while((i=in.read())!=-1) {
                        baos.write(i);
                    }
                    byte[] array = baos.toByteArray();
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server:Molly");
                    socket.getOutputStream().write(array,0,array.length);
                }else {
                    br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
                    out = new PrintWriter(socket.getOutputStream());
                    out.println("HTTP1.1 200 OK");
                    out.println("Server:Molly");
                    out.println("Content-Type: text/html; charset=UTF-8");
                    out.println("");
                    while((line = br.readLine())!=null) {
                        out.println(line);
                    }
                }
                out.flush();
            } catch (Exception e) {
                out.println("HTTP/1.0 500");
                out.println("");
                out.flush();
            }finally {
                close(br,in,reader,out,socket);
            }
        }
        
         
//        // 关闭流或者Socket
//        private static void close(Closeable... closeables) {
//            if (closeables != null) {
//                for (Closeable closeable : closeables) {
//                    try {
//                        closeable.close();
//                    } catch (IOException ex) {
//                        // 忽略
//                    }
//                }
//            }
//        }
//        
        private static void close(Closeable...closeables) {
            if(closeables!=null) {
                for(Closeable closeable:closeables) {
                    try {
                        closeable.close();
                    } catch (Exception e) {
                        
                    }
                }
            }
        }
        
    }
}

书写的很好,不过代码都是坑

原文地址:https://www.cnblogs.com/helloworldmybokeyuan/p/11733983.html