Skip to main content

ThreadPoolExecutor参数详解和优化建议

· 9 min read
jeesk
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
20,
TimeUnit.MINUTES,
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread();
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

}
});

按照上面参数顺序讲解

  1. $\color{#FF0000}{corePoolSize 核心线程池数量 }$
  • 创建线程池的时候没有线程, 当提交任务的时候会陆续创建线程, 当corePoolSize 满的时候, 会将任务放到队列中去, 队列满了, 那么会继续创建 corePoolIsze 到maxPoolSize之间的线程 .
  • 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
  • prestartAllCoreThread 或者prestartAllCoreThread 初始化核心线程
  1. $\color{#FF0000}{maxPoolSize最大线程池数量}$
  • 当线程数量 = maxPoolSize , 且任务队列已经满了, 线程池会拒绝任务抛出一场
  • 我们的项目 这个参数是 Runtime.getRuntime().availableProcessors() * 4 +1 , 根据压力测试获得最好的最大核心数量
  1. $\color{#FF0000}{keepAliveTime 线程空闲时间}$
  • 空闲的线程能够保持空闲的时间, 超过这个时间, 这一部分线程将被回收.
  • 核心线程到最大线程数量的差异, 如果两个值相等, 那么这个参数毫无意义.
  1. $\color{#FF0000}{BlockingQueue 阻塞队列}$
  • 当核心线程数达到最大时,新任务会放在队列中排队等待执行
  • 常见的队列
    1. ArrayBlockingQueue: 有界队列,基于数组结构,按照队列FIFO原则对元素排序;
    2. LinkedBlockingQueue:无界队列,基于链表结构,按照队列FIFO原则对元素排序,Executors.newFixedThreadPool()使用了这个队列
    3. SynchronousQueue 同步队列, 该队列不存储元素,每个插入操作必须等待另一个线程调用移除操作,否则插入操作会一直被阻塞,Executors.newCachedThreadPool()使用了这个队列;
    4. PriorityBlockingQueue:优先级队列,具有优先级的无限阻塞队列。

6 . $\color{#FF0000}{ThreadFactory 线程工厂}$

  • 自定义线程的名字,daemon,优先级等
  1. $\color{#FF0000}{rejectedExecutionHandler 拒绝策略}$
  • 这里的拒绝可以采取你自定义的办法, 比如使用second 线程池处理 r , 或者丢弃r, 或者打印出日志, 取决于你的业务.
  • 线程池执行拒绝的两种情况
    1. 当线程数量达到maxPoolSize , 队列已经满了, 会拒绝新任务
    2. 当线程池调用shutdown 的时候, 会等待线程池的任务执行完毕, 再shutdown, 这是一种优雅的方式. 调用shutdown和shutdown关闭之前, 这段时间会拒绝新任务. shutdownNow, 会立刻关闭, 并且停止执行任务. 和shutdown 有很大区别.
  • 几种拒绝策略
    1. AbortPolicy 丢弃任务, 抛运行异常(如果不设置, 默认就是这一个策略)
    2. CallerRunsPolicy 执行任务
    3. DiscardPolicy 忽视,什么都不会发生
    4. DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务

线程执行的一些流程

  1. 如果当前线程池中的线程数目 小于 < corePoolSize 。则对于每个新任务,都会创建一个线程去执行这个任务(即使core线程中也有空闲线程, 也会新创建线程会执行)。
  2. 如果当前线程池中的线程数目 大于等于 >= corePoolSize 。 对于每个新任务,会将其添加到任务队列中。若添加成功,则任务由空闲的线程主动将其从队列中移除后执行。若添加失败(任务队列已满),则尝试创建新的线程执行。
  3. 若当前线程池中的线程数目到达 maximumPoolSize ,则对于新任务采取拒绝策略。
  4. 如果线程池中的数量大于 corePoolSize 时,如果某个线程空闲时间超过 keepAliveTime ,线程会被终止,直到线程池中的线程数目不超过 corePoolSize 。
  5. 如果为核心线程池中的线程设置存活时间,则当核心线程池中的线程空闲时间超过 keepAliveTime ,则该线程也会被终止
  6. 如果核心线程数已经达到, 如果没有队列没有满的话, 是不会创建新的线程. 有时候取决你使用什么队列. 比如使用 ArrayBlockingQueue(10), 当核心线程已经创建完成, 只有当队列满了之后才会继续创建新的线程. 如果你使用的是SynchronousQueue, 内部只能一个的队列,那么只有队列直接创建core线程到maxpoolSize 之间的线程

线程池性能优化建议

  1. 建议使用 prestartAllCoreThread 或者prestartAllCoreThread 初始化核心线程
  2. 考虑 allowCoreThreadTimeOut 允许核心线程能够回收节约机器的使用.
  3. 拒绝策略可以将任务, 提交给 second 线程池处理.
  4. 自定义ThreadFactory ,标识线程, 方便排查线程的使用情况

关于线程异常是否处理的问题:

  1. execute : 如果不手动捕获一场, 线程池只能重新创建新的异常来填补空白,重新创建线程这是有代价的
  2. submit: 因为能够调用 future.get(). 所以有异常也会捕获, 不会造成线程终止.
// 证明execute 的异常
@Test
public void test1() throws InterruptedException {
Thread.setDefaultUncaughtExceptionHandler((Thread t, Throwable e) -> {
log.warn("Exception in thread {}", t, e);
});
String prefix = "test";
ExecutorService threadPool = Executors.newFixedThreadPool(1, new ThreadUtil.ThreadFactoryImpl(prefix));
IntStream.rangeClosed(1, 10).forEach(i -> threadPool.execute(() -> {
if (i == 5) {
throw new RuntimeException("error");
}
log.info("I'm done : {}", i);
System.out.println(Thread.currentThread().getName() + " I'm done : " + i);
if (i < 5) {
Assert.assertEquals(prefix + "1", Thread.currentThread().getName());
} else {
Assert.assertEquals(prefix + "2", Thread.currentThread().getName());
}

}));
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);
// 本来是通过test 1 线程执行的, 后面出现异常 确是test2 执行的, 说明x线程已经终止2, 并且重新创建线程
}
// 线程名称没有变, 说明已经帮你捕获异常.
String prefix = "test";
ExecutorService threadPool = Executors.newFixedThreadPool(1, new ThreadUtil.ThreadFactoryImpl(prefix));
List<Future> futures = new ArrayList<>();
IntStream.rangeClosed(1, 10).forEach(i -> futures.add(threadPool.submit(() -> {
if (i == 5) {
throw new RuntimeException("error");
}
log.info("I'm done : {}", i);
// if (i < 5) Assert.assertEquals(prefix + "1", Thread.currentThread().getName());
// else Assert.assertEquals(prefix + "2", Thread.currentThread().getName());
})));

for (Future future : futures) {
try {
future.get();
} catch (ExecutionException e) {
log.warn("future ExecutionException", e);
}
}
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);

面试问题

  1. 说说线程池的核心参数有哪些
  2. 说说你们的corePoolSize 的数量是如何设置,超时时间如何设置
  3. 你们使用的是什么队列, 为什么使用这个队列.
  4. 你们项目是如何优化自己的线程池参数的.
  5. 当线程池还没达到 corePoolSize 的时候, 线程池里面有空闲线程, 这个时候来了一个新的任务, 线程池是创建新的线程还是使用空闲线程 ?

路过点赞, 月入10w