Java Stream API 教程

更新于 2025-12-29

baeldung 2023-10-05

1. 概述

在本综合教程中,我们将从 Java 8 中引入 Stream 开始,一直到 Java 9 的最新增强功能,全面讲解 Java Stream 的实际用法。

要理解本文内容,读者需具备 Java 8 的基础知识(如 Lambda 表达式、Optional、方法引用)以及对 Stream API 的基本了解。若想更熟悉这些主题,请参考我们之前的文章:《Java 8 新特性》和《Java 8 Stream 入门指南》。


2. Stream 的创建

有多种方式可以从不同来源创建 Stream 实例。一旦创建,该实例不会修改其源数据,因此可以从同一个源创建多个 Stream 实例。

2.1 空 Stream

当需要创建一个空的 Stream 时,应使用 empty() 方法:

Stream<String> streamEmpty = Stream.empty();

我们经常在创建 Stream 时使用 empty() 方法,以避免在没有元素时返回 null

public Stream<String> streamOf(List<String> list) {
    return list == null || list.isEmpty() ? Stream.empty() : list.stream();
}

2.2 集合(Collection)的 Stream

我们可以为任意类型的集合(CollectionListSet)创建 Stream:

Collection<String> collection = Arrays.asList("a", "b", "c");
Stream<String> streamOfCollection = collection.stream();

2.3 数组(Array)的 Stream

数组也可以作为 Stream 的来源:

Stream<String> streamOfArray = Stream.of("a", "b", "c");

也可以从现有数组或数组的一部分创建 Stream:

String[] arr = new String[]{"a", "b", "c"};
Stream<String> streamOfArrayFull = Arrays.stream(arr);
Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3);

2.4 使用 Stream.builder()

使用 builder() 时,应在语句右侧显式指定所需类型,否则 build() 方法将创建 Stream<Object> 的实例:

Stream<String> streamBuilder =
  Stream.<String>builder().add("a").add("b").add("c").build();

2.5 使用 Stream.generate()

generate() 方法接受一个 Supplier<T> 用于生成元素。由于生成的 Stream 是无限的,开发者应指定所需大小,否则 generate() 会一直运行直到内存耗尽:

Stream<String> streamGenerated =
  Stream.generate(() -> "element").limit(10);

上述代码创建了包含十个值为 "element" 的字符串序列。

2.6 使用 Stream.iterate()

另一种创建无限 Stream 的方式是使用 iterate() 方法:

Stream<Integer> streamIterated = Stream.iterate(40, n -> n + 2).limit(20);

结果 Stream 的第一个元素是 iterate() 方法的第一个参数。后续每个元素都通过对前一个元素应用指定函数得到。例如,第二个元素将是 42。

2.7 基本类型(Primitive)的 Stream

Java 8 提供了从三种基本类型(intlongdouble)创建 Stream 的能力。由于 Stream<T> 是泛型接口,而泛型不能直接使用基本类型,因此新增了三个专用接口:IntStreamLongStreamDoubleStream

使用这些新接口可避免不必要的自动装箱,从而提升性能:

IntStream intStream = IntStream.range(1, 3);
LongStream longStream = LongStream.rangeClosed(1, 3);
  • range(int startInclusive, int endExclusive) 方法创建一个从起始值(含)到结束值(不含)的有序 Stream,步长为 1。
  • rangeClosed(int startInclusive, int endInclusive) 方法与之类似,但包含结束值。

此外,自 Java 8 起,Random 类提供了多种生成基本类型 Stream 的方法。例如,以下代码创建了一个包含三个元素的 DoubleStream

Random random = new Random();
DoubleStream doubleStream = random.doubles(3);

2.8 字符串(String)的 Stream

我们还可以使用 Stringchars() 方法将其作为 Stream 的来源。由于 JDK 中没有 CharStream 接口,因此使用 IntStream 来表示字符流:

IntStream streamOfChars = "abc".chars();

以下示例根据指定的正则表达式将字符串拆分为子字符串:

