Java Stream API

更新于 2025-12-26

Jakob Jenkov 2019-04-28

Java Stream API 提供了一种函数式的方式来处理对象集合。Java Stream API 是在 Java 8 中与若干其他函数式编程特性一同引入的。本教程将解释这些函数式流(Stream)的工作原理以及如何使用它们。

注意:Java Stream API 与 Java IO 中的 Java InputStreamJava OutputStream 没有关系InputStreamOutputStream 处理的是字节流,而 Java Stream API 处理的是对象流。

Java Stream 的定义

Java Stream 是一个能够对其元素进行内部迭代(internal iteration)的组件,也就是说,它自己可以遍历其元素。相比之下,当你使用 Java 集合 的迭代功能(例如 Java Iterator 或配合 Java Iterable 使用的 for-each 循环)时,你需要自己实现元素的遍历逻辑。


流处理(Stream Processing)

你可以向 Stream 添加监听器(listener)。当 Stream 内部遍历其元素时,这些监听器会被调用。每个监听器都会被调用一次以处理流中的每个元素,这种方式称为流处理(stream processing)。

流的监听器形成一条(chain)。链中的第一个监听器可以处理当前元素,并返回一个新元素供链中下一个监听器处理。监听器可以返回相同的元素,也可以返回一个新元素,这取决于该监听器(处理器)的目的。


获取 Stream

获取 Java Stream 的方式有很多。最常见的方式之一是从 Java 集合 中获取。以下是从 Java List 获取 Stream 的示例:

List<String> items = new ArrayList<String>();
items.add("one");
items.add("two");
items.add("three");

Stream<String> stream = items.stream();

此示例首先创建一个 Java List,然后向其中添加三个 Java 字符串,最后调用 stream() 方法获取一个 Stream 实例。


终端操作与非终端操作

Stream 接口包含一系列终端操作(terminal operations)和非终端操作(non-terminal operations)。

  • 非终端操作:向流添加一个监听器,但不执行任何其他操作。
  • 终端操作:启动流的内部迭代,调用所有监听器,并返回结果。

下面是一个同时包含非终端操作和终端操作的 Java Stream 示例:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class StreamExamples {
    public static void main(String[] args) {
        List<String> stringList = new ArrayList<String>();
        stringList.add("ONE");
        stringList.add("TWO");
        stringList.add("THREE");

        Stream<String> stream = stringList.stream();

        long count = stream
                .map((value) -> {
                    return value.toLowerCase();
                })
                .count();

        System.out.println("count = " + count);
    }
}
  • map() 方法的调用是一个非终端操作。它只是在流上设置了一个 lambda 表达式,用于将每个元素转换为小写。
  • count() 方法的调用是一个终端操作。它启动了内部迭代,导致每个元素先被转为小写,然后被计数。

注意:元素转为小写实际上并不影响元素数量。这里只是为了演示非终端操作的作用。


非终端操作

Java Stream API 的非终端操作用于转换过滤流中的元素。当你向流添加一个非终端操作时,会返回一个新的流作为结果。这个新流代表了原始流在应用该非终端操作后产生的元素流。

以下是一个向流添加非终端操作并获得新流的示例:

List<String> stringList = new ArrayList<String>();
stringList.add("ONE");
stringList.add("TWO");
stringList.add("THREE");

Stream<String> stream = stringList.stream();

Stream<String> stringStream = stream.map((value) -> {
    return value.toLowerCase();
});

注意对 stream.map() 的调用实际上返回了一个新的 Stream 实例,表示应用了 map 操作后的原始字符串流。

重要提示:你只能向一个给定的 Stream 实例添加一个操作。如果需要链式调用多个操作,必须将第二个操作应用于第一个操作返回的 Stream。如下所示:

Stream<String> stringStream1 = stream.map((value) -> {
    return value.toLowerCase();
});

Stream<String> stringStream2 = stringStream1.map((value) -> {
    return value.toUpperCase();
});

通常我们会直接链式调用非终端操作,如下所示:

Stream<String> stream1 = stream
    .map((value) -> { return value.toLowerCase(); })
    .map((value) -> { return value.toUpperCase(); })
    .map((value) -> { return value.substring(0,3); });

许多非终端 Stream 操作可以接受一个 Java Lambda 表达式 作为参数。该 lambda 表达式实现了一个适合该非终端操作的 Java 函数式接口,例如 FunctionPredicate。这是因为非终端操作的方法参数通常是函数式接口。

filter()

filter() 方法用于从 Java Stream 中过滤元素。它接收一个 Predicate,对流中的每个元素调用该 Predicate。如果元素应包含在结果流中,则 Predicate 应返回 true;否则返回 false

示例:

Stream<String> longStringsStream = stream.filter((value) -> {
    return value.length() >= 3;
});

map()

map() 方法将一个元素转换(映射)为另一个对象。例如,可将字符串列表中的每个字符串转换为小写、大写或子字符串等。

示例:

List<String> list = new ArrayList<String>();
Stream<String> stream = list.stream();
Stream<String> streamMapped = stream.map((value) -> value.toUpperCase());

flatMap()

flatMap() 方法将单个元素映射为多个元素。其思想是“展平”具有多个内部元素的复杂结构,变成仅包含这些内部元素的“扁平”流。

