Java多线程相关知识点扩展实例分析

时间:2022-07-28
本文章向大家介绍Java多线程相关知识点扩展实例分析,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

这次说说多线程只是扩展,主要讲解一些应用,应用带一些原理讲解,同时希望各位老铁有所收获,这些内容跟前面的线程和并发容器有关系的,从应用场景引出多线程技术栈里面的应用,其实锁和同步块,容器,工具类,都是非常的使用的。反射更加适应于语法。

(一)多线程应用

  • ① 介绍

多线程经常使用在逻辑处理里面,一个程序N个逻辑要做,一个用户请求可能需要数据库查询,第三方的系统接口,调用redis,一个用户请求需要多步组成,可以使用多线程技术来实现,去做一些调整,

  • ② 经典场景

接触后端开发的时候,经常发现一个请求过来,后端需要做一系列的复杂的操作,下面这个后端有:系统消息,我的团队,我的钱包 对于这些信息,后面的系统如何设计。这些可能涉及到多个模块的调用。一个系统划分为多个子系统来做。

  • ③ 后端接口执行-大概流程

互联网公司存在组织结构复杂,调用的模块比较多。设计这样系统的时候,一个信息单独的查询系统的对应接口,还是移动前端发起一次请求一下获取到。一般都做网关(API)接口,一个请求获取多个信息,网关收到信息后,获取多个子系统的接口,最后把信息汇总,返回给前端。

  1. 收到一个请求。
  2. 调用多个服务接口获取其他系统的数据信息。
  3. 最后汇总范围。

通过数据分析,越来越多的互联网电商平台的单子70%以上都来自手机端,手机端有个典型的应用,网络处理很麻烦的,移动设备的固有属性,一个人走这走这到了信号的盲区了,一个页面发起五六个接口的请求,移动互联网的应用造成了很大的损耗,一般都是一个接口获取全部的信息。 如果一个API网关需要调用3个接口,这3个接口是串行完成的,A执行完(3秒),执行B,B执行完(2秒),执行C(5秒),C执行完返回给移动端json字符串,需要10秒才能返回。 如果A,B,C这3个没有相互依赖的关系,完全可以把A交给线程1,B交给线程2,C交给线程3,来一起去完成,汇总执行的结果,需要5秒,没完成就返回。这样是不是效率明显得到了提升。

(二)Future

  • ① 介绍

异步计算的结果,提供了用于检查计算是否完成,等待计算完成以及获取结果的方法。

  • ② 接口的定义
  1. boolean cancel(boolean mayInterruptIfRunning)

尝试取消当前任务的执行。如果任务已经取消、已经完成或者其他原因不能取消,尝试将失败。如果任务还没有启动就调用了cancel(true),任务将永远不会被执行。如果任务已经启动,参数mayInterruptIfRunning将决定任务是否应该中断执行该任务的线程,以尝试中断该任务。 如果任务不能被取消,通常是因为它已经正常完成,此时返回false,否则返回true

  1. boolean isCancelled()

如果任务在正常结束之前被被取消返回true

3.boolean isDone()

正常结束、异常或者被取消导致任务完成,将返回true

4.V get()

等待任务结束,然后获取结果,如果任务在等待过程中被终端将抛出InterruptedException,如果任务被取消将抛出CancellationException,如果任务中执行过程中发生异常将抛出ExecutionException。

5.V get(long timeout, TimeUnit unit)

任务最多在给定时间内完成并返回结果,如果没有在给定时间内完成任务将抛出TimeoutException。

(三)CountDownLatch

  • ① 介绍

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

  • ② 常见用法

多个人等一个信号后继续执行操作。例如5个运动员,等一个发令员的枪响。 一个人等多个人的信号。旅游团等所有人签到完成才开始出发。 常见到使用的地方是zk获取连接的时候。

  • ③ 源码分析
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UserServiceCountLatch {
    ExecutorService executorService = Executors.newCachedThreadPool();

    @Autowired
    private RestTemplate restTemplate;

    /**
     * 查询多个系统的数据,合并返回
     */
    public Object getUserInfo(String userId) throws InterruptedException {
        CountDownLatch count = new CountDownLatch(2);
        ArrayList<JSONObject> values = new ArrayList<>();
        // 你可以封装成一个 提交URL 就能自动多线程调用的 工具
            executorService.submit(() -> {
                // 1.业务代码
                JSONObject userInfo = new JSONObject();
                values.add(userInfo);
                count.countDown();
            });
            executorService.submit(() -> {
               // 2.业务代码
               JSONObject intergralInfo= new JSONObject();
                values.add(intergralInfo);
                count.countDown();
        });

        count.await();// 等待计数器归零

        // 3. 合并为一个json对象
        JSONObject result = new JSONObject();
        for (JSONObject value : values) {
            result.putAll(value);
        }
        return result;
    }
}

1.统计线程执行的情况 2.压力测试中,使用countDownLatch实现最大程度的并发处理。 2.多个线程之间,相互通信,比如线程异步调用完接口,结果通知。

(四)CyclicBarrier

  • ① 介绍

CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

  • ② 场景

坐车,老板都是票卖完了才开车。 数据库的批量操作,达到一定数量批量进行插入。

  • ③ 源码
import java.util.ArrayList;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// 循环屏障(栅栏),示例:数据库批量插入
// 游戏大厅... 5人组队打副本
public class CyclicBarrierTest {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> sqls = new LinkedBlockingQueue<>();
        // 任务1+2+3...1000  拆分为100个任务(1+..10,  11+20) -> 100线程去处理。

