Skip to content

Commit ec13782

Browse files
committed
完善操作叠加过程
1 parent cba9442 commit ec13782

File tree

1 file changed

+79
-9
lines changed

1 file changed

+79
-9
lines changed

6-Stream Pipelines.md

+79-9
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ int longestStringLengthStartingWithA
2929

3030
上述代码求出以字母*A*开头的字符串的最大长度,一种直白的方式是为每一次函数调用都执一次迭代,这样做能够实现功能,但效率上肯定是无法接受的。类库的实现着使用流水线(*Pipeline*)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中尽可能多的执行用户指定的操作。为讲解方便我们汇总了Stream的所有操作。
3131

32-
<table align="center"><tr><td colspan="3" align="center" border="0">Stream操作分类</td></tr><tr><td rowspan="2" border="1">中间操作(Intermediate operations)</td><td>无状态(Stateless)</td><td>unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()</td></tr><tr><td>有状态(Stateful)</td><td>distinct() sorted() sorted() limit() skip() </td></tr><tr><td rowspan="2" border="1">结束操作(Terminal operations)</td><td>非短路操作</td><td>forEach() forEachOrdered() toArray() reduce() collect() max() min() count() allMatch() noneMatch() </td></tr><tr><td>短路操作(short-circuiting)</td><td>anyMatch() findFirst() findAny() </td></tr></table>
32+
<table align="center"><tr><td colspan="3" align="center" border="0">Stream操作分类</td></tr><tr><td rowspan="2" border="1">中间操作(Intermediate operations)</td><td>无状态(Stateless)</td><td>unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()</td></tr><tr><td>有状态(Stateful)</td><td>distinct() sorted() sorted() limit() skip() </td></tr><tr><td rowspan="2" border="1">结束操作(Terminal operations)</td><td>非短路操作</td><td>forEach() forEachOrdered() toArray() reduce() collect() max() min() count()</td></tr><tr><td>短路操作(short-circuiting)</td><td>anyMatch() allMatch() noneMatch() findFirst() findAny()</td></tr></table>
3333

3434
Stream上的所有操作分为两类:中间操作和结束操作,中间操作只是一种标记,只有结束操作才会触发实际计算。中间操作又可以分为无状态的(*Stateless*)和有状态的(*Stateful*),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序操作,在读取所有元素之前并不能确定排序结果;结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以得到结果,比如*找到第一个满足条件的元素*。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。
3535

