Java8 之 Stream

使用Java8的Stream可以让你的代码:更简洁,更易读;更灵活;可并行

引言

下面两段代码都是用来返回低热量的菜肴名称 ,一个是用Java 7写的,另一个是用Java 8的流 。

  • Java7:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// List<Dish> menu = ....
List<Dish> lowCaloricDishes = new ArrayList<>();
// 筛选出低热量的菜肴
for(Dish d: menu)
if(d.getCalories() < 400)
lowCaloricDishes.add(d);
// 根据热量对菜肴排序
Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
public int compare(Dish d1, Dish d2){
return Integer.compare(d1.getCalories(), d2.getCalories());
}
});
// 收集排序好的菜肴的名称
List<String> lowCaloricDishesName = new ArrayList<>();
for(Dish d: lowCaloricDishes)
lowCaloricDishesName.add(d.getName());

//在这段代码中,你用了一个“垃圾变量” lowCaloricDishes。它唯一的作用就是作为一次性的中间容器
  • Java8:
1
2
3
4
5
6
7
// List<Dish> menu = ....
List<String> lowCaloricDishesName =
menu.stream()// 从List获取流
.filter(d -> d.getCalories() < 400) // 筛选出低热量的菜肴
.sorted(comparing(Dish::getCalories))// 根据热量对菜肴排序
.map(Dish::getName)// 将每个Dish映射成String
.collect(toList());// 收集排序好的菜肴的名称

为了利用多核架构并行执行这段代码,你只需要把stream()换成parallelStream()

1
2
3
4
5
6
List<String> lowCaloricDishesName =
menu.parallelStream()// 使用并行流
.filter(d -> d.getCalories() < 400)
.sorted(comparing(Dishes::getCalories))
.map(Dish::getName)
.collect(toList());
  • 代码是以声明性方式写的:说明想要完成什么(筛选热量低的菜肴)而不是说明如何实现(通过if和循环等)。这种方法加上行为参数化让你可以轻松应对变化:你很容易再创建一个代码版本,利用 Lambda表达式来筛选高卡路里的菜肴,而用不着去复制粘贴代码
  • 可以把几个基础操作链接起来,来表达复杂的数据处理流程,同时保持代码清晰可读 。
  • 方便的进行并行处理,无需自己实现多线程代码。

外部迭代与内部迭代

使用Collection接口需要用户去做迭代(比如用for-each),这称为外部迭代。 相反,Streams库使用内部迭代——它帮你把迭代做了,还把得到的流值存在了某个地方,你只要给出一个函数说要干什么就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 用for-each循环外部迭代
List<String> names = new ArrayList<>();
for(Dish d: menu){
names.add(d.getName());
}
// 用背后的迭代器做外部迭代
List<String> names = new ArrayList<>();
Iterator<String> iterator = menu.iterator();
while(iterator.hasNext()) {
Dish d = iterator.next();
names.add(d.getName());
}
//使用流,内部迭代
List<String> names = menu.stream()
.map(Dish::getName)
.collect(toList());

让我们用一个比喻来解释内部迭代的差异和好处吧。比方说你和你两岁的女儿索菲亚说,把玩具收起来

外部迭代:

你:“索菲亚,我们把玩具收起来吧。地上还有玩具吗?” 索菲亚:“有,球。” 你:“好,把球放进盒子里。还有吗?”

索菲亚:“有,那是我的娃娃。” 你:“好,把娃娃放进盒子里。还有吗?” 索菲亚:“有,有我的书。” 你:“好,把书放进盒子里。还有吗?” 索菲亚:“没了,没有了。” 你:“好,我们收好啦。”

这正是你每天都要对Java集合做的。你外部迭代一个集合,显式地取出每个项目再加以处理 。

而使用内部迭代,只需要说你的意图就好了:

把地上所有的玩具都放进盒子里

内部迭代比较好的原因有二: 第一,索非亚可以选择一只手拿娃娃,另一只手拿球;第二,她可以决定先拿离盒子最近的那个 东西,然后再拿别的。

同样的道理Streams库的内部迭代可以自动选择一种适合你硬件的数据表示和并行实现。与此相反,一旦通过写for-each而选择了外部迭代,那你基本上就要自己管理所有的并行问题了 。

流操作

java.util.stream.Stream中的Stream接口定义了许多操作。它们可以分为两大类。 中间操作与终端操作。

1
2
3
4
5
List<String> names = menu.stream()
.filter(d -> d.getCalories() > 300)// 中间操作
.map(Dish::getName)// 中间操作
.limit(3)// 中间操作
.collect(toList());// 终端操作

中间操作

诸如filter或sorted等中间操作会返回另一个流。这让多个操作可以连接起来形成一个查询。重要的是,除非流水线上触发一个终端操作,否则中间操作不会执行任何处理 。这是因为中间操作一般都可以合并起来,在终端操作时一次性全部处理 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<Dish> menu = Arrays.asList(
new Dish("pork", false, 800, Dish.Type.MEAT),
new Dish("beef", false, 700, Dish.Type.MEAT),
new Dish("season fruit", true, 120, Dish.Type.OTHER),
new Dish("chicken", false, 400, Dish.Type.MEAT),
new Dish("french fries", true, 530, Dish.Type.OTHER));

