Jakob Jenkov 2019-04-28
Java Stream API 提供了一种函数式的方式来处理对象集合。Java Stream API 是在 Java 8 中与若干其他函数式编程特性一同引入的。本教程将解释这些函数式流(Stream)的工作原理以及如何使用它们。
注意:Java Stream API 与 Java IO 中的 Java InputStream 和 Java OutputStream 没有关系。
InputStream和OutputStream处理的是字节流,而 Java Stream API 处理的是对象流。
Java Stream 的定义
Java Stream 是一个能够对其元素进行内部迭代(internal iteration)的组件,也就是说,它自己可以遍历其元素。相比之下,当你使用 Java 集合 的迭代功能(例如 Java Iterator 或配合 Java Iterable 使用的 for-each 循环)时,你需要自己实现元素的遍历逻辑。
流处理(Stream Processing)
你可以向 Stream 添加监听器(listener)。当 Stream 内部遍历其元素时,这些监听器会被调用。每个监听器都会被调用一次以处理流中的每个元素,这种方式称为流处理(stream processing)。
流的监听器形成一条链(chain)。链中的第一个监听器可以处理当前元素,并返回一个新元素供链中下一个监听器处理。监听器可以返回相同的元素,也可以返回一个新元素,这取决于该监听器(处理器)的目的。
获取 Stream
获取 Java Stream 的方式有很多。最常见的方式之一是从 Java 集合 中获取。以下是从 Java List 获取 Stream 的示例:
List<String> items = new ArrayList<String>();
items.add("one");
items.add("two");
items.add("three");
Stream<String> stream = items.stream();
此示例首先创建一个 Java List,然后向其中添加三个 Java 字符串,最后调用 stream() 方法获取一个 Stream 实例。
终端操作与非终端操作
Stream 接口包含一系列终端操作(terminal operations)和非终端操作(non-terminal operations)。
- 非终端操作:向流添加一个监听器,但不执行任何其他操作。
- 终端操作:启动流的内部迭代,调用所有监听器,并返回结果。
下面是一个同时包含非终端操作和终端操作的 Java Stream 示例:
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
public class StreamExamples {
public static void main(String[] args) {
List<String> stringList = new ArrayList<String>();
stringList.add("ONE");
stringList.add("TWO");
stringList.add("THREE");
Stream<String> stream = stringList.stream();
long count = stream
.map((value) -> {
return value.toLowerCase();
})
.count();
System.out.println("count = " + count);
}
}
- 对
map()方法的调用是一个非终端操作。它只是在流上设置了一个 lambda 表达式,用于将每个元素转换为小写。 - 对
count()方法的调用是一个终端操作。它启动了内部迭代,导致每个元素先被转为小写,然后被计数。
注意:元素转为小写实际上并不影响元素数量。这里只是为了演示非终端操作的作用。
非终端操作
Java Stream API 的非终端操作用于转换或过滤流中的元素。当你向流添加一个非终端操作时,会返回一个新的流作为结果。这个新流代表了原始流在应用该非终端操作后产生的元素流。
以下是一个向流添加非终端操作并获得新流的示例:
List<String> stringList = new ArrayList<String>();
stringList.add("ONE");
stringList.add("TWO");
stringList.add("THREE");
Stream<String> stream = stringList.stream();
Stream<String> stringStream = stream.map((value) -> {
return value.toLowerCase();
});
注意对 stream.map() 的调用实际上返回了一个新的 Stream 实例,表示应用了 map 操作后的原始字符串流。
重要提示:你只能向一个给定的
Stream实例添加一个操作。如果需要链式调用多个操作,必须将第二个操作应用于第一个操作返回的Stream。如下所示:
Stream<String> stringStream1 = stream.map((value) -> {
return value.toLowerCase();
});
Stream<String> stringStream2 = stringStream1.map((value) -> {
return value.toUpperCase();
});
通常我们会直接链式调用非终端操作,如下所示:
Stream<String> stream1 = stream
.map((value) -> { return value.toLowerCase(); })
.map((value) -> { return value.toUpperCase(); })
.map((value) -> { return value.substring(0,3); });
许多非终端 Stream 操作可以接受一个 Java Lambda 表达式 作为参数。该 lambda 表达式实现了一个适合该非终端操作的 Java 函数式接口,例如 Function 或 Predicate。这是因为非终端操作的方法参数通常是函数式接口。
filter()
filter() 方法用于从 Java Stream 中过滤元素。它接收一个 Predicate,对流中的每个元素调用该 Predicate。如果元素应包含在结果流中,则 Predicate 应返回 true;否则返回 false。
示例:
Stream<String> longStringsStream = stream.filter((value) -> {
return value.length() >= 3;
});
map()
map() 方法将一个元素转换(映射)为另一个对象。例如,可将字符串列表中的每个字符串转换为小写、大写或子字符串等。
示例:
List<String> list = new ArrayList<String>();
Stream<String> stream = list.stream();
Stream<String> streamMapped = stream.map((value) -> value.toUpperCase());
flatMap()
flatMap() 方法将单个元素映射为多个元素。其思想是“展平”具有多个内部元素的复杂结构,变成仅包含这些内部元素的“扁平”流。
例如:
- 将包含嵌套对象的对象展平为其自身及子对象;
- 将
List的流展平为元素本身的流; - 将字符串流展平为单词流或字符流。
示例:将字符串列表展平为单词流:
List<String> stringList = new ArrayList<String>();
stringList.add("One flew over the cuckoo's nest");
stringList.add("To kill a muckingbird");
stringList.add("Gone with the wind");
Stream<String> stream = stringList.stream();
stream.flatMap((value) -> {
String[] split = value.split(" ");
return Arrays.asList(split).stream();
})
.forEach((value) -> System.out.println(value));
注意:此示例以
forEach()结尾,这是一个终端操作,用于触发内部迭代。若无终端操作,flatMap()不会实际执行。
distinct()
distinct() 方法返回一个仅包含原始流中不重复元素的新流,自动去除重复项。
示例:
List<String> stringList = new ArrayList<String>();
stringList.add("one");
stringList.add("two");
stringList.add("three");
stringList.add("one");
Stream<String> stream = stringList.stream();
List<String> distinctStrings = stream
.distinct()
.collect(Collectors.toList());
System.out.println(distinctStrings); // 输出: [one, two, three]
limit()
limit(n) 方法将流中的元素数量限制为最多 n 个。
示例:
stream
.limit(2)
.forEach(element -> System.out.println(element));
peek()
peek() 方法接收一个 Consumer,对流中每个元素调用该 Consumer,主要用于调试(“窥视”元素),不用于转换。它返回包含原始元素的新流。
示例:
Stream<String> streamPeeked = stream.peek((value) -> {
System.out.println(value);
});
注意:
peek()不会启动内部迭代,仍需调用终端操作。
终端操作
终端操作通常返回单个值。一旦在流上调用终端操作,流及其链式流的内部迭代就会开始,最终返回结果。
终端操作通常不会返回新的 Stream 实例,因此调用终端操作后,非终端操作的链式调用就结束了。
示例:
long count = stream
.map((value) -> value.toLowerCase())
.map((value) -> value.toUpperCase())
.map((value) -> value.substring(0,3))
.count(); // 终端操作
anyMatch()
检查流中是否存在至少一个元素满足给定 Predicate。
boolean anyMatch = stream.anyMatch(value -> value.startsWith("One"));
allMatch()
检查流中是否所有元素都满足给定 Predicate。
boolean allMatch = stream.allMatch(value -> value.startsWith("One"));
noneMatch()
检查流中是否没有任何元素满足给定 Predicate。
boolean noneMatch = stream.noneMatch(element -> "xyz".equals(element));
collect()
将流中的元素收集到集合或其他对象中。通常与 Collectors 工具类一起使用。
List<String> stringsAsUppercaseList = stream
.map(value -> value.toUpperCase())
.collect(Collectors.toList());
count()
统计流中元素的数量。
long count = stream
.flatMap(value -> Arrays.asList(value.split(" ")).stream())
.count();
findAny()
从流中任意获取一个元素(不保证顺序),返回 Optional<T>。
Optional<String> anyElement = stream.findAny();
findFirst()
获取流中的第一个元素,返回 Optional<T>。
Optional<String> result = stream.findFirst();
forEach()
对流中每个元素执行指定操作(Consumer),无返回值。
stream.forEach(element -> System.out.println(element));
min() / max()
根据提供的 Comparator 返回流中的最小或最大元素。
Optional<String> min = stream.min((val1, val2) -> val1.compareTo(val2));
Optional<String> max = stream.max((val1, val2) -> val1.compareTo(val2));
reduce()
将流中所有元素归约为单个值。
Optional<String> reduced = stream.reduce((value, combinedValue) -> {
return combinedValue + " + " + value;
});
toArray()
将流中所有元素转为 Object[] 数组。
Object[] objects = stream.toArray();
连接流(Concatenate Streams)
使用 Stream.concat(stream1, stream2) 可将两个流连接成一个新流。
Stream<String> concatStream = Stream.concat(stream1, stream2);
List<String> result = concatStream.collect(Collectors.toList());
从数组创建流
使用 Stream.of() 可从一个或多个对象创建流:
Stream<String> streamOf = Stream.of("one", "two", "three");
对 Java Stream API 的批评
尽管 Java Stream API 功能强大,但与其他真正的流处理 API(如 Apache Kafka Streams)相比,存在一些局限性:
1. 批处理,而非流处理
Java Stream API 实际上是批处理 API,而非真正的流处理 API。它的终端操作只有在处理完最后一个元素后才返回结果。这意味着它必须知道流有“结尾”——这在真正的无限数据流中是不可能的。
2. 链式结构,而非图结构
Java Stream 只能形成线性链(chain),每个流只能有一个后续操作。而真正的流处理系统(如 Kafka Streams)支持图结构(graph),允许多个监听器并行处理同一数据源,形成复杂的 DAG(有向无环图)。
3. 内部迭代,而非外部迭代
Java Stream 采用内部迭代(由终端操作触发),用户无法控制迭代过程。相比之下,外部迭代(如手动调用 next())更易于测试和调试,也支持从图中多个节点注入数据。
总结:Java Stream API 是一个强大的集合处理工具,适用于有限数据集的函数式批处理,但不适合构建真正的实时流处理系统。