@@ -68,15 +68,15 @@ for(String str : strings){
6868

6969
### >> 操作如何记录
7070

71-
注意这里使用的是*“操作(operation)”*一词,指的是“Stream中间操作”的操作,很多Stream操作会需要一个回调函数(Lambda表达式),因此一个完整的操作是*<数据来源,操作,回调函数>*构成的三元组。Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的*PipelineHelper*来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。跟Stream相关类和接口的继承关系如下:
71+
<img src="./Figures/Java_stream_pipeline_classes.png" width="400px" align="right" hspace="10px" alt="Java_stream_pipeline_classes"/>
7272

73-
<img src="./Figures/Java_stream_pipeline_classes.png" width="500px" align="right" hspace="10px" alt="Java_stream_pipeline_classes"/>
73+
注意这里使用的是“*操作(operation)*”一词,指的是“Stream中间操作”的操作,很多Stream操作会需要一个回调函数(Lambda表达式),因此一个完整的操作是*<数据来源,操作,回调函数>*构成的三元组。Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的*PipelineHelper*来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。跟Stream相关类和接口的继承关系图示。
7474

75-
图中还有*IntPipeline*, *LongPipeline*, *DoublePipeline*没有画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟*ReferencePipeline*是并列关系。图中*Head*用于表示第一个Stage,即调用调用诸如*Collection.stream()*方法产生的Stage,很显然这个Stage里不包含任何操作;*StatelessOp**StatefulOp*分别表示有状态和无状态的Stage,对应与有状态和无状态的中间操作
75+
还有*IntPipeline*, *LongPipeline*, *DoublePipeline*没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟*ReferencePipeline*是并列关系。图中*Head*用于表示第一个Stage,即调用调用诸如*Collection.stream()*方法产生的Stage,很显然这个Stage里不包含任何操作;*StatelessOp**StatefulOp*分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操作
7676

7777
一个可能的流水线示意图如下:
7878

79-
<img src="./Figures/Stream_pipeline_example.png" width="600px" align="right" alt="Stream_pipeline_example"/>
79+
<img src="./Figures/Stream_pipeline_example.png" width="600px" alt="Stream_pipeline_example"/>
8080

8181
图中通过*Collection.stream()*方法得到*Head*也就是stage0,紧接着调用一系列的中间操作,不断产生新的Stream。**这些Stream对象以双向链表的形式组织在一起,构成整个流水线,由于每个Stage都记录了前一个Stage和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作**。这就是Stream记录操作的方式。
8282

@@ -86,18 +86,88 @@ for(String str : strings){
8686

8787
这种协议由*Sink*接口完成,*Sink*接口包含的方法如下表所示:
8888

89-
<table><tr><td>方法名</td><td>作用</td></tr><tr><td>void begin(long size)</td><td>开始遍历元素之前调用该方法,通知Sink做好准备。</td></tr><tr><td>void end()</td><td>所有元素遍历完成之后调用,通知Sink没有更多的元素了。</td></tr><tr><td>boolean cancellationRequested()</td><td>是否可以结束操作,可以让短路操作尽早结束。</td></tr><tr><td>void accept(T t)</td><td>接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了</td></tr></table>
89+
<table><tr><td>方法名</td><td>作用</td></tr><tr><td>void begin(long size)</td><td>开始遍历元素之前调用该方法,通知Sink做好准备。</td></tr><tr><td>void end()</td><td>所有元素遍历完成之后调用,通知Sink没有更多的元素了。</td></tr><tr><td>boolean cancellationRequested()</td><td>是否可以结束操作,可以让短路操作尽早结束。</td></tr><tr><td>void accept(T t)</td><td>遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了</td></tr></table>
9090

91-
有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的*accept()*方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink的*begin()**end()*方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的*Sink.begin()*方法可能创建一个乘放结果的容器,而*accept()*方法负责将元素添加到该容器,最后*end()*负责对容器进行排序。对于短路操作,*Sink.cancellationRequested()*也是必须实现的,比如*Stream.findFirst()*是短路操作,只要找到一个元素,*cancellationRequested()*就应该返回*true*,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。
91+
有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的*accept()*方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink的*begin()**end()*方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的*Sink.begin()*方法可能创建一个乘放结果的容器,而*accept()*方法负责将元素添加到该容器,最后*end()*负责对容器进行排序。对于短路操作,*Sink.cancellationRequested()*也是必须实现的,比如*Stream.findFirst()*是短路操作,只要找到一个元素,*cancellationRequested()*就应该返回*true*,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。**实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法**
9292

93-
有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始依次调用每个Stage对应的{Sink.begin(), accept(), cancellationRequested(), end()}方法就可以了。一种可能的*Sink.accept()*方法流程是这样的:
93+
有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一种可能的*Sink.accept()*方法流程是这样的:
9494

9595
```Java
9696
void accept(U u){
97-
1. 使用当前Sink包装的回调函数处理u
97+
1. 使用当前Sink包装的回调函数处理u
9898
2. 将处理结果传递给流水线下游的Sink
9999
}
100100
```
101101

102+
Sink接口的其他几个方法也是按照这种[处理->转发]的模型实现。下面我们结合具体例子看看Stream的中间操作是如何将自身的操作包装成Sink以及Sink是如何将处理结果转发给下一个Sink的。先看*Stream.map()*方法:
103+
104+
```Java
105+
// Stream.map(),调用该方法将产生一个新的Stream
106+
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
107+
...
108+
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
109+
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
110+
@Override /*opWripSink()方法返回由回调函数包装而成Sink*/
111+
Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) {
112+
return new Sink.ChainedReference<P_OUT, R>(downstream) {
113+
@Override
114+
public void accept(P_OUT u) {
115+
R r = mapper.apply(u);// 1. 使用当前Sink包装的回调函数mapper处理u
116+
downstream.accept(r);// 2. 将处理结果传递给流水线下游的Sink
117+
}
118+
};
119+
}
120+
};
121+
}
122+
```
123+
124+
上述代码看似复杂,其实逻辑很简单,就是将回调函数*mapper*包装到一个Sink当中。由于*Stream.map()*是一个无状态的中间操作,所以map()方法返回了一个StatelessOp内部类对象(一个新的Stream),调用这个新Stream的opWripSink()方法将得到一个包装了当前回调函数的Sink。
125+
126+
再来看一个复杂一点的例子。*Stream.sorted()*方法将对Stream中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操作封装成Sink的呢?sorted()一种可能封装的Sink代码如下:
127+
128+
```Java
129+
// Stream.sort()方法用到的Sink实现
130+
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
131+
private ArrayList<T> list;// 存放用于排序的元素
132+
RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
133+
super(downstream, comparator);
134+
}
135+
@Override
136+
public void begin(long size) {
137+
...
138+
// 创建一个存放排序元素的列表
139+
list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
140+
}
141+
@Override
142+
public void end() {
143+
list.sort(comparator);// 只有元素全部接收之后才能开始排序
144+
downstream.begin(list.size());
145+
if (!cancellationWasRequested) {// 下游Sink不包含短路操作
146+
list.forEach(downstream::accept);// 2. 将处理结果传递给流水线下游的Sink
147+
}
148+
else {// 下游Sink包含短路操作
149+
for (T t : list) {// 每次都调用cancellationRequested()询问是否可以结束处理。
150+
if (downstream.cancellationRequested()) break;
151+
downstream.accept(t);// 2. 将处理结果传递给流水线下游的Sink
152+
}
153+
}
154+
downstream.end();
155+
list = null;
156+
}
157+
@Override
158+
public void accept(T t) {
159+
list.add(t);// 1. 使用当前Sink包装动作处理t,只是简单的将元素添加到中间列表当中
160+
}
161+
}
162+
```
163+
164+
上述代码完美的展现了Sink的四个接口方法是如何协同工作的。1) 首先beging()方法告诉Sink参与排序的元素个数,方便确定中间结果容器的的大小,2) 之后不断调用accept()方法将元素添加到中间结果当中,3) 最后当调用end()时Sink得知元素已经遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink。如果下游的Sink是短路操作,遍历元素时询问下游cancellationRequested()是否可以结束处理。
165+
166+
### >> 叠加之后的操作如何执行
167+
168+
Sink完美封装了Stream每一步操作,并给出了[执行->转发]的模式来叠加操作。但我们还没有看到整个Stream Pipeline是如何执行起来的。
169+
170+
171+
102172

103173

0 commit comments

Comments
 (0)