重写ThreadPoolTaskExecutor

时间:2022-07-23
本文章向大家介绍重写ThreadPoolTaskExecutor,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

ThreadPoolExecutor:JDK内置线程池实现 ThreadPoolTaskExecutor:Spring对JDK中线程池做了一层封装

参考代码:https://github.com/Noneplus/ConcurrentDemo

创建一个SpringBoot项目

主类开启异步注解

/**
 * 开启异步注解@EnableAsync
 */
@SpringBootApplication
@EnableAsync
public class AsyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(AsyncApplication.class, args);
    }

}

创建线程池配置类

主类添加注解:@EnableConfigurationProperties({AsyncThreadPoolConfig.class} )

/**
 * @Description: 线程池参数配置
 * @Author noneplus
 * @Date 2020/8/5 19:02
 */
@ConfigurationProperties("task.pool")
public class AsyncThreadPoolConfig{

    private Integer corePoolSize;

    private Integer maxPoolSize;

    private Integer keepAliveSeconds;

    private Integer queueCapacity;

    public Integer getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(Integer corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public Integer getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(Integer maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public Integer getKeepAliveSeconds() {
        return keepAliveSeconds;
    }

    public void setKeepAliveSeconds(Integer keepAliveSeconds) {
        this.keepAliveSeconds = keepAliveSeconds;
    }

    public Integer getQueueCapacity() {
        return queueCapacity;
    }

    public void setQueueCapacity(Integer queueCapacity) {
        this.queueCapacity = queueCapacity;
    }
}

创建线程池实现类

继承AsyncConfigurer,重写get方法

package com.noneplus.async;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @Description: 重写Spring线程池
 * @Author noneplus
 * @Date 2020/8/6 10:11
 */
public class AsyncThreadPool implements AsyncConfigurer {

    @Autowired
    AsyncThreadPoolConfig asyncThreadPoolConfig;

    /**
     * ThreadPoolTaskExecutor 对比 ThreadPoolExecutor
     * ThreadPoolExecutor:JDK内置线程池
     * ThreadPoolTaskExecutor:Spring对ThreadPoolExecutor做了一层基础封装
     *
     * 相比 ThreadPoolExecutor,ThreadPoolTaskExecutor 增加了 submitListenable 方法,
     * 该方法返回 ListenableFuture 接口对象,该接口完全抄袭了 google 的 guava。
     * ListenableFuture 接口对象,增加了线程执行完毕后成功和失败的回调方法。
     * 从而避免了 Future 需要以阻塞的方式调用 get,然后再执行成功和失败的方法。
     */
    @Override
    public Executor getAsyncExecutor() {

        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();


        //设置核心线程数,最大线程数,队列容量,线程存活时间
        threadPoolTaskExecutor.setCorePoolSize(asyncThreadPoolConfig.getCorePoolSize());
        threadPoolTaskExecutor.setMaxPoolSize(asyncThreadPoolConfig.getMaxPoolSize());
        threadPoolTaskExecutor.setQueueCapacity(asyncThreadPoolConfig.getQueueCapacity());
        threadPoolTaskExecutor.setKeepAliveSeconds(asyncThreadPoolConfig.getKeepAliveSeconds());

        //设置线程名前缀
        threadPoolTaskExecutor.setThreadNamePrefix("AsyncThreadPool-");

        // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
        // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // 等待所有任务结束后再关闭线程池
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

创建一个测试类Controller

定义一个forTest方法

/**
 * @Description: TODO(这里用一句话描述这个类的作用)
 * @Author noneplus
 * @Date 2020/8/5 18:33
 */
@RestController
public class TestController {

    @Autowired
    TestService testService;

    @GetMapping("/test")
    public String forTest()
    {
        testService.forTest();

        return "success";
    }
}

创建异步Service方法

共三个线程,sendEmail,recoredLog和主线程

@Service
public class TestService {

    @Autowired
    TaskComponent taskComponent;

    public void forTest() {

        taskComponent.sendEmail();
        taskComponent.recordLog();

        for (int i = 0; i < 10; i++) {
            System.out.println("打酱油:" + i+"当前线程:"+Thread.currentThread().getName());
        }

    }
}

定义异步的实现类

@Component
public class TaskComponent {

    @Async
    public void sendEmail()
    {
        for (int i = 0; i < 10; i++) {
            System.out.println("发送短信中:" + i+"当前线程:"+Thread.currentThread().getName());
        }
    }

    @Async
    public void recordLog()
    {
        for (int i = 0; i < 10; i++) {
            System.out.println("记录日志中:" + i+"当前线程:"+ Thread.currentThread().getName());
        }
    }

}