springBoot 线程池异步编程

时间:2022-07-23
本文章向大家介绍springBoot 线程池异步编程,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

线程池中的两个概念

线程和任务,任务是需要线程去执行的。 这里写一个支付相关的异步线程池的栗子:

1、在application.properties中添加线程池的配置参数:

 pay.threadNamePrefix=pay-exec-
 pay.maxPoolSize=20
 pay.corePoolSize=10
 pay.queueCapacity=1000

2、基于注解进行参数的配置 在config包下,创建PayThreadPoolConfig.java配置类:

/**

  • 配置支付线程池 */
@EnableAsync
    @Configuration
    public class PayThreadPoolsConfig {
    /**
     * 支付线程相关参数
     */
    @Value("${pay.threadNamePrefix}")
    private String threadNamePrefix;    // 配置线程池中的线程名称前缀
 
    @Value("${pay.corePoolSize}")
    private Integer corePoolSize;       // 配置线程池中的核心线程数
 
    @Value("${pay.maxPoolSize}")
    private Integer maxPoolSize;        // 配置最大线程数
 
    @Value("${pay.queueCapacity}")
    private Integer queueCapacity;      // 配置队列大小
 
    /**
     * 支付线程池配置
     * @return
     */
    @Bean
    public AsyncTaskExecutor paymentTaskExexutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix(threadNamePrefix);
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        // 设
        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // TODO
            }
        });
        return executor;
    }
}

分析: 问题1: 上述的核心线程数corePoolSize、最大线程数maxPoolSize以及队列大小queueCapacity,该如何设置? corePoolSize:核心线程数。

1)核心线程会一直存活,即使没有任务需要执行。

2)当线程数小于核心线程时,即使有线程空闲,线程池也会优先创建新线程处理。

3)设置allowCoreThreadTimeOut=true(默认为false)时,核心线程会超时关闭。

4)当所有核心线程都忙碌时,此时如果系统需要新的线程执行别的任务,线程池不会创建新的线程,而是把任务放入任务队列(与queueCapacity相关)

queueCapacity:任务队列的容量(阻塞队列) 1)当核心线程数达到最大时,新任务会放在队列中等待执行

queueCapacity:最大线程数 1)当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务。(创建新线程的时机)

2)当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常

keepAliveTime:线程空闲时间

1)当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize

2)如果allowCoreThreadTimeOut=true,则会直到线程数量=0

allowCoreThreadTime:允许核心线程超时

首先看下源码的缺省配置: 线程数的配置应该和业务的繁忙程度以及当前CPU的核数有关进行最合理的设置。 上述中缺省的corePoolSize是1,如果因此,如果异步要开启多个线程需要配置多个。

问题2:拒绝策略executor.setRejectedExecutionHandler中的对象该如何设置? setRejectedExecutionHandler设置当线程池的数量达到maxPoolSize,如何处理任务。 通常看到的代码是设置为:

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

上述new ThreadPoolExecutor.CallerRunsPolicy()表示当线程池中线程的数量达到maxPoolSize时,不在新线程中执行任务,而是有调用者所在的线程来执行。 rejectedExecutionExecutionHandler:任务拒绝处理器

两种情况下会拒绝处理任务:

1)当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务

2)当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown()之间提交任务,会拒绝新任务 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认为AborPolicy,会抛出异常。 ThreadPoolExecutor类有几个内部类来处理这类情况:

1)AbortPolicy 丢弃任务,抛运行时异常(缺省)

2)CallerRunsPolicy 执行任务(只用调用者所在线程运行任务)

3)DiscardPolicy 忽视,什么都不会发生

4)DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行的)任务

5)实现RejectedExecutionHandler接口,可自定义处理器

ThreadPoolExecutor执行顺序 线程池按以下行为执行任务

1)当线程数小于核心线程数时,创建线程

2)当线程数大于核心线程数,且任务队列未满时,将任务放入任务队列

3)当线程数大于等于核心线程数,且任务队列已满

1、若线程数小于最大线程数,创建线程
 2、若线程数大于最大线程数,抛出异常,拒绝任务

如何设置参数

1、默认值

corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeOut=false
rejectedExecutionHandler=AbortPolicy()