Stream<String> streamOfString =
  Pattern.compile(", ").splitAsStream("a, b, c");

2.9 文件(File)的 Stream

此外,Java NIO 的 Files 类允许通过 lines() 方法从文本文件生成 Stream<String>,文件中的每一行成为一个 Stream 元素:

Path path = Paths.get("C:\\file.txt");
Stream<String> streamOfStrings = Files.lines(path);
Stream<String> streamWithCharset = 
  Files.lines(path, Charset.forName("UTF-8"));

可以在 lines() 方法中指定字符集(Charset)。


3. Stream 的引用

只要只调用了中间操作(intermediate operations),我们就可以实例化一个 Stream 并持有对其的引用。一旦执行了终端操作(terminal operation),该 Stream 就不可再用。

为说明这一点,我们暂时忽略“最佳实践是将操作链式调用”的原则。尽管以下代码冗长且不推荐,但从技术上讲是合法的:

Stream<String> stream = 
  Stream.of("a", "b", "c").filter(element -> element.contains("b"));
Optional<String> anyElement = stream.findAny();

然而,如果在调用终端操作后尝试重用同一引用,将抛出 IllegalStateException

Optional<String> firstElement = stream.findFirst(); // 抛出异常!

由于 IllegalStateExceptionRuntimeException,编译器不会提示错误。因此务必牢记:Java 8 的 Stream 不能被重复使用

这种行为是合理的。Stream 的设计初衷是以函数式风格对元素源执行有限的操作序列,而不是用于存储元素。

要使上述代码正常工作,应做如下修改:

List<String> elements =
  Stream.of("a", "b", "c").filter(element -> element.contains("b"))
    .collect(Collectors.toList());
Optional<String> anyElement = elements.stream().findAny();
Optional<String> firstElement = elements.stream().findFirst();

4. Stream 管道(Pipeline)

要在数据源的元素上执行一系列操作并聚合结果,我们需要三部分:源(source)中间操作(intermediate operations)终端操作(terminal operation)

中间操作会返回一个新的、经过修改的 Stream。例如,要从现有 Stream 中跳过若干元素,可使用 skip() 方法:

Stream<String> onceModifiedStream =
  Stream.of("abcd", "bbcd", "cbcd").skip(1);

如果需要多次修改,可以链式调用多个中间操作。假设我们还需要将每个元素替换为其前几个字符的子串,可通过链式调用 skip()map() 实现:

Stream<String> twiceModifiedStream =
  stream.skip(1).map(element -> element.substring(0, 3));

如上所示,map() 方法接受一个 Lambda 表达式作为参数。若想深入了解 Lambda,可参考我们的教程《Lambda 表达式与函数式接口:技巧与最佳实践》。

Stream 本身并无意义;用户真正关心的是终端操作的结果——可能是某种类型的值,也可能是对每个元素执行的操作。每个 Stream 只能使用一个终端操作

使用 Stream 最正确且便捷的方式是构建 Stream 管道,即:源 → 中间操作 → 终端操作 的链式结构:

List<String> list = Arrays.asList("abc1", "abc2", "abc3");
long size = list.stream().skip(1)
  .map(element -> element.substring(0, 3)).sorted().count();

5. 惰性求值(Lazy Invocation)

中间操作是惰性求值的,这意味着它们仅在终端操作需要时才会被调用。

例如,定义一个每次调用都会递增内部计数器的方法:

private long counter;
 
private void wasCalled() {
    counter++;
}

现在在 filter() 操作中调用该方法:

List<String> list = Arrays.asList("abc1", "abc2", "abc3");
counter = 0;
Stream<String> stream = list.stream().filter(element -> {
    wasCalled();
    return element.contains("2");
});

虽然源中有三个元素,我们可能认为 filter() 会被调用三次,计数器变为 3。但实际上,计数器仍为 0,因为缺少终端操作,filter() 根本未被调用。

稍作修改,添加 map() 和终端操作 findFirst(),并加入日志记录调用顺序:

