博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Boot异步线程池及异步编程
阅读量:4219 次
发布时间:2019-05-26

本文共 7100 字,大约阅读时间需要 23 分钟。

线程池中的两个概念:线程和任务,任务是需要线程去执行的。
这里写一个支付相关的异步线程池的栗子:
1、在
application.properties中添加线程池的配置参数:
# 支付相关的配置pay.threadNamePrefix=pay-exec-pay.maxPoolSize=20pay.corePoolSize=10pay.queueCapacity=1000
2、基于注解进行参数的配置
config包下,创建
PayThreadPoolConfig.java配置类:
/** *  配置支付线程池 */@EnableAsync@Configurationpublic class PayThreadPoolsConfig {    /**     * 支付线程相关参数     */    @Value("${pay.threadNamePrefix}")    private String threadNamePrefix;    // 配置线程池中的线程名称前缀    @Value("${pay.corePoolSize}")    private Integer corePoolSize;       // 配置线程池中的核心线程数    @Value("${pay.maxPoolSize}")    private Integer maxPoolSize;        // 配置最大线程数    @Value("${pay.queueCapacity}")    private Integer queueCapacity;      // 配置队列大小    /**     * 支付线程池配置     * @return     */    @Bean    public AsyncTaskExecutor paymentTaskExexutor() {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        executor.setThreadNamePrefix(threadNamePrefix);        executor.setCorePoolSize(corePoolSize);        executor.setMaxPoolSize(maxPoolSize);        executor.setQueueCapacity(queueCapacity);        // 设        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {            @Override            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                // TODO            }        });        return executor;    }}
分析:
问题1:上述的核心线程数corePoolSize、最大线程数maxPoolSize以及队列大小queueCapacity,该如何设置?
corePoolSize:核心线程数。
1)核心线程会一直存活,即使没有任务需要执行。
2)当线程数小于核心线程时,即使有线程空闲,线程池也会优先创建新线程处理。
3)设置allowCoreThreadTimeOut=true(默认为false)时,核心线程会超时关闭。
4)当所有核心线程都忙碌时,此时如果系统需要新的线程执行别的任务,线程池不会创建新的线程,而是把任务放入任务队列(与queueCapacity相关)
queueCapacity:任务队列的容量(阻塞队列)
1)当核心线程数达到最大时,新任务会放在队列中等待执行
queueCapacity:最大线程数
1) 当线程数>=corePoolSize,且任务队列已满时。
线程池会创建新线程来处理任务。(
创建新线程的时机
2)当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
keepAliveTime:线程空闲时间
1)当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
2)如果allowCoreThreadTimeOut=true,则会直到线程数量=0
allowCoreThreadTime:允许核心线程超时
首先看下源码的缺省配置:
线程数的配置应该和业务的繁忙程度以及当前CPU的核数有关进行最合理的设置。
上述中缺省的corePoolSize是1,如果因此,如果异步要开启多个线程需要配置多个。
问题2:拒绝策略executor.setRejectedExecutionHandler中的对象该如何设置?
setRejectedExecutionHandler设置当线程池的数量达到maxPoolSize,如何处理任务。
通常看到的代码是设置为:
executor.setRejectedExecutionHandler(
new
ThreadPoolExecutor.CallerRunsPolicy());
上述new ThreadPoolExecutor.CallerRunsPolicy()表示当线程池中线程的数量达到maxPoolSize时,不在新线程中执行任务,而是有调用者所在的线程来执行。
rejectedExecutionExecutionHandler:任务拒绝处理器
两种情况下会拒绝处理任务:
1)当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务
2)当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown()之间提交任务,会拒绝新任务
线程池会调用rejectedExecutionHandler来处理这个任务。
如果没有设置默认为AborPolicy,会抛出异常。
ThreadPoolExecutor类有几个内部类来处理这类情况:
1)AbortPolicy 丢弃任务,抛运行时异常(缺省)
2)CallerRunsPolicy 执行任务(
只用调用者所在线程运行任务)
3)DiscardPolicy 忽视,什么都不会发生
4)DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行的)任务
5)实现RejectedExecutionHandler接口,可自定义处理器
ThreadPoolExecutor执行顺序
线程池按以下行为执行任务
1)当线程数小于核心线程数时,创建线程
2)当线程数大于核心线程数,且任务队列未满时,将任务放入任务队列
3)当线程数大于等于核心线程数,且任务队列已满
1、若线程数小于最大线程数,创建线程
2、若线程数大于最大线程数,抛出异常,拒绝任务
如何设置参数
1、默认值
corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeOut=false
rejectedExecutionHandler=AbortPolicy()
2、如何来设置
需要根据几个值来设置
1)tasks:每秒的任务数,假设为500~1000
2)taskcost:每个任务花费时间,假设为0.1s
3)responsetime:系统允许容忍的最大响应时间,假设为1s
根据上述的变量进行计算
1) 计算corePoolSize
每秒需要多少个线程处理(corePoolSize)?
方法一:threadCout=tasks/(1/taskcost)=tasks*taskscount=(500~1000)*0.1=50~100个线程。corePoolSize设置应该大于50
方法二:根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
2)计算queueCapacity
1.计算可得queueCapacity=80/0.1*1=80。意思是队列里的线程可以等待1s,超过了的需要新开线程来处理。
2. 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务徒增时,不能新开线程来执行,响应时间会随之陡增。
3)计算maxPoolSize
maxPoolSize=(max(tasks)-queueCapacity)/(1/taskcst)=(10000-80)/10=92
即:(最大任务数-队列容量)/每个线程每秒处理能力=最大线程数
4)rejectedExecutionHandler
根据具体情况来决定,任务不重要可丢弃,
任务重要则要利用缓冲机制来处理
5)keepAliveTime和allowCoreThreadTimeOut采用默认通常能满足
以上都是理性值,实际情况要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件和优化代码来降低taskcost。
3、在
async包下,创建一个
PayAsyncService.java异步服务类:
/** * 支付 异步服务类 */@Service@Slf4jpublic class PayAsyncService {    /**     * 无参数回调     */    @Async(value = "paymentTaskExexutor")    public void asyncNoReturn() {        log.info("async no return");    }    /**     * 带参数的异步回调 异步方法可以传入参数     * @param arg     */    @Async(value = "paymentTaskExexutor")    public void asyncInvokeWithParam1(CountDownLatch countDownLatch, String arg) {        log.info("async1 invoke with param is :{}", arg);        countDownLatch.countDown();    }    @Async(value = "paymentTaskExexutor")    public void asyncInvokeWithParam2(CountDownLatch countDownLatch, String arg) {        log.info("async2 invoke with param is :{}", arg);        countDownLatch.countDown();    }    /**     * 异步回调返回future     * @param arg     * @return     */    @Async(value = "paymentTaskExexutor")    public Future
asyncInvokeReturnFuture(String arg) { log.info("async invoke return future success"); for (int i=0; i<10; i++) { log.info("{} output i:{}", arg, i); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } Future
future = new AsyncResult<>(arg + " success!"); return future; } @Async(value = "paymentTaskExexutor") public Future
asyncInvokeReturnFuture2(String arg) { log.info("async invoke return future failed"); for (int i=0; i<10; i++) { log.info("{} output i:{}", arg, i); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } Future
future = new AsyncResult<>(arg + "failed!"); return future; }}
4、这里通过Contorlle层的代码进行测试:
@RestController@Slf4jpublic class ExampleController {    @Autowired    private PayAsyncService payAsyncService;    @RequestMapping(value = "/test")    public String getTest() throws InterruptedException {        payAsyncService.asyncNoReturn();        log.info("get test");        return "hello3";    }    @RequestMapping(value = "/test2")    public String getTest2() {        final CountDownLatch countDownLatch = new CountDownLatch(2);        payAsyncService.asyncInvokeWithParam1(countDownLatch,"timchen");        payAsyncService.asyncInvokeWithParam2(countDownLatch,"timchen");        log.info("get test2");        return "test2";    }    @RequestMapping(value = "/test3")    public String getTest3() throws InterruptedException, ExecutionException {        Future
future1 = payAsyncService.asyncInvokeReturnFuture( "timchen"); Future
future2 = payAsyncService.asyncInvokeReturnFuture2("xixi"); while (!future1.isDone() || !future2.isDone()) {// log.info("futre1 is:{}, futre2 is:{}", future1.isDone(), future2.isDone()); } if (future1.isDone() && future1.isDone()) { log.info("future1 return result is:{}, future2 return result is:{}.", future1.get(), future2.get()); } log.info("get test3"); return "test3"; }}

转载地址:http://agomi.baihongyu.com/

你可能感兴趣的文章
eclipse 给jar库添加源码
查看>>
3.0正式版环境搭建(4)-- 运行(3)创建的工程
查看>>
C++ 枚举声明 enum 和 enum class
查看>>
Python optionParser模块的使用方法
查看>>
android 消灭星星出错
查看>>
PyCharm 教程(三)Hello world!
查看>>
PyCharm: 显示源码行号
查看>>
cocos2dx使用第三方字库.ttf,需要注意的事项
查看>>
cocos2dx 音频模块分析(4): 音效部分
查看>>
cocos2dx 音频模块分析(5): 音效部分
查看>>
19、Cocos2dx 3.0游戏开发找小三之Action:流动的水没有形状,漂流的风找不到踪迹、、、
查看>>
cocos2.X版本lua端使用定时器的方法
查看>>
lua math.fmod使用注意小数问题
查看>>
lua 时间转化
查看>>
lua学习笔记之五(Lua中的数学库)
查看>>
【屌丝程序的口才逆袭演讲稿50篇】第一篇:互联网时代U盘化生存方式 【张振华.Jack】
查看>>
CentOS6.4配置Hadoop-2.6.0集群配置安装指南(经过实战演练)【张振华.Jack】
查看>>
【屌丝程序的口才逆袭演讲稿50篇】第二篇:专注的力量 [张振华.Jack]
查看>>
【屌丝程序的口才逆袭演讲稿50篇】第三篇:我的舍与得的2014[张振华.Jack]
查看>>
【屌丝程序的口才逆袭演讲稿50篇】第五篇:不要给自己找任何借口【张振华.Jack】
查看>>