        // 每当有4个线程处于await状态的时候,则会触发barrierAction执行
        CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                // 这是每满足4次数据库操作,就触发一次批量执行
                System.out.println("有4个线程执行了,开始批量插入: " + Thread.currentThread());
                for (int i = 0; i < 4; i++) {
                    System.out.println(sqls.poll());
                }
            }
        });

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    sqls.add("data - " + Thread.currentThread()); // 缓存起来
                    Thread.sleep(1000L); // 模拟数据库操作耗时
                    barrier.await(); // 等待栅栏打开,有4个线程都执行到这段代码的时候,才会继续往下执行
                    System.out.println(Thread.currentThread() + "插入完毕");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }

        Thread.sleep(2000);
    }
}

(五)Semaphore

  • ①介绍

又称“信号量”,控制多个线程争抢许可。 acquire: 获取一个许可,如果没有就等待。 release: 释放一个许可。 availablePermits: 方法得到可用的许可数目。

  • ② 场景

代码并发处理限流 举个例子,去洗浴中心的时候都会给一个手环,这个手环很多时候就是为了限制熟练,因为柜子是有限的,每个人一个柜子,如果没有手环了就是没有柜子了,手环归还后柜子就出现了,基本就是这个原理。

  • ③ 源码
import com.study.lock.aqs.NeteaseAqs;

// 自定义的信号量实现
public class NeteaseSemaphore {
    NeteaseAqs aqs = new NeteaseAqs() {
        @Override
        public int tryAcquireShared() { // 信号量获取, 数量 - 1
            for(;;) {
                int count =  getState().get();
                int n = count - 1;
                if(count <= 0 || n < 0) {
                    return -1;
                }
                if(getState().compareAndSet(count, n)) {
                    return 1;
                }
            }
        }

        @Override
        public boolean tryReleaseShared() { // state + 1
            return getState().incrementAndGet() >= 0;
        }
    };

    /** 许可数量 */
    public NeteaseSemaphore(int count) {
        aqs.getState().set(count); // 设置资源的状态
    }

    public void acquire() {
        aqs.acquireShared();
    } // 获取令牌

    public void release() {
        aqs.releaseShared();
    } // 释放令牌
}
import com.study.lock.aqs.AQSdemo;

import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// 信号量机制
public class SemaphoreDemo {
    public static void main(String[] args) {
        SemaphoreDemo semaphoreTest = new SemaphoreDemo();
        int N = 9;            // 客人数量
        NeteaseSemaphore semaphore = new NeteaseSemaphore(5); // 手牌数量,限制请求数量
        for (int i = 0; i < N; i++) {
            String vipNo = "vip-00" + i;
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取令牌

                    semaphoreTest.service(vipNo);

                    semaphore.release(); // 释放令牌
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    // 限流 控制5个线程 同时访问
    public void service(String vipNo) throws InterruptedException {
        System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
        Thread.sleep(new Random().nextInt(3000));
        System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
    }

}
package com.study.lock.aqs;

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// 抽象队列同步器
// state, owner, waiters
public class NeteaseAqs {
    // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
    // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
    // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
    // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。

    // 1、 如何判断一个资源的拥有者
    public volatile AtomicReference<Thread> owner = new AtomicReference<>();
    // 保存 正在等待的线程
    public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    // 记录资源状态
    public volatile AtomicInteger state = new AtomicInteger(0);

    // 共享资源占用的逻辑,返回资源的占用情况
    public int tryAcquireShared(){
        throw new UnsupportedOperationException();
    }

    public void acquireShared(){
        boolean addQ = true;
        while(tryAcquireShared() < 0) {
            if (addQ) {
                // 没拿到锁,加入到等待集合
                waiters.offer(Thread.currentThread());
                addQ = false;
            } else {
                // 阻塞 挂起当前的线程,不要继续往下跑了
                LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
            }
        }
        waiters.remove(Thread.currentThread()); // 把线程移除
    }

    public boolean tryReleaseShared(){
        throw new UnsupportedOperationException();
    }

    public void releaseShared(){
        if (tryReleaseShared()) {
            // 通知等待者
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                Thread next = iterator.next();
                LockSupport.unpark(next); // 唤醒
            }
        }
    }

    // 独占资源相关的代码

    public boolean tryAcquire() { // 交给使用者去实现。 模板方法设计模式
        throw new UnsupportedOperationException();
    }

    public void acquire() {
        boolean addQ = true;
        while (!tryAcquire()) {
            if (addQ) {
                // 没拿到锁,加入到等待集合
                waiters.offer(Thread.currentThread());
                addQ = false;
            } else {
                // 阻塞 挂起当前的线程,不要继续往下跑了
                LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
            }
        }
        waiters.remove(Thread.currentThread()); // 把线程移除
    }

    public boolean tryRelease() {
        throw new UnsupportedOperationException();
    }

    public void release() { // 定义了 释放资源之后要做的操作
        if (tryRelease()) {
            // 通知等待者
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                Thread next = iterator.next();
                LockSupport.unpark(next); // 唤醒
            }
        }
    }

    public AtomicInteger getState() {
        return state;
    }

    public void setState(AtomicInteger state) {
        this.state = state;
    }
}

PS:工具是根据场景来的,达到某个场景这个工具才有它的价值,如果你不存在这个场景这个工具也就没有价值。多线程这块设计到3块的知识:筑基阶段(JMM,lock,cas,atomic,sync),并发容器(。里面都涉及到数据结构,我已经开通了专辑数据结构与算法,数据结构并不是一两篇文章就可以搞定的东西,大学可是一门学科。),工具类阶段(多线程工具类阶段,设计模式的体现。不同的源码都有自己的设计模式的体现)