ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
20,
TimeUnit.MINUTES,
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread();
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
});
按照上面参数顺序讲解
- $\color{#FF0000}{corePoolSize 核心线程池数量 }$
- 创建线程池的时候没有线程, 当提交任务的时候会陆续创建线程, 当corePoolSize 满的时候, 会将任务放到队列中去, 队列满了, 那么会继续创建 corePoolIsze 到maxPoolSize之间的线程 .
- 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
- prestartAllCoreThread 或者prestartAllCoreThread 初始化核心线程
- $\color{#FF0000}{maxPoolSize最大线程池数量}$
- 当线程数量 = maxPoolSize , 且任务队列已经满了, 线程池会拒绝任务抛出一场
- 我们的项目 这个参数是 Runtime.getRuntime().availableProcessors() * 4 +1 , 根据压力测试获得最好的最大核心数量
- $\color{#FF0000}{keepAliveTime 线程空闲时间}$
- 空闲的线程能够保持空闲的时间, 超过这个时间, 这一部分线程将被回收.
- 核心线程到最大线程数量的差异, 如果两个值相等, 那么这个参数毫无意义.
- $\color{#FF0000}{BlockingQueue 阻塞队列}$
- 当核心线程数达到最大时,新任务会放在队列中排队等待执行
- 常见的队列
- ArrayBlockingQueue: 有界队列,基于数组结构,按照队列FIFO原则对元素排序;
- LinkedBlockingQueue:无界队列,基于链表结构,按照队列FIFO原则对元素排序,Executors.newFixedThreadPool()使用了这个队列
- SynchronousQueue 同步队列, 该队列不存储元素,每个插入操作必须等待另一个线程调用移除操作,否则插入操作会一直被阻塞,Executors.newCachedThreadPool()使用了这个队列;
- PriorityBlockingQueue:优先级队列,具有优先级的无限阻塞队列。
6 . $\color{#FF0000}{ThreadFactory 线程工厂}$
- 自定义线程的名字,daemon,优先级等
- $\color{#FF0000}{rejectedExecutionHandler 拒绝策略}$
- 这里的拒绝可以采取你自定义的办法, 比如使用second 线程池处理 r , 或者丢弃r, 或者打印出日志, 取决于你的业务.
- 线程池执行拒绝的两种情况
- 当线程数量达到maxPoolSize , 队列已经满了, 会拒绝新任务
- 当线程池调用shutdown 的时候, 会等待线程池的任务执行完毕, 再shutdown, 这是一种优雅的方式. 调用shutdown和shutdown关闭之前, 这段时间会拒绝新任务. shutdownNow, 会立刻关闭, 并且停止执行任务. 和shutdown 有很大区别.
- 几种拒绝策略
- AbortPolicy 丢弃任务, 抛运行异常(如果不设置, 默认就是这一个策略)
- CallerRunsPolicy 执行任务
- DiscardPolicy 忽视,什么都不会发生
- DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
线程执行的一些流程
- 如果当前线程池中的线程数目 小于 < corePoolSize 。则对于每个新任务,都会创建一个线程去执行这个任务(即使core线程中也有空闲线程, 也会新创建线程会执行)。
- 如果当前线程池中的线程数目 大于等于 >= corePoolSize 。 对于每个新任务,会将其添加到任务队列中。若添加成功,则任务由空闲的线程主动将其从队列中移除后执行。若添加失败(任务队列已满),则尝试创建新的线程执行。
- 若当前线程池中的线程数目到达 maximumPoolSize ,则对于新任务采取拒绝策略。
- 如果线程池中的数量大于 corePoolSize 时,如果某个线程空闲时间超过 keepAliveTime ,线程会被终止,直到线程池中的线程数目不超过 corePoolSize 。
- 如果为核心线程池中的线程设置存活时间,则当核心线程池中的线程空闲时间超过 keepAliveTime ,则该线程也会被终止
- 如果核心线程数已经达到, 如果没有队列没有满的话, 是不会创建新的线程. 有时候取决你使用什么队列. 比如使用 ArrayBlockingQueue(10), 当核心线程已经创建完成, 只有当队列满了之后才会继续创建新的线程. 如果你使用的是SynchronousQueue, 内部只能一个的队列,那么只有队列直接创建core线程到maxpoolSize 之间的线程
线程池性能优化建议
- 建议使用 prestartAllCoreThread 或者prestartAllCoreThread 初始化核心线程
- 考虑 allowCoreThreadTimeOut 允许核心线程能够回收节约机器的使用.
- 拒绝策略可以将任务, 提交给 second 线程池处理.
- 自定义ThreadFactory ,标识线程, 方便排查线程的使用情况
关于线程异常是否处理的问题:
- execute : 如果不手动捕获一场, 线程池只能重新创建新的异常来填补空白,重新创建线程这是有代价的
- submit: 因为能够调用 future.get(). 所以有异常也会捕获, 不会造成线程终止.
// 证明execute 的异常
@Test
public void test1() throws InterruptedException {
Thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e) -> {
log.warn("Exception in thread {}", t, e);
});
String prefix = "test";
ExecutorService threadPool = Executors.newFixedThreadPool(1, new ThreadUtil.ThreadFactoryImpl(prefix));
IntStream.rangeClosed(1, 10).forEach(i -> threadPool.execute(() -> {
if (i == 5) {
throw new RuntimeException("error");
}
log.info("I'm done : {}", i);
System.out.println(Thread.currentThread().getName() + " I'm done : " + i);
if (i < 5) {
Assert.assertEquals(prefix + "1", Thread.currentThread().getName());
} else {
Assert.assertEquals(prefix + "2", Thread.currentThread().getName());
}
}));
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);
// 本来是通过test 1 线程执行的, 后面出现异常 确是test2 执行的, 说明x线程已经终止2, 并且重新创建线程
}
// 线程名称没有变, 说明已经帮你捕获异常.
String prefix = "test";
ExecutorService threadPool = Executors.newFixedThreadPool(1, new ThreadUtil.ThreadFactoryImpl(prefix));
List<Future> futures = new ArrayList<>();
IntStream.rangeClosed(1, 10).forEach(i -> futures.add(threadPool.submit(() -> {
if (i == 5) {
throw new RuntimeException("error");
}
log.info("I'm done : {}", i);
// if (i < 5) Assert.assertEquals(prefix + "1", Thread.currentThread().getName());
// else Assert.assertEquals(prefix + "2", Thread.currentThread().getName());
})));
for (Future future : futures) {
try {
future.get();
} catch (ExecutionException e) {
log.warn("future ExecutionException", e);
}
}
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);
面试问题
- 说说线程池的核心参数有哪些
- 说说你们的corePoolSize 的数量是如何设置,超时时间如何设置
- 你们使用的是什么队列, 为什么使用这个队列.
- 你们项目是如何优化自己的线程池参数的.
- 当线程池还没达到 corePoolSize 的时候, 线程池里面有空闲线程, 这个时候来了一个新的任务, 线程池是创建新的线程还是使用空闲线程 ?
路过点赞, 月入10w