List<String> names = menu.stream()
.filter(d -> {
System.out.println("filtering" + d.getName());
return d.getCalories() > 300;
})
.map(d -> {
System.out.println("mapping" + d.getName());
return d.getName();
})
.limit(3)
.collect(toList());
System.out.println(names);

打印结果如下:

1
2
3
4
5
6
7
8
filtering pork
mapping pork
filtering beef
mapping beef
filtering season fruit
filtering chicken
mapping chicken
[pork, beef, chicken]

你会发现,有好几种优化利用了流的延迟性质。第一,尽管很多菜的热量都高于300卡路里,但只选出了前三个!这是因为limit操作和一种称为短路的技巧,我们会在后文中解释。第二,尽管filter和map是两个独立的操作,但它们合并到同一次遍历中了(我们把这种技术叫作循环合并)。

终端操作

终端操作会从流的流水线生成结果。其结果是任何不是流的值,比如List、 Integer,甚至void。

1
2
3
menu.stream().forEach(System.out::println);// forEach是一个返回void的终端操作
menu.stream().collect(toList()); // 返回List
menu.stream().collect(toMap(Dish::getName, Function.identity()));// 返回Map

使用流

总而言之,流的使用一般包括三件事:

  1. 一个数据源(如集合)来执行一个查询;
  2. 一个中间操作链,形成一条流的流水线;
  3. 一个终端操作,执行流水线,并能生成结果。

详细的中间操作和终端操作请参见API文档。

使用流

Stream API支持的许多操作。这些操作能让你快速完成复杂的数据查询,如筛选、切片、映射、查找、匹配和归约。

最后,我们会看看一些特殊的流:数值流、来自文件和数组等多种来源的流,最后是无限流 。

筛选

filter(Predicate<T>)
接受一个谓词(一个返回boolean的函数)作为参数,并返回一个包括所有符合谓词的元素的流。
1
2
3
4
// 筛选出所有素菜,创建一张素食菜单
List<Dish> vegetarianMenu = menu.stream()
.filter(Dish::isVegetarian)
.collect(toList());
distinct()
它会返回一个元素各异(根据流所成元素的hashCode和equals方法实现)的流
1
2
3
4
5
6
// 筛选出列表中所有的偶数,并确保没有重复
List<Integer> numbers = Arrays.asList(1, 2, 1, 3, 3, 2, 4);
numbers.stream()
.filter(i -> i % 2 == 0)
.distinct()
.forEach(System.out::println);
limit(int)
该方法会返回一个不超过给定长度的流。所需的长度作为参数传递给limit。如果流是有序的,则最多会返回前n个元素。
1
2
3
4
5
// 选出热量超过300卡路里的头三道菜
List<Dish> dishes = menu.stream()
.filter(d -> d.getCalories() > 300)
.limit(3)
.collect(toList());

请注意limit也可以用在无序流上,比如源是一个Set。这种情况下, limit的结果不会以任何顺序排列。

skip(n)
返回一个扔掉了前n个元素的流。如果流中元素不足n个,则返回一个空流。
1
2
3
4
5
// 跳过超过300卡路里的头两道菜,并返回剩下的
List<Dish> dishes = menu.stream()
.filter(d -> d.getCalories() > 300)
.skip(2)
.collect(toList());

映射

一个非常常见的数据处理套路就是从某些对象中选择信息。比如在SQL里,你可以从表中选择一列。 Stream API也通过map和flatMap方法提供了类似的工具。

map (Function<T,R>)
接受一个函数作为参数。这个函数会被应用到每个元素上,并将其映射成一个新的元素
1
2
3
4
// 方法引用Dish::getName传给了map方法,来提取流中菜肴的名称
List<String> dishNames = menu.stream()
.map(Dish::getName)
.collect(toList());

因为getName方法返回一个String,所以map方法输出的流的类型就是Stream

1
2
3
4
5
//提取每个菜肴名称的长度
List<Integer> dishNameLengths = menu.stream()
.map(Dish::getName)
.map(String::length)
.collect(toList());

第一个map将Stream<Dish>映射为Stream<String>,然后第二个map又将Stream<String>映射为Stream<Integer>,在IDEA中可以看到给出的提示,如下图:

idea对Stream的提示

flatMap(Function<T,R>)
将函数返回的Stream<T>并不是分别映射成一个流(导致最终映射的结果是Stream<Stream<T>>),而是映射成流的内容(最终映射结果为Stream<T>)。

对于一张单表,如何返回一张列表,列出里面各不相同的字符呢?

例如,给定单词列表["Hello", "World"],你想要返回列表["H","e","l","o","W","r","d"]

你可能会认为这很容易,你可以把每个单词映射成一张字符表,然后调用distinct来过滤重复的字符。第一个版本可能是这样的:

1
2
3
4
5
List<String> words = Arrays.asList("Hello", "World");
words.stream()
.map(word -> word.split(""))
.distinct()
.collect(toList());