Optional<String> stream = list.stream().filter(element -> {
    log.info("filter() was called");
    return element.contains("2");
}).map(element -> {
    log.info("map() was called");
    return element.toUpperCase();
}).findFirst();

日志显示:filter() 被调用了两次,map() 被调用了一次。这是因为管道是垂直执行的:第一个元素不满足条件;第二个元素满足,于是继续向下执行 map();由于 findFirst() 只需一个元素,第三个元素甚至未被检查。

这种惰性机制帮助我们避免了不必要的计算。


6. 执行顺序

从性能角度看,在 Stream 管道中操作的顺序至关重要

考虑以下代码:

long size = list.stream().map(element -> {
    wasCalled();
    return element.substring(0, 3);
}).skip(2).count();

执行后,计数器增加了 3,说明 map() 被调用了三次,但最终 size 仅为 1。这意味着我们对两个无用元素执行了昂贵的 map() 操作。

如果交换 skip()map() 的顺序:

long size = list.stream().skip(2).map(element -> {
    wasCalled();
    return element.substring(0, 3);
}).count();

此时计数器仅增加 1,map() 只被调用一次。

结论:应将减少 Stream 大小的中间操作(如 skip()filter()distinct())放在管道的前面,以避免对后续被丢弃的元素执行不必要的计算。


7. Stream 归约(Reduction)

API 提供了许多用于聚合 Stream 的终端操作,如 count()max()min()sum()。但这些操作使用预定义逻辑。如果需要自定义归约机制,可使用 reduce()collect() 方法。

7.1 reduce() 方法

reduce() 有三种变体,区别在于参数和返回类型:

  • identity:累加器的初始值,或 Stream 为空时的默认值。
  • accumulator:指定元素聚合逻辑的函数。每一步都会创建新值,只有最后一个值有用,可能影响性能。
  • combiner:在并行模式下合并多个累加器结果的函数。

示例:

OptionalInt reduced =
  IntStream.range(1, 4).reduce((a, b) -> a + b);
// 结果:6 (1+2+3)

int reducedTwoParams =
  IntStream.range(1, 4).reduce(10, (a, b) -> a + b);
// 结果:16 (10+1+2+3)

int reducedParams = Stream.of(1, 2, 3)
  .reduce(10, (a, b) -> a + b, (a, b) -> {
     log.info("combiner was called");
     return a + b;
  });
// 结果仍是 16,combiner 未被调用(因为是串行流)

要使 combiner 生效,需使用并行流:

int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
    .reduce(10, (a, b) -> a + b, (a, b) -> {
       log.info("combiner was called");
       return a + b;
    });
// 结果:36,combiner 被调用两次

并行归约过程:

  • 每个线程独立执行 accumulator:10+1=11, 10+2=12, 10+3=13
  • combiner 合并结果:12+13=25, 25+11=36

7.2 collect() 方法

collect() 是另一种归约方式,接受 Collector 类型参数,指定归约机制。Collectors 工具类提供了大量预定义收集器。

使用以下 Product 列表作为示例源:

List<Product> productList = Arrays.asList(
  new Product(23, "potatoes"),
  new Product(14, "orange"),
  new Product(13, "lemon"),
  new Product(23, "bread"),
  new Product(13, "sugar")
);

转换为集合(Collection/List/Set):

List<String> collectorCollection = 
  productList.stream().map(Product::getName).collect(Collectors.toList());

归约为字符串:

String listToString = productList.stream().map(Product::getName)
  .collect(Collectors.joining(", ", "[", "]"));
// 结果:"[potatoes, orange, lemon, bread, sugar]"

joining() 支持分隔符、前缀和后缀,无需手动处理末尾逻辑。

计算平均值:

double averagePrice = productList.stream()
  .collect(Collectors.averagingInt(Product::getPrice));

计算总和:

int summingPrice = productList.stream()
  .collect(Collectors.summingInt(Product::getPrice));

averagingXX()summingXX()summarizingXX() 支持基本类型及其包装类,并内置映射功能,无需额外 map()