例如:

  • 将包含嵌套对象的对象展平为其自身及子对象;
  • List 的流展平为元素本身的流;
  • 将字符串流展平为单词流或字符流。

示例:将字符串列表展平为单词流:

List<String> stringList = new ArrayList<String>();
stringList.add("One flew over the cuckoo's nest");
stringList.add("To kill a muckingbird");
stringList.add("Gone with the wind");

Stream<String> stream = stringList.stream();

stream.flatMap((value) -> {
        String[] split = value.split(" ");
        return Arrays.asList(split).stream();
    })
    .forEach((value) -> System.out.println(value));

注意:此示例以 forEach() 结尾,这是一个终端操作,用于触发内部迭代。若无终端操作,flatMap() 不会实际执行。

distinct()

distinct() 方法返回一个仅包含原始流中不重复元素的新流,自动去除重复项。

示例:

List<String> stringList = new ArrayList<String>();
stringList.add("one");
stringList.add("two");
stringList.add("three");
stringList.add("one");

Stream<String> stream = stringList.stream();
List<String> distinctStrings = stream
    .distinct()
    .collect(Collectors.toList());

System.out.println(distinctStrings); // 输出: [one, two, three]

limit()

limit(n) 方法将流中的元素数量限制为最多 n 个。

示例:

stream
    .limit(2)
    .forEach(element -> System.out.println(element));

peek()

peek() 方法接收一个 Consumer,对流中每个元素调用该 Consumer,主要用于调试(“窥视”元素),不用于转换。它返回包含原始元素的新流。

示例:

Stream<String> streamPeeked = stream.peek((value) -> {
    System.out.println(value);
});

注意:peek() 不会启动内部迭代,仍需调用终端操作。


终端操作

终端操作通常返回单个值。一旦在流上调用终端操作,流及其链式流的内部迭代就会开始,最终返回结果。

终端操作通常不会返回新的 Stream 实例,因此调用终端操作后,非终端操作的链式调用就结束了。

示例:

long count = stream
    .map((value) -> value.toLowerCase())
    .map((value) -> value.toUpperCase())
    .map((value) -> value.substring(0,3))
    .count(); // 终端操作

anyMatch()

检查流中是否存在至少一个元素满足给定 Predicate

boolean anyMatch = stream.anyMatch(value -> value.startsWith("One"));

allMatch()

检查流中是否所有元素都满足给定 Predicate

boolean allMatch = stream.allMatch(value -> value.startsWith("One"));

noneMatch()

检查流中是否没有任何元素满足给定 Predicate

boolean noneMatch = stream.noneMatch(element -> "xyz".equals(element));

collect()

将流中的元素收集到集合或其他对象中。通常与 Collectors 工具类一起使用。

List<String> stringsAsUppercaseList = stream
    .map(value -> value.toUpperCase())
    .collect(Collectors.toList());

count()

统计流中元素的数量。

long count = stream
    .flatMap(value -> Arrays.asList(value.split(" ")).stream())
    .count();

findAny()

从流中任意获取一个元素(不保证顺序),返回 Optional<T>

Optional<String> anyElement = stream.findAny();

findFirst()

获取流中的第一个元素,返回 Optional<T>

Optional<String> result = stream.findFirst();

forEach()

对流中每个元素执行指定操作(Consumer),无返回值。

stream.forEach(element -> System.out.println(element));

min() / max()

根据提供的 Comparator 返回流中的最小或最大元素。

Optional<String> min = stream.min((val1, val2) -> val1.compareTo(val2));
Optional<String> max = stream.max((val1, val2) -> val1.compareTo(val2));

reduce()

将流中所有元素归约为单个值。

Optional<String> reduced = stream.reduce((value, combinedValue) -> {
    return combinedValue + " + " + value;
});

toArray()

将流中所有元素转为 Object[] 数组。

Object[] objects = stream.toArray();

连接流(Concatenate Streams)

使用 Stream.concat(stream1, stream2) 可将两个流连接成一个新流。

Stream<String> concatStream = Stream.concat(stream1, stream2);
List<String> result = concatStream.collect(Collectors.toList());

从数组创建流

使用 Stream.of() 可从一个或多个对象创建流:

Stream<String> streamOf = Stream.of("one", "two", "three");

对 Java Stream API 的批评

尽管 Java Stream API 功能强大,但与其他真正的流处理 API(如 Apache Kafka Streams)相比,存在一些局限性:

1. 批处理,而非流处理

Java Stream API 实际上是批处理 API,而非真正的流处理 API。它的终端操作只有在处理完最后一个元素后才返回结果。这意味着它必须知道流有“结尾”——这在真正的无限数据流中是不可能的。

2. 链式结构,而非图结构

Java Stream 只能形成线性链(chain),每个流只能有一个后续操作。而真正的流处理系统(如 Kafka Streams)支持图结构(graph),允许多个监听器并行处理同一数据源,形成复杂的 DAG(有向无环图)。

3. 内部迭代,而非外部迭代

Java Stream 采用内部迭代(由终端操作触发),用户无法控制迭代过程。相比之下,外部迭代(如手动调用 next())更易于测试和调试,也支持从图中多个节点注入数据。


总结:Java Stream API 是一个强大的集合处理工具,适用于有限数据集的函数式批处理,但不适合构建真正的实时流处理系统。