传递给map方法的Lambda为每个单词返回了一个String[]。因此,map返回的流实际上是Stream<String[]>类型的。你真正想要的是用Stream<String>来表示一个字符流。

不正确的使用map1

首先,你需要一个字符流,而不是数组流。有一个叫作Arrays.stream()的方法可以接受一个数组并产生一个流 :

1
2
3
4
5
words.stream()
.map(word -> word.split(""))// 每个单词转换为由其字母构成的数组
.map(Arrays::stream)// 让每个数组变成一个单独的流
.distinct()
.collect(toList());

当前的解决方案仍然搞不定! 这是因为,你现在得到的是 Stream<Stream<String>>。

现在flatMap终于派上用场了:

1
2
3
4
5
List<String> uniqueCharacters = words.stream()
.map(w -> w.split(""))
.flatMap(Arrays::stream)
.distinct()
.collect(Collectors.toList());

使用flatMap找出单词列表中各不相同的字符

上面的代码也可以直接写成:

1
2
3
4
List<String> uniqueCharacters = words.stream()
.flatMap(w -> Arrays.stream(w.split("")))
.distinct()
.collect(toList());

查找和匹配

数据集中的某些元素是否匹配一个给定的属性。 Stream通过allMatch、 anyMatch、 noneMatch、 findFirst和findAny方法提供了这样的工具 。

anyMatch(Predicate<T>)
检查谓词是否至少匹配一个元素
1
boolean hasVegetarian = menu.stream().anyMatch(Dish::isVegetarian);
allMatch(Predicate<T>)
检查谓词是否匹配所有元素
1
boolean isHealthy = menu.stream().allMatch(d -> d.getCalories() < 1000);
noneMatch(Predicate<T>)
没有任何元素与给定的谓词匹配

查找元素

findAny(Predicate<T>)
将返回当前流中的任意元素
1
2
3
Optional<Dish> dish = menu.stream()
.filter(Dish::isVegetarian)
.findAny();

流水线将在后台进行优化使其只需走一遍,并在利用短路找到结果时立即结束。

Optional 是什么?

Optional<T>类(java.util.Optional)是一个容器类,代表一个值存在或不存在。在上面的代码中, findAny可能什么元素都没找到。 Java 8的库设计人员引入了Optional<T>,这样就不用返回众所周知容易出问题的null了。

我们在这里不会详细讨论Optional,以后的文章中会进行介绍。这里只给出几个常见的API。

  • isPresent()将在Optional包含值的时候返回true, 否则返回false。
  • ifPresent(Consumer<T> block)会在值存在的时候执行给定的代码块。
  • T get()会在值存在时返回值,否则抛出一个NoSuchElement异常。
  • T orElse(T other)会在值存在时返回值,否则返回一个默认值

例如,在前面的代码中你需要显式地检查Optional对象中是否存在一道菜可以访问其名称:

1
2
3
4
menu.stream()
.filter(Dish::isVegetarian)
.findAny()// 返回一个Optional<Dish>
.ifPresent(d -> System.out.println(d.getName());// 如果包含一个值就打印它,否则什么都不做
findFirst (Predicate<T>)
有些流有一个出现顺序(encounter order)来指定流中项目出现的逻辑顺序(比如由List或排序好的数据列生成的流)。对于这种流,你可能想要找到第一个元素。
1
2
3
4
5
List<Integer> someNumbers = Arrays.asList(1, 2, 3, 4, 5);
Optional<Integer> firstSquareDivisibleByThree = someNumbers.stream()
.map(x -> x * x)
.filter(x -> x % 3 == 0)
.findFirst(); // 9

何时使用findFirst和findAny你可能会想,为什么会同时有findFirst和findAny呢?答案是并行。找到第一个元素在并行上限制更多。如果你不关心返回的元素是哪个,请使用findAny,因为它在使用并行流时限制较少。

归约

reduce(T identity, BinaryOperator<T> accumulator)
在流上进行规约操作,接收一个初始值T,和一个BinaryOperator。后者将两个元素结合起来产生一个新值 。
1
2
3
4
5
6
7
8
// 所有元素的和
int sum = numbers.stream().reduce(0, (a, b) -> a + b);
// 所有元素的积
int product = numbers.stream().reduce(1, (a, b) -> a * b);
// 使用方法引用
int sum = numbers.stream().reduce(0, Integer::sum);
// 使用无初试值的版本,当流中没有值时,Optional中为null
Optional<Integer> sum = numbers.stream().reduce((a, b) -> (a + b));

求最大值和最小值:

1
2
Optional<Integer> max = numbers.stream().reduce(Integer::max);
Optional<Integer> min = numbers.stream().reduce(Integer::min);

数值流

原始类型流特化

我们在前面看到了可以使用reduce方法计算流中元素的总和。例如,你可以像下面这样计算菜单的热量:

1
2
3
int calories = menu.stream()
.map(Dish::getCalories)
.reduce(0, Integer::sum);

这段代码的问题是,它有一个暗含的装箱成本。每个Integer都必须拆箱成一个原始类型,再进行求和。要是可以直接像下面这样调用sum方法,岂不是更好?

1
2
3
int calories = menu.stream()
.map(Dish::getCalories)
.sum();

但这是不可能的。问题在于map方法会生成一个Stream<T>。虽然流中的元素是Integer类型,但Stream没有定义sum方法。

Java 8引入了三个原始类型特化流接口来解决这个问题: IntStream、 DoubleStream和LongStream,分别将流中的元素特化为int、 long和double,从而避免了暗含的装箱成本。每个接口都带来了进行常用数值归约的新方法,比如sum、max。此外还有在必要时再把它们转换回对象流的方法。

1
2
3
4
// 映射到数值流
int calories = menu.stream()
.mapToInt(Dish::getCalories)
.sum();

一旦有了数值流,你可能会想把它转换回非特化流。例如,IntStream上的操作只能产生原始整数: IntStream 的 map操作接受的Lambda必须接受int并返回int。但是你可能想要生成另一类值。

1
2
IntStream intStream = menu.stream().mapToInt(Dish::getCalories);
Stream<Integer> stream = intStream.boxed();

数值范围

假设你想要生成1和100之间的所 有数字。

Java 8引入了IntStream和LongStream,帮助生成这种范围。他们都有两个方法range和rangeClosed。这两个方法都是第一个参数接受起始值,第二个参数接受结束值。但range是不包含结束值的,而rangeClosed则包含结束值。

1
2
//生成[1,100]之间的偶数
IntStream evenNumbers = IntStream.rangeClosed(1, 100).filter(n -> n % 2 == 0);

构建流

由值创建流

1
2
3
4
5
// Stream.of创建流
Stream<String> stream = Stream.of("Java 8 ", "Lambdas ", "In ", "Action");
stream.map(String::toUpperCase).forEach(System.out::println);
// empty()得到一个空流
Stream<String> emptyStream = Stream.empty();

由数组创建流

1
2
3
// Arrays.stream从数组创建一个流
int[] numbers = {2, 3, 5, 7, 11, 13};
int sum = Arrays.stream(numbers).sum();

由文件生成流

Java中用于处理文件等I/O操作的NIO API(非阻塞 I/O)已更新,以便利用Stream API。java.nio.file.Files中的很多静态方法都会返回一个流。例如,一个很有用的方法是Files.lines,它会返回一个由指定文件中的各行构成的字符串流。

1
2
3
4
5
6
7
8
9
// 统计一个文件中有多少各不相同的词
long uniqueWords = 0;
try(Stream<String> lines =
Files.lines(Paths.get("data.txt"), Charset.defaultCharset())){
uniqueWords = lines.flatMap(line -> Arrays.stream(line.split(" ")))
.distinct()
.count();
}catch(IOException e){
}

由函数生成流:创建无限流

  • 迭代

iterate方法接受一个初始值,还有一个依次应用在每个产生的新值上的Lambda

1
2
3
Stream.iterate(0, n -> n + 2)
.limit(10)
.forEach(System.out::println);

请注意,iterate将生成一个无限流——这个流没有结尾,因为值是按需计算的,可以永远计算下去。我们说这个流是无界的。正如我们前面所讨论的,这是流和集合之间的一个关键区别。我们使用limit方法来显式限制流的大小。

一般来说,在需要依次生成一系列值的时候应该使用iterate 。

1
2
3
4
5
6
Stream.iterate(new int[]{0, 1},
t -> new int[]{t[1],t[0] + t[1]})
.limit(10)
.map(t -> t[0])
.forEach(System.out::println);
//这段代码将打印斐波纳契数列: 0, 1, 1, 2, 3, 5, 8, 13, 21, 34…
  • 生成

与iterate方法类似, generate方法也可让你按需生成一个无限流。但generate不是依次对每个新生成的值应用函数的。它接受一个Supplie<T>类型的Lambda提供新的值 。

1
2
3
4
5
6
7
8
9
10
Stream.generate(Math::random)
.limit(5)
.forEach(System.out::println);
/*
0.9410810294106129
0.6586270755634592
0.9592859117266873
0.13743396659487006
0.3942776037651241
*/

我们使用的供应源(指向Math.random的方法引用)是无状态的:它不会在任何地方记录任何值。但供应源不一定是无状态的。你可以创建存储状态的供应源,它可以修改状态,并在为流生成下一个值时使用。

1
2
3
4
5
6
7
8
9
10
11
12
IntSupplier fib = new IntSupplier(){
private int previous = 0;
private int current = 1;
public int getAsInt(){
int oldPrevious = this.previous;
int nextValue = this.previous + this.current;
this.previous = this.current;
this.current = nextValue;
return oldPrevious;
}
};
IntStream.generate(fib).limit(10).forEach(System.out::println);

收集器

你有一个由Transaction构成的List,并且想按照名义货币进行分组。在没有Lambda的Java里,哪怕像这种简单的用例实现起来都很啰嗦,就像下面这样。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 建立累积交易分组的Map
Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>();
for (Transaction transaction : transactions) {
// 提取Transaction的货币
Currency currency = transaction.getCurrency();
List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency);
if (transactionsForCurrency == null) { //如果分组Map中没有这种货币的条目,就创建一个
transactionsForCurrency = new ArrayList<>();
transactionsByCurrencies.put(currency, transactionsForCurrency);
}
// 将当前遍历的Transaction加入同一货币的Transaction的List
transactionsForCurrency.add(transaction);
}