获取统计信息:

IntSummaryStatistics statistics = productList.stream()
  .collect(Collectors.summarizingInt(Product::getPrice));
// 输出:IntSummaryStatistics{count=5, sum=86, min=13, average=17.200000, max=23}

可通过 getCount()getSum() 等方法提取具体值。

分组:

Map<Integer, List<Product>> collectorMapOfLists = productList.stream()
  .collect(Collectors.groupingBy(Product::getPrice));

按价格分组。

分区(Partitioning):

Map<Boolean, List<Product>> mapPartioned = productList.stream()
  .collect(Collectors.partitioningBy(element -> element.getPrice() > 15));

根据谓词将元素分为两组。

链式转换:

Set<Product> unmodifiableSet = productList.stream()
  .collect(Collectors.collectingAndThen(Collectors.toSet(),
  Collections::unmodifiableSet));

先转为 Set,再转为不可变集合。

自定义 Collector:

Collector<Product, ?, LinkedList<Product>> toLinkedList =
  Collector.of(LinkedList::new, LinkedList::add, 
    (first, second) -> { 
       first.addAll(second); 
       return first; 
    });

LinkedList<Product> linkedListOfPersons =
  productList.stream().collect(toLinkedList);

8. 并行 Stream(Parallel Streams)

Java 8 之前,并行化很复杂。Java 8 引入了函数式并行处理方式。

可通过 parallelStream() 从集合或数组创建并行 Stream:

Stream<Product> streamOfCollection = productList.parallelStream();
boolean isParallel = streamOfCollection.isParallel();
boolean bigPrice = streamOfCollection
  .map(product -> product.getPrice() * 12)
  .anyMatch(price -> price > 200);

对于非集合/数组源,使用 parallel()

IntStream intStreamParallel = IntStream.range(1, 150).parallel();
boolean isParallel = intStreamParallel.isParallel();

底层使用 ForkJoin 框架,默认使用公共线程池,目前无法指定自定义线程池。

注意事项

  • 避免在并行 Stream 中使用阻塞操作。
  • 适合各任务执行时间相近的场景。
  • 可通过 sequential() 转回串行模式:
IntStream intStreamSequential = intStreamParallel.sequential();
boolean isParallel = intStreamSequential.isParallel(); // false

9. Java 9 对 Stream API 的增强

Java 9 为 Stream API 引入了多项改进,使代码更简洁高效。

9.1 takeWhile()dropWhile()

这两个方法使用 Predicate 按条件包含或排除元素,特别适用于有序 Stream。

  • takeWhile():从开头收集元素,直到条件不再满足:
Stream<String> stream = Stream.iterate("", s -> s + "s")
  .takeWhile(s -> s.length() < 10);
  • dropWhile():从开头丢弃满足条件的元素,遇到第一个不满足条件的元素后停止丢弃:
Stream<String> stream = Stream.of("a", "aa", "aaa", "aaaaa")
  .dropWhile(s -> s.length() < 5);
// 结果:["aaaaa"]

9.2 增强版 iterate()

Java 9 的 iterate() 可指定终止条件,直接生成有限 Stream:

Stream.iterate(0, i -> i < 10, i -> i + 1)
  .forEach(System.out::println);
// 输出 0 到 9

比 Java 8 的 iterate().limit() 更清晰。

9.3 ofNullable() 处理可能为 null 的元素

ofNullable() 在元素为 null 时返回空 Stream,避免繁琐的 null 检查:

collection.stream()
  .flatMap(s -> Stream.ofNullable(map.get(s)))
  .collect(Collectors.toList());

简化了条件性元素添加的逻辑。


10. 结论

Stream API 是一套强大而易懂的工具,用于处理元素序列。合理使用可大幅减少样板代码、提升程序可读性与性能。

重要提示:本文多数示例未消费 Stream(未调用 close() 或终端操作)。在实际应用中,切勿让已实例化的 Stream 未被消费,否则会导致内存泄漏。