baeldung 2024-01-08
1. 概述
Spring 的 ThreadPoolTaskExecutor 是一个 JavaBean,它对 java.util.concurrent.ThreadPoolExecutor 实例进行了封装,并将其暴露为 Spring 的 org.springframework.core.task.TaskExecutor。此外,它通过 corePoolSize、maxPoolSize、queueCapacity、allowCoreThreadTimeOut 和 keepAliveSeconds 等属性提供了高度可配置性。在本教程中,我们将重点探讨 corePoolSize 和 maxPoolSize 这两个属性。
2. corePoolSize 与 maxPoolSize
刚接触这一抽象的用户很容易混淆这两个配置属性的区别。因此,我们分别来看一下它们各自的作用。
2.1 corePoolSize
corePoolSize 表示线程池中保持存活(不会因空闲而被回收)的最小线程数量。它是 ThreadPoolTaskExecutor 的一个可配置属性。不过,ThreadPoolTaskExecutor 抽象层会将该值委托给底层的 java.util.concurrent.ThreadPoolExecutor 来设置。需要特别说明的是:如果我们将 allowCoreThreadTimeOut 设置为 true,那么即使是核心线程也可能超时终止,从而实际上将 corePoolSize 的效果等同于 0。
2.2 maxPoolSize
相比之下,maxPoolSize 定义了线程池中最多可以创建的线程数量。同样地,ThreadPoolTaskExecutor 的 maxPoolSize 属性也会委托给底层的 ThreadPoolExecutor。需要注意的是,maxPoolSize 的实际生效还依赖于 queueCapacity:只有当任务队列已满(即队列中的任务数量达到 queueCapacity)时,线程池才会创建超出 corePoolSize 的新线程,直到达到 maxPoolSize。
3. 二者究竟有何区别?
虽然 corePoolSize 和 maxPoolSize 的区别看似明显,但它们的行为仍有一些细微之处。
当我们向 ThreadPoolTaskExecutor 提交一个新任务时:
- 如果当前运行的线程数 少于
corePoolSize,即使池中有空闲线程,也会创建一个新线程来处理任务; - 如果当前运行的线程数 已经达到或超过
corePoolSize,但任务队列未满,则新任务会被放入队列中等待; - 只有当队列已满且当前线程数 小于
maxPoolSize时,才会继续创建新线程; - 如果线程数已达到
maxPoolSize且队列也满了,则根据拒绝策略处理新任务(默认抛出异常)。
接下来,我们通过代码示例来具体观察这两个属性在不同场景下的行为。
4. 示例
首先,假设我们有一个方法 startThreads,用于通过 ThreadPoolTaskExecutor 启动多个线程:
public void startThreads(ThreadPoolTaskExecutor taskExecutor, CountDownLatch countDownLatch,
int numThreads) {
for (int i = 0; i < numThreads; i++) {
taskExecutor.execute(() -> {
try {
Thread.sleep(100L * ThreadLocalRandom.current().nextLong(1, 10));
countDownLatch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
示例 1:使用默认配置
默认情况下,ThreadPoolTaskExecutor 的 corePoolSize 为 1,maxPoolSize 和 queueCapacity 均为无界(即 Integer.MAX_VALUE)。因此,无论提交多少任务,线程池都只会使用一个线程来顺序执行:
@Test
public void whenUsingDefaults_thenSingleThread() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.afterPropertiesSet();
CountDownLatch countDownLatch = new CountDownLatch(10);
this.startThreads(taskExecutor, countDownLatch, 10);
while (countDownLatch.getCount() > 0) {
Assert.assertEquals(1, taskExecutor.getPoolSize());
}
}
示例 2:设置 corePoolSize 为 5
现在我们将 corePoolSize 设为 5,其他保持默认。此时,无论提交多少任务(只要 ≥5),线程池都会立即启动 5 个线程:
@Test
public void whenCorePoolSizeFive_thenFiveThreads() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.afterPropertiesSet();
CountDownLatch countDownLatch = new CountDownLatch(10);
this.startThreads(taskExecutor, countDownLatch, 10);
while (countDownLatch.getCount() > 0) {
Assert.assertEquals(5, taskExecutor.getPoolSize());
}
}
示例 3:corePoolSize=5,maxPoolSize=10,但 queueCapacity 仍为无界
此时,尽管 maxPoolSize 是 10,但由于队列是无界的,所有超出核心线程的任务都会被放入队列,而不会触发创建额外线程。因此,仍然只有 5 个线程运行:
@Test
public void whenCorePoolSizeFiveAndMaxPoolSizeTen_thenFiveThreads() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.afterPropertiesSet();
CountDownLatch countDownLatch = new CountDownLatch(10);
this.startThreads(taskExecutor, countDownLatch, 10);
while (countDownLatch.getCount() > 0) {
Assert.assertEquals(5, taskExecutor.getPoolSize());
}
}
示例 4:设置 queueCapacity=10,并提交 20 个任务
现在我们显式设置 queueCapacity=10,并提交 20 个任务:
- 前 5 个任务 → 创建 5 个核心线程;
- 接下来的 10 个任务 → 放入队列(队列容量为 10);
- 第 16 到第 20 个任务 → 队列已满,因此线程池会继续创建新线程,直到达到
maxPoolSize=10。
最终,线程池中将有 10 个线程:
@Test
public void whenCorePoolSizeFiveAndMaxPoolSizeTenAndQueueCapacityTen_thenTenThreads() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(10);
taskExecutor.afterPropertiesSet();
CountDownLatch countDownLatch = new CountDownLatch(20);
this.startThreads(taskExecutor, countDownLatch, 20);
while (countDownLatch.getCount() > 0) {
Assert.assertEquals(10, taskExecutor.getPoolSize());
}
}
同理,如果我们把
queueCapacity设为 0(即使用SynchronousQueue),那么每提交一个任务都会立即尝试创建新线程(只要未达maxPoolSize)。例如,提交 10 个任务且maxPoolSize=10,就会创建 10 个线程。
5. 结论
ThreadPoolTaskExecutor 是对 java.util.concurrent.ThreadPoolExecutor 的强大封装,提供了灵活的配置选项,包括 corePoolSize、maxPoolSize 和 queueCapacity。在本教程中,我们深入探讨了 corePoolSize 与 maxPoolSize 的区别,并阐明了 maxPoolSize 如何与 queueCapacity 协同工作,从而帮助我们针对各种应用场景轻松构建合适的线程池。