尽管代码的目的很简单——把列表中的交易按货币分组——但是写起来却很麻烦。更糟糕的是,读起来比写起来更费劲!

而用Stream中collect方法,你就可以用一句话实现 :

1
2
Map<Currency, List<Transaction>> transactionsByCurrencies =
transactions.stream().collect(groupingBy(Transaction::getCurrency));

收集器简介

传递给collect()的参数是Collector接口的一个实现 。刚才是通过groupingBy方法获得了一个收集器的实现。我们之前也见过collect(Collectos.toList())的用法,所以显然Collectos.toList()也是返回了Collector接口的一种实现。

我们先看一些JDK预定义的收集器,它们就可以处理大部分实际情况。

预定义收集器

分组

Collectors.groupingBy(Function<T,K>)
接收一个分类函数,返回一个收集器。该收集器能根据分组函数对流中的元素进行分组,收集到一个Map<K,List<T>>。
1
2
3
4
Map<Dish.Type, List<Dish>> dishesByType =
menu.stream().collect(groupingBy(Dish::getType));
System.out.println(dishesByType);
// {OTHER=[season fruit, french fries, rice, pizza], MEAT=[pork, beef, chicken], FISH=[prawns, salmon]}

上面的代码中,分类函数是提取每个DIsh的Type。分类函数还可以做的更复杂,而不仅是属性提取。

1
2
3
4
5
6
7
8
9
10
public enum CaloricLevel { DIET, NORMAL, FAT }
Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream()
.collect(
groupingBy(dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return
CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
})
);

上面的分类函数根据热量进行分组。可以看到,因为分类函数返回的是CaloricLevel类型,所以最终的Map的键的类型就是CaloricLevel。

多级分组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel =
menu.stream().collect(
groupingBy(Dish::getType,
groupingBy(dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
})
)
);

System.out.println(dishesByTypeCaloricLevel);
/*
OTHER={DIET=[season fruit, rice], NORMAL=[french fries, pizza]},
MEAT={DIET=[chicken], FAT=[pork], NORMAL=[beef]},
FISH={DIET=[prawns], NORMAL=[salmon]}}
*/

按子组收集数据

观察一下,在多级分组中,外层的groupingBy方法接收了两个参数,第一个参数是外层分类函数,第二个参数实际上是一个Collector。所以第二个参数实际可以传入任意的收集器而不一定是groupingBy()返回的收集器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 分别统计每种菜肴的个数
Map<Dish.Type, Long> typesCount = menu.stream()
.collect( groupingBy(Dish::getType, counting()) );
// {MEAT=3, FISH=2, OTHER=4}

// 分别统计每种菜肴的卡路里总数
Map<Dish.Type, Integer> totalCaloriesByType =menu.stream()
.collect( groupingBy(Dish::getType, summingInt(Dish::getCalories)) );

// 分别统计每种菜肴中,卡路里最高的菜肴
Map<Dish.Type, Optional<Dish>> mostCaloricByType = menu.stream()
.collect(
groupingBy(Dish::getType,
maxBy(comparingInt(Dish::getCalories)))
);
// {FISH=Optional[salmon], OTHER=Optional[pizza], MEAT=Optional[pork]}

实际上,普通的单参数groupingBy(f)(其中f是分类函数)是groupingBy(f, toList())的简便写法 。

  • 把收集器的结果转换为另一种类型 :

注意到上面的例子中,分组后取卡路里最高的菜肴。最后的结果是Map<Dish.Type, Optional<Dish>>,Map的value是Option<Dish>类型的,你可能想把它转换成Dish类型

1
2
3
4
5
6
7
8
9
Map<Dish.Type, Dish> mostCaloricByType = menu.stream()
.collect(
groupingBy(Dish::getType,
collectingAndThen(
maxBy(comparingInt(Dish::getCalories)),
Optional::get)
)
);
// {FISH=salmon, OTHER=pizza, MEAT=pork}
Collectors.collectingAndThen()
这个工厂方法接受两个参数——要转换的收集器以及转换函数,并返回另一个新收集器。新收集器相当于旧收集器的一个包装,使得collect操作的最后一步就是将返回值用转换函数做一个映射。

上面的例子中,被包起来的收集器就是用maxBy建立的那个,而转换函数Optional::get则把返回的Optional中的值提取出来。

  • 与groupingBy联合使用的其他收集器的例子 :

之前讲过,groupingBy方法的第二个参数可以是任意的收集器。再举个复杂点的例子。我们希望将菜单根据Type分组,然后分别统计每个组中都有那些CaloricLevel 。就是说,我们希望的得到如下的分组结果:

1
{OTHER=[DIET, NORMAL], MEAT=[DIET, NORMAL, FAT], FISH=[DIET, NORMAL]}

这样,如果你想吃鱼并且在减肥,那很容易找到一道菜 。

看下代码是如何实现的:

1
2
3
4
5
6
7
8
9
10
11
12
Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType = menu.stream()
.collect(
groupingBy(Dish::getType,
mapping(this::judgeCaloricLevel, toSet())
)
);

private CaloricLevel judgeCaloricLevel(Dish dish){
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
}

传递给mapping方法的转换函数将Dish映射成了CaloricLevel:生成的CaloricLevel流传递给一个toSet收集器,它和toList类似,不过是把流中的元素累积到Set而不是List中,以便仅保留不相同的值。

请注意在上示例中,对于返回的Set是什么类型并没有任何保证。但通过使用toCollection,你就可 以有更多的控制。例如,你可以给它传递一个构造函数引用来要求HashSet:

1
2
3
4
5
6
Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType = menu.stream()
.collect(
groupingBy(Dish::getType,
mapping(this::judgeCaloricLevel, toCollection(HashSet::new))
)
);

分区

分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组——true是一组, false是一组。

1
2
3
4
5
6
7
8
// 把菜单按照素食和非素食分开
Map<Boolean, List<Dish>> partitionedMenu =
menu.stream().collect(partitioningBy(Dish::isVegetarian));
// {false=[pork, beef, chicken, prawns, salmon],true=[french fries, rice, season fruit, pizza]}
// 那么通过Map中键为true的值,就可以找出所有的素食菜肴了:
List<Dish> vegetarianDishes = partitionedMenu.get(true);
// 请注意,用同样的分区谓词,对菜单流作筛选,也可以获得相同的结果:
List<Dish> vegetarianDishes = menu.stream().filter(Dish::isVegetarian).collect(toList());

自定义收集器

我们已经看过JDK中预定义了Collector接口的许多收集器实现,例如通过工厂方法Collectors.toList()或Collectors.groupingBy()获得的预定义收集器。另外,我们也可以为Collector接口提供自己的实现。

而在学习自定义收集器前,我们先看下collect方法的其他使用方式。

在之前的代码中,比如menu.stream().filter(Dish::isVegetarian).collect(toList());,我们直接向collect方法传递了一个收集器,然后collect方法就可以拿着这个收集器进行收集工作了。在这里,collect方法拿着toList()提供的收集器,将上游流中的每个元素收集到一个List中。

然而,collect方法还有其它重写版本:

1
2
3
4
5
6
List<Dish> vegetarianDishs = 
menu.stream()
.filter(Dish::isVegetarian)
.collect(ArrayList::new,
(list, dish) -> list.add(dish),
(list1, list2) -> list1.addAll(list2));

看下collect该重写版本的定义:

1
2
3
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
  • supplier : 该函数定义如何获得一个用于保存最终元素的容器。
  • accumulator:该函数定义如何将流中的元素追加到结果容器
  • combiner: 该函数定义如何将两个部分结果容器,合并为一个结果容器
  • collect函数最终返回一个结果容器。

在使用串行流的情况下,该collect方法执行的步骤类似于:

1
2
3
4
R result = supplier.get();// 从supplier函数获得一个用于保存最终元素的容器。
for (T element : this stream)
accumulator.accept(result, element);// accumulator函数式接口中会将元素追加到结果容器
return result;

注意到上面只调用了一次supplier函数,并且没有使用到combiner,这是应为如果在串行流中收集,只会生成一个结果容器。但是如果使用并行流时,会调用多次supplier来获得多个部分结果容器,然后combiner函数会将这些部分结果容器最终组合到一起。

可以通过例子来看下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
List<Dish> vegetarianDishes =
menu.parallelStream()
.filter(Dish::isVegetarian)
.collect(() -> {
synchronized (CollectorTest.class) {
System.out.println("create new list");
return new ArrayList<>();
}
}/*supplier*/,
(list, dish) -> {
synchronized (CollectorTest.class) {
System.out.println("add dish: " + dish);
list.add(dish);
}
}/*accumulator*/,
(list1, list2) -> {
synchronized (CollectorTest.class) {
System.out.println("combine list");
System.out.println("list1: " + list1);
System.out.println("list2: " + list2);
list1.addAll(list2);
System.out.println("after combine: " + list1);
}
}/*combiner*/);
/*加上synchronized是为了防止多线程会并发调用supplier、accumulator、combiner,打印的时候发生混乱。
控制台输出如下(倒着看更容易观察清楚。。):
create new list
create new list
create new list
add dish: rice
create new list
add dish: season fruit
create new list
combine list
list1: []
list2: []
after combine: []
create new list
create new list
add dish: pizza
create new list
create new list
combine list
list1: []
list2: []
after combine: []
combine list
list1: [season fruit]
list2: []
after combine: [season fruit]
combine list
list1: []
list2: [season fruit]
after combine: [season fruit]
add dish: french fries
combine list
list1: [pizza]
list2: []
after combine: [pizza]
combine list
list1: [french fries]
list2: [rice]
after combine: [french fries, rice]
combine list
list1: [french fries, rice]
list2: [pizza]
after combine: [french fries, rice, pizza]
combine list
list1: [season fruit]
list2: [french fries, rice, pizza]
after combine: [season fruit, french fries, rice, pizza]
*/

