baeldung 2023-10-05
1. 概述
在本综合教程中,我们将从 Java 8 中引入 Stream 开始,一直到 Java 9 的最新增强功能,全面讲解 Java Stream 的实际用法。
要理解本文内容,读者需具备 Java 8 的基础知识(如 Lambda 表达式、Optional、方法引用)以及对 Stream API 的基本了解。若想更熟悉这些主题,请参考我们之前的文章:《Java 8 新特性》和《Java 8 Stream 入门指南》。
2. Stream 的创建
有多种方式可以从不同来源创建 Stream 实例。一旦创建,该实例不会修改其源数据,因此可以从同一个源创建多个 Stream 实例。
2.1 空 Stream
当需要创建一个空的 Stream 时,应使用 empty() 方法:
Stream<String> streamEmpty = Stream.empty();
我们经常在创建 Stream 时使用 empty() 方法,以避免在没有元素时返回 null:
public Stream<String> streamOf(List<String> list) {
return list == null || list.isEmpty() ? Stream.empty() : list.stream();
}
2.2 集合(Collection)的 Stream
我们可以为任意类型的集合(Collection、List、Set)创建 Stream:
Collection<String> collection = Arrays.asList("a", "b", "c");
Stream<String> streamOfCollection = collection.stream();
2.3 数组(Array)的 Stream
数组也可以作为 Stream 的来源:
Stream<String> streamOfArray = Stream.of("a", "b", "c");
也可以从现有数组或数组的一部分创建 Stream:
String[] arr = new String[]{"a", "b", "c"};
Stream<String> streamOfArrayFull = Arrays.stream(arr);
Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3);
2.4 使用 Stream.builder()
使用 builder() 时,应在语句右侧显式指定所需类型,否则 build() 方法将创建 Stream<Object> 的实例:
Stream<String> streamBuilder =
Stream.<String>builder().add("a").add("b").add("c").build();
2.5 使用 Stream.generate()
generate() 方法接受一个 Supplier<T> 用于生成元素。由于生成的 Stream 是无限的,开发者应指定所需大小,否则 generate() 会一直运行直到内存耗尽:
Stream<String> streamGenerated =
Stream.generate(() -> "element").limit(10);
上述代码创建了包含十个值为 "element" 的字符串序列。
2.6 使用 Stream.iterate()
另一种创建无限 Stream 的方式是使用 iterate() 方法:
Stream<Integer> streamIterated = Stream.iterate(40, n -> n + 2).limit(20);
结果 Stream 的第一个元素是 iterate() 方法的第一个参数。后续每个元素都通过对前一个元素应用指定函数得到。例如,第二个元素将是 42。
2.7 基本类型(Primitive)的 Stream
Java 8 提供了从三种基本类型(int、long 和 double)创建 Stream 的能力。由于 Stream<T> 是泛型接口,而泛型不能直接使用基本类型,因此新增了三个专用接口:IntStream、LongStream 和 DoubleStream。
使用这些新接口可避免不必要的自动装箱,从而提升性能:
IntStream intStream = IntStream.range(1, 3);
LongStream longStream = LongStream.rangeClosed(1, 3);
range(int startInclusive, int endExclusive)方法创建一个从起始值(含)到结束值(不含)的有序 Stream,步长为 1。rangeClosed(int startInclusive, int endInclusive)方法与之类似,但包含结束值。
此外,自 Java 8 起,Random 类提供了多种生成基本类型 Stream 的方法。例如,以下代码创建了一个包含三个元素的 DoubleStream:
Random random = new Random();
DoubleStream doubleStream = random.doubles(3);
2.8 字符串(String)的 Stream
我们还可以使用 String 的 chars() 方法将其作为 Stream 的来源。由于 JDK 中没有 CharStream 接口,因此使用 IntStream 来表示字符流:
IntStream streamOfChars = "abc".chars();
以下示例根据指定的正则表达式将字符串拆分为子字符串:
Stream<String> streamOfString =
Pattern.compile(", ").splitAsStream("a, b, c");
2.9 文件(File)的 Stream
此外,Java NIO 的 Files 类允许通过 lines() 方法从文本文件生成 Stream<String>,文件中的每一行成为一个 Stream 元素:
Path path = Paths.get("C:\\file.txt");
Stream<String> streamOfStrings = Files.lines(path);
Stream<String> streamWithCharset =
Files.lines(path, Charset.forName("UTF-8"));
可以在 lines() 方法中指定字符集(Charset)。
3. Stream 的引用
只要只调用了中间操作(intermediate operations),我们就可以实例化一个 Stream 并持有对其的引用。一旦执行了终端操作(terminal operation),该 Stream 就不可再用。
为说明这一点,我们暂时忽略“最佳实践是将操作链式调用”的原则。尽管以下代码冗长且不推荐,但从技术上讲是合法的:
Stream<String> stream =
Stream.of("a", "b", "c").filter(element -> element.contains("b"));
Optional<String> anyElement = stream.findAny();
然而,如果在调用终端操作后尝试重用同一引用,将抛出 IllegalStateException:
Optional<String> firstElement = stream.findFirst(); // 抛出异常!
由于 IllegalStateException 是 RuntimeException,编译器不会提示错误。因此务必牢记:Java 8 的 Stream 不能被重复使用。
这种行为是合理的。Stream 的设计初衷是以函数式风格对元素源执行有限的操作序列,而不是用于存储元素。
要使上述代码正常工作,应做如下修改:
List<String> elements =
Stream.of("a", "b", "c").filter(element -> element.contains("b"))
.collect(Collectors.toList());
Optional<String> anyElement = elements.stream().findAny();
Optional<String> firstElement = elements.stream().findFirst();
4. Stream 管道(Pipeline)
要在数据源的元素上执行一系列操作并聚合结果,我们需要三部分:源(source)、中间操作(intermediate operations) 和 终端操作(terminal operation)。
中间操作会返回一个新的、经过修改的 Stream。例如,要从现有 Stream 中跳过若干元素,可使用 skip() 方法:
Stream<String> onceModifiedStream =
Stream.of("abcd", "bbcd", "cbcd").skip(1);
如果需要多次修改,可以链式调用多个中间操作。假设我们还需要将每个元素替换为其前几个字符的子串,可通过链式调用 skip() 和 map() 实现:
Stream<String> twiceModifiedStream =
stream.skip(1).map(element -> element.substring(0, 3));
如上所示,map() 方法接受一个 Lambda 表达式作为参数。若想深入了解 Lambda,可参考我们的教程《Lambda 表达式与函数式接口:技巧与最佳实践》。
Stream 本身并无意义;用户真正关心的是终端操作的结果——可能是某种类型的值,也可能是对每个元素执行的操作。每个 Stream 只能使用一个终端操作。
使用 Stream 最正确且便捷的方式是构建 Stream 管道,即:源 → 中间操作 → 终端操作 的链式结构:
List<String> list = Arrays.asList("abc1", "abc2", "abc3");
long size = list.stream().skip(1)
.map(element -> element.substring(0, 3)).sorted().count();
5. 惰性求值(Lazy Invocation)
中间操作是惰性求值的,这意味着它们仅在终端操作需要时才会被调用。
例如,定义一个每次调用都会递增内部计数器的方法:
private long counter;
private void wasCalled() {
counter++;
}
现在在 filter() 操作中调用该方法:
List<String> list = Arrays.asList("abc1", "abc2", "abc3");
counter = 0;
Stream<String> stream = list.stream().filter(element -> {
wasCalled();
return element.contains("2");
});
虽然源中有三个元素,我们可能认为 filter() 会被调用三次,计数器变为 3。但实际上,计数器仍为 0,因为缺少终端操作,filter() 根本未被调用。
稍作修改,添加 map() 和终端操作 findFirst(),并加入日志记录调用顺序:
Optional<String> stream = list.stream().filter(element -> {
log.info("filter() was called");
return element.contains("2");
}).map(element -> {
log.info("map() was called");
return element.toUpperCase();
}).findFirst();
日志显示:filter() 被调用了两次,map() 被调用了一次。这是因为管道是垂直执行的:第一个元素不满足条件;第二个元素满足,于是继续向下执行 map();由于 findFirst() 只需一个元素,第三个元素甚至未被检查。
这种惰性机制帮助我们避免了不必要的计算。
6. 执行顺序
从性能角度看,在 Stream 管道中操作的顺序至关重要。
考虑以下代码:
long size = list.stream().map(element -> {
wasCalled();
return element.substring(0, 3);
}).skip(2).count();
执行后,计数器增加了 3,说明 map() 被调用了三次,但最终 size 仅为 1。这意味着我们对两个无用元素执行了昂贵的 map() 操作。
如果交换 skip() 和 map() 的顺序:
long size = list.stream().skip(2).map(element -> {
wasCalled();
return element.substring(0, 3);
}).count();
此时计数器仅增加 1,map() 只被调用一次。
结论:应将减少 Stream 大小的中间操作(如 skip()、filter()、distinct())放在管道的前面,以避免对后续被丢弃的元素执行不必要的计算。
7. Stream 归约(Reduction)
API 提供了许多用于聚合 Stream 的终端操作,如 count()、max()、min() 和 sum()。但这些操作使用预定义逻辑。如果需要自定义归约机制,可使用 reduce() 和 collect() 方法。
7.1 reduce() 方法
reduce() 有三种变体,区别在于参数和返回类型:
- identity:累加器的初始值,或 Stream 为空时的默认值。
- accumulator:指定元素聚合逻辑的函数。每一步都会创建新值,只有最后一个值有用,可能影响性能。
- combiner:在并行模式下合并多个累加器结果的函数。
示例:
OptionalInt reduced =
IntStream.range(1, 4).reduce((a, b) -> a + b);
// 结果:6 (1+2+3)
int reducedTwoParams =
IntStream.range(1, 4).reduce(10, (a, b) -> a + b);
// 结果:16 (10+1+2+3)
int reducedParams = Stream.of(1, 2, 3)
.reduce(10, (a, b) -> a + b, (a, b) -> {
log.info("combiner was called");
return a + b;
});
// 结果仍是 16,combiner 未被调用(因为是串行流)
要使 combiner 生效,需使用并行流:
int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
.reduce(10, (a, b) -> a + b, (a, b) -> {
log.info("combiner was called");
return a + b;
});
// 结果:36,combiner 被调用两次
并行归约过程:
- 每个线程独立执行
accumulator:10+1=11, 10+2=12, 10+3=13 - combiner 合并结果:12+13=25, 25+11=36
7.2 collect() 方法
collect() 是另一种归约方式,接受 Collector 类型参数,指定归约机制。Collectors 工具类提供了大量预定义收集器。
使用以下 Product 列表作为示例源:
List<Product> productList = Arrays.asList(
new Product(23, "potatoes"),
new Product(14, "orange"),
new Product(13, "lemon"),
new Product(23, "bread"),
new Product(13, "sugar")
);
转换为集合(Collection/List/Set):
List<String> collectorCollection =
productList.stream().map(Product::getName).collect(Collectors.toList());
归约为字符串:
String listToString = productList.stream().map(Product::getName)
.collect(Collectors.joining(", ", "[", "]"));
// 结果:"[potatoes, orange, lemon, bread, sugar]"
joining() 支持分隔符、前缀和后缀,无需手动处理末尾逻辑。
计算平均值:
double averagePrice = productList.stream()
.collect(Collectors.averagingInt(Product::getPrice));
计算总和:
int summingPrice = productList.stream()
.collect(Collectors.summingInt(Product::getPrice));
averagingXX()、summingXX() 和 summarizingXX() 支持基本类型及其包装类,并内置映射功能,无需额外 map()。
获取统计信息:
IntSummaryStatistics statistics = productList.stream()
.collect(Collectors.summarizingInt(Product::getPrice));
// 输出:IntSummaryStatistics{count=5, sum=86, min=13, average=17.200000, max=23}
可通过 getCount()、getSum() 等方法提取具体值。
分组:
Map<Integer, List<Product>> collectorMapOfLists = productList.stream()
.collect(Collectors.groupingBy(Product::getPrice));
按价格分组。
分区(Partitioning):
Map<Boolean, List<Product>> mapPartioned = productList.stream()
.collect(Collectors.partitioningBy(element -> element.getPrice() > 15));
根据谓词将元素分为两组。
链式转换:
Set<Product> unmodifiableSet = productList.stream()
.collect(Collectors.collectingAndThen(Collectors.toSet(),
Collections::unmodifiableSet));
先转为 Set,再转为不可变集合。
自定义 Collector:
Collector<Product, ?, LinkedList<Product>> toLinkedList =
Collector.of(LinkedList::new, LinkedList::add,
(first, second) -> {
first.addAll(second);
return first;
});
LinkedList<Product> linkedListOfPersons =
productList.stream().collect(toLinkedList);
8. 并行 Stream(Parallel Streams)
Java 8 之前,并行化很复杂。Java 8 引入了函数式并行处理方式。
可通过 parallelStream() 从集合或数组创建并行 Stream:
Stream<Product> streamOfCollection = productList.parallelStream();
boolean isParallel = streamOfCollection.isParallel();
boolean bigPrice = streamOfCollection
.map(product -> product.getPrice() * 12)
.anyMatch(price -> price > 200);
对于非集合/数组源,使用 parallel():
IntStream intStreamParallel = IntStream.range(1, 150).parallel();
boolean isParallel = intStreamParallel.isParallel();
底层使用 ForkJoin 框架,默认使用公共线程池,目前无法指定自定义线程池。
注意事项:
- 避免在并行 Stream 中使用阻塞操作。
- 适合各任务执行时间相近的场景。
- 可通过
sequential()转回串行模式:
IntStream intStreamSequential = intStreamParallel.sequential();
boolean isParallel = intStreamSequential.isParallel(); // false
9. Java 9 对 Stream API 的增强
Java 9 为 Stream API 引入了多项改进,使代码更简洁高效。
9.1 takeWhile() 和 dropWhile()
这两个方法使用 Predicate 按条件包含或排除元素,特别适用于有序 Stream。
takeWhile():从开头收集元素,直到条件不再满足:
Stream<String> stream = Stream.iterate("", s -> s + "s")
.takeWhile(s -> s.length() < 10);
dropWhile():从开头丢弃满足条件的元素,遇到第一个不满足条件的元素后停止丢弃:
Stream<String> stream = Stream.of("a", "aa", "aaa", "aaaaa")
.dropWhile(s -> s.length() < 5);
// 结果:["aaaaa"]
9.2 增强版 iterate()
Java 9 的 iterate() 可指定终止条件,直接生成有限 Stream:
Stream.iterate(0, i -> i < 10, i -> i + 1)
.forEach(System.out::println);
// 输出 0 到 9
比 Java 8 的 iterate().limit() 更清晰。
9.3 ofNullable() 处理可能为 null 的元素
ofNullable() 在元素为 null 时返回空 Stream,避免繁琐的 null 检查:
collection.stream()
.flatMap(s -> Stream.ofNullable(map.get(s)))
.collect(Collectors.toList());
简化了条件性元素添加的逻辑。
10. 结论
Stream API 是一套强大而易懂的工具,用于处理元素序列。合理使用可大幅减少样板代码、提升程序可读性与性能。
重要提示:本文多数示例未消费 Stream(未调用
close()或终端操作)。在实际应用中,切勿让已实例化的 Stream 未被消费,否则会导致内存泄漏。