2、如何来设置 需要根据几个值来设置 1)tasks:每秒的任务数,假设为500~1000 2)taskcost:每个任务花费时间,假设为0.1s 3)responsetime:系统允许容忍的最大响应时间,假设为1s

根据上述的变量进行计算

1) 计算corePoolSize 每秒需要多少个线程处理(corePoolSize)?

方法一:threadCout=tasks/(1/taskcost)=tasks*taskscount=(5001000)*0.1=50100个线程。corePoolSize设置应该大于50

方法二:根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可

2)计算queueCapacity

1.计算可得queueCapacity=80/0.1*1=80。意思是队列里的线程可以等待1s,超过了的需要新开线程来处理。

2.切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务徒增时,不能新开线程来执行,响应时间会随之陡增。

3)计算maxPoolSize

maxPoolSize=(max(tasks)-queueCapacity)/(1/taskcst)=(10000-80)/10=92

即:(最大任务数-队列容量)/每个线程每秒处理能力=最大线程数

4)rejectedExecutionHandler 根据具体情况来决定,任务不重要可丢弃,任务重要则要利用缓冲机制来处理

5)keepAliveTime和allowCoreThreadTimeOut采用默认通常能满足 以上都是理性值,实际情况要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件和优化代码来降低taskcost。

3、在async包下,创建一个PayAsyncService.java异步服务类:

/**
 * 支付 异步服务类
 */
@Service
@Slf4j
public class PayAsyncService {
 /**
     * 无参数回调
     */
    @Async(value = "paymentTaskExexutor")
    public void asyncNoReturn() {
        log.info("async no return");
    }
 
    /**
     * 带参数的异步回调 异步方法可以传入参数
     * @param arg
     */
    @Async(value = "paymentTaskExexutor")
    public void asyncInvokeWithParam1(CountDownLatch countDownLatch, String arg) {
        log.info("async1 invoke with param is :{}", arg);
        countDownLatch.countDown();
    }
 
    @Async(value = "paymentTaskExexutor")
    public void asyncInvokeWithParam2(CountDownLatch countDownLatch, String arg) {
        log.info("async2 invoke with param is :{}", arg);
        countDownLatch.countDown();
    }
/**
 * 异步回调返回future
 * @param arg
 * @return
 */
@Async(value = "paymentTaskExexutor")
public Future<String> asyncInvokeReturnFuture(String arg) {
    log.info("async invoke return future success");
    for (int i=0; i<10; i++) {
        log.info("{} output i:{}", arg, i);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    Future<String> future = new AsyncResult<>(arg + " success!");
    return future;
}

@Async(value = "paymentTaskExexutor")
public Future<String> asyncInvokeReturnFuture2(String arg) {
    log.info("async invoke return future failed");
    for (int i=0; i<10; i++) {
        log.info("{} output i:{}",  arg, i);
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    Future<String> future = new AsyncResult<>(arg + "failed!");
    return future;
}

}

4、这里通过Contorlle层的代码进行测试:

@RestController
@Slf4j
public class ExampleController {
@Autowired
private PayAsyncService payAsyncService;

@RequestMapping(value = "/test")
public String getTest() throws InterruptedException {
    payAsyncService.asyncNoReturn();
    log.info("get test");
    return "hello3";
}

@RequestMapping(value = "/test2")
public String getTest2() {
    final CountDownLatch countDownLatch = new CountDownLatch(2);
    payAsyncService.asyncInvokeWithParam1(countDownLatch,"timchen");
    payAsyncService.asyncInvokeWithParam2(countDownLatch,"timchen");
    log.info("get test2");
    return "test2";
}
@RequestMapping(value = "/test3")
    public String getTest3() throws InterruptedException, ExecutionException {
        Future<String> future1 = payAsyncService.asyncInvokeReturnFuture( "timchen");
        Future<String> future2 = payAsyncService.asyncInvokeReturnFuture2("xixi");
        while (!future1.isDone() || !future2.isDone()) {
//            log.info("futre1 is:{}, futre2 is:{}", future1.isDone(), future2.isDone());
        }
        if (future1.isDone() && future1.isDone()) {
            log.info("future1 return result is:{}, future2 return result is:{}.", future1.get(), future2.get());
        }
        log.info("get test3");
        return "test3";
    }
}
``