可以看到,创建了不只两个中间结果容器,然后最终他们都被合并到一个结果容器中。至于到底会创建几个中间结果容器(即同时创建的子任务个数),与子任务的大小和CPU核心线程数有关。具体内容可参考《Java8实战》关于Spliterator和ForkJoinPool相关部分。

理解 Collector 接口声明的方法

理解了上面的内容,在来看Collector就容易的多了。

之前说过,toList()返回了一个预定义的收集器实现,当我们调用stream.collect(toList())时,collect方法会使用该收集器,将流中的所有元素收集成一个List。我们通过研究这个收集器是怎么实现的,可以很好地了解:

  1. Collector接口是怎么定义的
  2. 收集器被传入到collect()方法后,是如何被使用的。

先看下Collector接口的定义。

1
2
3
4
5
6
7
8
9
10
11
12
/*
T是流中要收集的项目的泛型。
A是累加器本身的类型。累加器在收集过程中用于累积部分结果。
R是收集操作得到的对象的类型。(通常但并不一定是集合)
*/
public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
Function<A, R> finisher();
BinaryOperator<A> combiner();
Set<Characteristics> characteristics();
}

例如,你可以实现一个ToListCollector<T>类,将Stream<T>中的所有元素收集到一个List<T>里:

1
2
3
4
public class ToListCollector<T> implements Collector<T, List<T>, List<T>> 
// T 对应 T
// List<T> 对应A
// List<T> 对应R

下面我们来看Collector接口中每个方法的作用。

Supplier<A> supplier()
在调用时它会创建一个空的累加器实例,供数据收集过程使用。

在我们的ToListCollector中, supplier返回一个空的List,如下所示:

1
2
3
public Supplier<List<T>> supplier() {
return () -> new ArrayList<T>();
}

你也可以只传递一个构造函数引用:

1
2
3
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
BiConsumer<A, T> accumulator();
accumulator方法会返回执行归约操作的函数。当遍历到流中第n个元素时,这个函数执行时会有两个参数:保存归约结果的累加器(已收集了流中的前 n - 1 个项目), 还有第n个元素本身。

对于ToListCollector而言:

1
2
3
public BiConsumer<List<T>, T> accumulator() {
return List::add;
}
Function<A, R> finisher();
方法返回在累积过程的最后要调用的一个函数,该函数将累加器对象转换为整个集合操作的最终结果

通常,就像ToListCollector的情况一样,累加器对象恰好符合预期的最终结果,因此无需进行转换。所以finisher方法只需返回identity函数:

1
2
3
public Function<List<T>, List<T>> finisher() {
return Function.identity();
}

对于顺序流的处理过程,理解supplier、accumulator和finisher三个函数就够了:

顺序归约过程的逻辑步骤

对于并行流而言,由于会将流分割成几个子任务并发收集,所以同时会存在多个收集重启,所以最终需要使用combiner函数将这些部分结果容器合并成一个最终的结果容器。

BinaryOperator<A> combiner();
方法会返回一个供归约操作使用的函数,它定义了对流的各个子部分进行并行处理时,各个子部分归约所得的累加器要如何合并。

对于toList而言,这个方法的实现非常简单,只要把从流的第二个部分收集到的项目列表加到遍历第一部分时得到 的列表后面就行了:

1
2
3
4
public BinaryOperator<List<T>> combiner() {
return (list1, list2) ->
{list1.addAll(list2);return list1; }
}

使用combiner方法来并行化归约过程

Set<Characteristics> characteristics()

: 会返回一个不可变的Characteristics集合,提供了一系列特征,也就是一个提示列表,告诉collect方法在执行归约操作的时候可以应用哪些优化(比如并行化)。

Characteristics是一个包含三个项目的枚举:

  • UNORDERED——收集后的顺序可不保持流中元素的“遭遇顺序”(encounter-order)
  • CONCURRENT——accumulator函数可以从多个线程同时调用,即收集容器是线程安全的,如果收集器是CONCURRENT,则Supplier只会被调用一次,即自始至终只会创建一个收集容器。如果收集器没有标为UNORDERED,那它仅在用于无序数据源时才可以并发收集。(如果为UNORDERED,则对于有序/无序源都进行并发收集)
  • IDENTITY_FINISH——这表明完成器方法返回的函数是一个恒等函数,可以跳过。这种情况下,累加器对象将会直接用作归约过程的最终结果。这也意味着,将累加器A不加检查地转换为结果R是安全的。

我们迄今开发的ToListCollector是IDENTITY_FINISH的,因为用来累积流中元素的 List已经是我们要的最终结果,用不着进一步转换了,但它并不是UNORDERED,因为用在有序流上的时候,我们还是希望顺序能够保留在得到的List中。最后,它是CONCURRENT的,但我们刚才说过了,仅仅在背后的数据源无序时才会并行处理。

