@@ -29,22 +29,22 @@ int longestStringLengthStartingWithA
29
29
30
30
上述代码求出以字母* A* 开头的字符串的最大长度,一种直白的方式是为每一次函数调用都执一次迭代,这样做能够实现功能,但效率上肯定是无法接受的。类库的实现着使用流水线(* Pipeline* )的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中尽可能多的执行用户指定的操作。为讲解方便我们汇总了Stream的所有操作。
31
31
32
- <table align =" center " ><tr ><td colspan =" 3 " align =" center " border =" 0 " >Stream操作分类</td ></tr ><tr ><td rowspan =" 2 " border =" 1 " >中间操作(Intermediate operations)</td ><td >无状态(Stateless)</td ><td >unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()</td ></tr ><tr ><td >有状态(Stateful)</td ><td >distinct() sorted() sorted() limit() skip() </td ></tr ><tr ><td rowspan =" 2 " border =" 1 " >结束操作(Terminal operations)</td ><td >非短路操作</td ><td >forEach() forEachOrdered() toArray() reduce() collect() max() min() count()</td ></tr ><tr ><td >短路操作(short-circuiting)</td ><td >anyMatch() allMatch() noneMatch() findFirst() findAny()</td ></tr ></table >
32
+ <table align =" center " ><tr ><td colspan =" 3 " align =" center " border =" 0 " >Stream操作分类</td ></tr ><tr ><td rowspan =" 2 " border =" 1 " >中间操作(Intermediate operations)</td ><td >无状态(Stateless)</td ><td >unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap()< br > flatMapToInt() flatMapToLong() flatMapToDouble() peek()</td ></tr ><tr ><td >有状态(Stateful)</td ><td >distinct() sorted() sorted() limit() skip() </td ></tr ><tr ><td rowspan =" 2 " border =" 1 " >结束操作(Terminal operations)</td ><td >非短路操作</td ><td >forEach() forEachOrdered() toArray() reduce() collect() max() min() count()</td ></tr ><tr ><td >短路操作(short-circuiting)</td ><td >anyMatch() allMatch() noneMatch() findFirst() findAny()</td ></tr ></table >
33
33
34
- Stream上的所有操作分为两类:中间操作和结束操作,中间操作只是一种标记,只有结束操作才会触发实际计算。中间操作又可以分为无状态的(* Stateless* )和有状态的(* Stateful* ),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序操作 ,在读取所有元素之前并不能确定排序结果;结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以得到结果 ,比如* 找到第一个满足条件的元素* 。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。
34
+ Stream上的所有操作分为两类:中间操作和结束操作,中间操作只是一种标记,只有结束操作才会触发实际计算。中间操作又可以分为无状态的(* Stateless* )和有状态的(* Stateful* ),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序是有状态操作 ,在读取所有元素之前并不能确定排序结果;结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果 ,比如* 找到第一个满足条件的元素* 。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。
35
35
36
36
## 一种直白的实现方式
37
37
38
38
<img src =" ./Figures/Stream_pipeline_naive.png " width =" 500px " align =" right " alt =" Stream_pipeline_naive " />
39
39
40
- 仍然考虑上述求最长字符串的程序,一种直白的方式是为每一次函数调用都执一次迭代 ,并将处理中间结果放到某种数据结构中(比如数组,容器等)。具体说来,就是调用* filter()* 方法后立即执行,选出所有以* A* 开头的字符串并放到一个列表list1中,之后让list1传递给* mapToInt()* 方法并立即执行,生成的结果放到list2中,最后遍历list2找出最大的数字作为最终结果。程序的执行流程如如所示:
40
+ 仍然考虑上述求最长字符串的程序,一种直白的流水线实现方式是为每一次函数调用都执一次迭代 ,并将处理中间结果放到某种数据结构中(比如数组,容器等)。具体说来,就是调用* filter()* 方法后立即执行,选出所有以* A* 开头的字符串并放到一个列表list1中,之后让list1传递给* mapToInt()* 方法并立即执行,生成的结果放到list2中,最后遍历list2找出最大的数字作为最终结果。程序的执行流程如如所示:
41
41
42
42
这样做实现起来非常简单直观,但有两个明显的弊端:
43
43
44
44
1 . 迭代次数多。迭代次数跟函数调用的次数相等。
45
45
2 . 频繁产生中间结果。每次函数调用都产生一次中间结果,存储开销无法接受。
46
46
47
- 这种弊端使得效率底下 ,根本无法接受。如果不使用Stream API我们都知道上述代码该如何在一次迭代中完成,大致是如下形式:
47
+ 这些弊端使得效率底下 ,根本无法接受。如果不使用Stream API我们都知道上述代码该如何在一次迭代中完成,大致是如下形式:
48
48
49
49
``` Java
50
50
int longest = 0 ;
@@ -161,11 +161,53 @@ class RefSortingSink<T> extends AbstractRefSortingSink<T> {
161
161
}
162
162
```
163
163
164
- 上述代码完美的展现了Sink的四个接口方法是如何协同工作的。1) 首先beging()方法告诉Sink参与排序的元素个数,方便确定中间结果容器的的大小,2) 之后不断调用accept()方法将元素添加到中间结果当中,3) 最后当调用end()时Sink得知元素已经遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink。如果下游的Sink是短路操作,遍历元素时询问下游cancellationRequested()是否可以结束处理。
164
+ 上述代码完美的展现了Sink的四个接口方法是如何协同工作的:
165
+ 1 . 首先beging()方法告诉Sink参与排序的元素个数,方便确定中间结果容器的的大小;
166
+ 2 . 之后不断调用accept()方法将元素添加到中间结果当中;
167
+ 3 . 最后当调用end()时Sink得知元素已经遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink;
168
+ 4 . 如果下游的Sink是短路操作,将结果传递给下游时不断询问下游cancellationRequested()是否可以结束处理。
165
169
166
170
### >> 叠加之后的操作如何执行
167
171
168
- Sink完美封装了Stream每一步操作,并给出了[ 执行->转发] 的模式来叠加操作。但我们还没有看到整个Stream Pipeline是如何执行起来的。
172
+ <img src =" ./Figures/Stream_pipeline_Sink.png " width =" 250px " align =" right " hspace =" 10px " alt =" Stream_pipeline_Sink " />
173
+
174
+ Sink完美封装了Stream每一步操作,并给出了[ 处理->转发] 的模式来叠加操作。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。是什么启动这一连串的操作呢?也许你已经想到了启动的原始动力就是结束操作(Terminal Operation),一旦调用某个结束操作,就会触发整个流水线的执行。
175
+
176
+ 结束操作之后不能再有别的操作,所以结束操作不会创建新的流水线阶段(Stage),直观的说就是流水线的链表不会在往后延伸了。结束操作会创建一个包装了自己操作的Sink,这也是流水线中最后一个Sink,这个Sink只需要处理数据而不需要将结果传递给下游的Sink(因为没有下游)。对于Sink的[ 处理->转发] 模型,结束操作的Sink就是调用链的出口。
177
+
178
+ 我们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在* PipelineHelper* 中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段即可。但Stream类库的设计者没有这么做,而是设置了一个` Sink AbstractPipeline.opWrapSink(int flags, Sink downstream) ` 方法来得到Sink,该方法的作用是产生一个新的包含了当前Stage代表的操作以及能够将结果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?这是因为使用* opWrapSink()* 可以将当前操作与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的* opWrapSink()* 方法直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink,用代码表示就是这样:
179
+
180
+ ``` Java
181
+ // 从下游向上游不断包装Sink。如果最初传入的sink是结束操作代表的,
182
+ // 函数返回时就可以得到一个代表了流水线上所有操作的Sink。
183
+ final < P_IN > Sink<P_IN > wrapSink(Sink<E_OUT > sink) {
184
+ ...
185
+ for (AbstractPipeline p= AbstractPipeline . this ; p. depth > 0 ; p= p. previousStage) {
186
+ sink = p. opWrapSink(p. previousStage. combinedFlags, sink);
187
+ }
188
+ return (Sink<P_IN > ) sink;
189
+ }
190
+ ```
191
+
192
+ 现在流水线上从开始到结束的所有的操作都被包装到了一个Sink里,执行这个Sink就相当于执行整个流水线,执行Sink的代码如下:
193
+
194
+ ``` Java
195
+ // 对spliterator代表的数据执行wrappedSink代表的操作。
196
+ final < P_IN > void copyInto(Sink<P_IN > wrappedSink, Spliterator<P_IN > spliterator) {
197
+ ...
198
+ if (! StreamOpFlag . SHORT_CIRCUIT. isKnown(getStreamAndOpFlags())) {
199
+ wrappedSink. begin(spliterator. getExactSizeIfKnown());// 通知开始遍历
200
+ spliterator. forEachRemaining(wrappedSink);// 迭代
201
+ wrappedSink. end();// 通知遍历结束
202
+ }
203
+ ...
204
+ }
205
+ ```
206
+
207
+ 上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,然后调用spliterator.forEachRemaining()方法对数据进行迭代(Spliterator是容器的一种迭代器,参阅[ ] ),最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。
208
+
209
+
210
+
169
211
170
212
171
213
0 commit comments