1. 概述
Java 8 引入了 Stream(流) 的概念,作为一种对数据执行批量操作的高效方式。在支持并发的环境中,我们可以获取 并行流(Parallel Streams)。
这类流可以带来性能提升,但代价是引入了多线程的开销。
在本篇简明教程中,我们将探讨 Stream API 的一个主要限制,并学习如何让并行流使用自定义的 ThreadPool 实例。此外,也有现成的库可以处理这个问题。
2. 并行流
让我们从一个简单的例子开始:在任意 Collection 类型上调用 parallelStream() 方法,即可返回一个可能是并行的 Stream:
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();
assertTrue(parallelStream.isParallel());
}
默认情况下,这种流的处理使用的是 ForkJoinPool.commonPool() —— 这是一个由整个应用程序共享的线程池。
3. 自定义线程池
我们实际上可以在处理流时传入一个自定义的线程池。
下面的例子演示了如何让一个并行流使用自定义的 ThreadPool,来计算从 1 到 1,000,000(含)所有长整型数值的总和:
@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {
long firstNum = 1;
long lastNum = 1_000_000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}
这里我们使用了 ForkJoinPool 构造函数,并设置了并行度为 4。针对不同环境,需要通过实验确定最佳并行度值,但一个经验法则是:根据 CPU 核心数来设定。
接着,我们在 reduce 调用中对并行流的内容进行了求和。
这个简单示例可能无法完全体现使用自定义线程池的优势,但在某些场景下其价值显而易见——例如,当我们不希望将公共线程池用于长时间运行的任务(如处理网络数据源),或者当公共线程池已被应用中的其他组件占用时。
如果我们运行上述测试方法,它会顺利通过。到目前为止一切正常。
然而,如果我们在普通方法中以与测试方法相同的方式实例化 ForkJoinPool,可能会导致 OutOfMemoryError。
接下来,我们深入分析内存泄漏的原因。
4. 警惕内存泄漏
如前所述,默认情况下整个应用程序共享一个公共线程池(common pool)。该公共线程池是一个静态的 ThreadPool 实例。
因此,如果我们使用默认线程池,则不会发生内存泄漏。
现在,回顾我们的测试方法。在测试方法中,我们创建了一个 ForkJoinPool 对象。当测试方法执行完毕后,customThreadPool 对象并不会被解除引用并被垃圾回收——相反,它会继续等待新的任务分配。
换句话说,每次调用该测试方法时,都会创建一个新的 customThreadPool 对象,而这些对象不会被释放。
解决这个问题非常简单:在执行完任务后,调用 customThreadPool 的 shutdown() 方法:
try {
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
customThreadPool.shutdown();
}
5. 结论
我们简要探讨了如何使用自定义线程池来运行并行流。在合适的环境下,并合理设置并行度,可以在某些场景中获得性能提升。
如果创建了自定义线程池,请务必记得调用其 shutdown() 方法,以避免内存泄漏。