笔者注:《Java8实战》中以上论述有误,ToListCollector不能是CONCURRENT的,因为采用的收集容器ArrayList是非线程安全的,经过我的测试,并行流收集时会发生问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {

@Override
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
@Override
public BiConsumer<List<T>, T> accumulator() {
return List::add;
}
@Override
public Function<List<T>, List<T>> finisher() {
return Function.identity();
}
@Override
public BinaryOperator<List<T>> combiner() {
return (list1, list2) -> {
list1.addAll(list2);
return list1;
};
}
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(
IDENTITY_FINISH, CONCURRENT));
}
}

// 使用自定义的收集器。
Stream<Integer> integerStream = IntStream.range(1, 1000).mapToObj(Integer::new);
List<Integer> collect = integerStream.parallel().collect(new ToListCollector<>());

并行流

关于并行流,希望大家还是多看写书和文档,笔者目前也没有理解透彻。。。

总之,就是在使用并行流时要谨慎。往往你对并行流的理解并不是对的。比如如下两个例子的输出结果,大多数人的判断都是错的:

1
2
3
4
Stream<Integer> integerStream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

integerStream.parallel().forEach(System.out::println);
integerStream.parallel().collect(toList()).forEach(System.out::println);

如果你感到很奇怪,可以参考我的博客文章Java8 ParallelStream并行流不一定返回乱序结果

不过如果不依赖于顺序性时,一般都可放心大胆使用。

但是并不是使用了并行流后效率就一定高:要考虑到并行化的额外开销对于较小的数据量,选择并行流几乎从来都不是一个好的决定

另外在这些的情景下,不要使用并行流,原因可以参考《Java8实战》关于并行流的部分:

  • 收集容器是LinkedList时,不要使用并行流
  • 流是从Stream.iterate()生成时,不要使用并行流
  • findFirst等依赖于元素顺序的操作,不要使用并行流

注意

流只能遍历一次

遍历完之后,我们就说这个流已经被消费掉了。可以从原始数据源那里再获得一个新的流来重新遍历一遍(如果数据源是可重放的话,比如集合),但不能对同一个流遍历两遍。

1
2
3
4
5
//例如,以下代码会抛出一个异常
List<String> title = Arrays.asList("Java8", "In", "Action");
Stream<String> s = title.stream();
s.forEach(System.out::println);
s.forEach(System.out::println);// java.lang.IllegalStateException:流已被操作或关闭

解决方案:《Java8实战》附录:StreamForker 。

流与集合的区别

  • 比如说存在DVD里的电影,DVD就是一个集合,因为它包含了所有的数据。
  • 现在再来想想在网上通过视频流看电影。流媒体视频播放器只要下载用户观看位置的那几帧就可以了,这样不用等到流中大部分值计算出来,你就可以边下载边播放流(比如观看直播足球赛)。
  • 粗略地说,集合与流之间的差异就在于什么时候进行计算。集合是一个内存中的数据结构,它包含数据结构中目前所有的值——集合中的每个元素都得先算出来才能添加到集合中。(你可以往集合里加东西或者删东西,但是不管什么时候,集合中的每个元素都是放在内存里的,元素都得先算出来才能成为集合的一部分。)
  • 相比之下,流则是在概念上固定的数据结构(你不能添加或删除元素),其元素则是按需计算的 。流就 像是一个延迟创建的集合:只有在消费者要求的时候才会计算值 。

收集器与规约的关系

todo。。。

总结

总结一下, Java 8中的Stream API可以让你写出这样的代码:

  • 声明性——更简洁,更易读
  • 可复合——更灵活
  • 可并行——性能更好
UNO,UNC UNO,C O,UNC O,C
unP,unO sup:1,com:0,有序 sup:1,com:0,有序 sup:1,com:0,有序 sup:1,com:0,有序
unP,O sup:1,com:0,有序 sup:1,com:0,有序 sup:1,com:0,有序 sup:1,com:0,有序
P,unO sup:n,com:m,有序(并行计算、并行收集) sup:1,com:0,无序(并行计算、并发收集) sup:n,com:m,有序(并行计算、并行收集) sup:1,com:0,无序(并行计算、并发收集)
P,O sup:n,com:m,有序(并行计算、并行收集) sup:1,com:0,无序(并行计算、并发收集) sup:n,com:m,有序(并行计算、并行收集) sup:n,com:m,有序 (如果Collect是有序的,仅当源为无序时,才并发收集)。此处为并行计算、并行收集。但是收集器会保证收集顺序保持为遭遇顺序。

If the stream is sequential, it will be sequential.

If the stream is parallel, it will be a parallel or concurrent collect. If at least either, the stream or the collector, is unordered and the collector has the CONCURRENT characteristic. it will be concurrent, otherwise it will be parallel.

https://stackoverflow.com/questions/50625544/confusion-about-characteristics-unordered-in-java-8-in-action-book

Comment