Skip to content

Commit c77ee52

Browse files
make combine_latest use Fused Streams
1 parent d1c767b commit c77ee52

File tree

1 file changed

+50
-37
lines changed

1 file changed

+50
-37
lines changed

src/stream/rx/combine_latest.rs

+50-37
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
1-
use futures::stream::Stream;
1+
use futures::stream::{Fuse, FusedStream, Stream, StreamExt};
22
use paste::paste;
3+
use pin_project_lite::pin_project;
34
use std::pin::Pin;
45
use std::task::{Context, Poll};
56

67
macro_rules! combine_latest {
78
($name:ident; $($stream:ident),+; $($type:ident),+) => {
89
paste! {
9-
pub struct $name<$($stream: Stream<Item = $type>),+, $($type),+> {
10-
$(
11-
[<$stream:lower>]: Pin<Box<$stream>>,
12-
[<$type:lower>]: Option<$type>,
13-
)+
10+
pin_project! {
11+
pub struct $name<$($stream: Stream<Item = $type>),+, $($type),+> {
12+
$(
13+
#[pin]
14+
[<$stream:lower>]: Fuse<$stream>,
15+
[<$type:lower>]: Option<$type>,
16+
)+
17+
}
1418
}
1519
}
1620

@@ -22,61 +26,70 @@ macro_rules! combine_latest {
2226
) -> Self {
2327
$name {
2428
$(
25-
[<$stream:lower>]: Box::pin([<$stream:lower>]),
29+
[<$stream:lower>]: [<$stream:lower>].fuse(),
2630
[<$type:lower>]: None,
2731
)+
2832
}
2933
}
3034
}
3135
}
3236

33-
impl<$($stream: Stream<Item = $type>),+, $($type: Clone + Unpin),+> Stream for $name<$($stream),+, $($type),+>
37+
impl<$($stream: Stream<Item = $type>),+, $($type: ToOwned<Owned = $type>),+> FusedStream for $name<$($stream),+, $($type),+>
38+
{
39+
fn is_terminated(&self) -> bool {
40+
paste! {
41+
$(
42+
self.[<$stream:lower>].is_terminated()
43+
)&&+
44+
}
45+
}
46+
}
47+
48+
impl<$($stream: Stream<Item = $type>),+, $($type: ToOwned<Owned = $type>),+> Stream for $name<$($stream),+, $($type),+>
3449
{
3550
type Item = ($($type),+);
3651

3752
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38-
let this = self.get_mut();
53+
let mut this = self.project();
3954

4055
fn poll_next<S: Stream<Item = T>, T>(
41-
stream: &mut Pin<Box<S>>,
56+
stream: Pin<&mut Fuse<S>>,
4257
cx: &mut Context<'_>,
43-
) -> (Option<T>, bool) {
44-
match stream.as_mut().poll_next(cx) {
45-
Poll::Ready(Some(it)) => (Some(it), false),
46-
Poll::Ready(None) => (None, true),
47-
Poll::Pending => (None, false),
58+
) -> Option<T> {
59+
match stream.poll_next(cx) {
60+
Poll::Ready(Some(it)) => Some(it),
61+
_ => None,
4862
}
4963
}
5064

5165
paste! {
66+
let mut did_update_value = false;
5267
$(
53-
let ([<event_ $stream:lower>], [<is_done_ $stream:lower>]) = poll_next(&mut this.[<$stream:lower>], cx);
54-
)+
55-
let did_update_value = $(
56-
[<event_ $stream:lower>].is_some()
57-
)||+;
58-
$(
59-
if [<event_ $stream:lower>].is_some() {
60-
this.[<$type:lower>] = [<event_ $stream:lower>];
61-
}
62-
)+
63-
let all_done = $(
64-
[<is_done_ $stream:lower>]
65-
)&&+;
66-
let all_emitted = $(
67-
this.[<$type:lower>].is_some()
68-
)&&+;
68+
if !this.[<$stream:lower>].is_done() {
69+
let next = poll_next(this.[<$stream:lower>].as_mut(), cx);
70+
71+
if !did_update_value {
72+
did_update_value = next.is_some();
73+
}
6974

70-
let should_emit_next = all_emitted && did_update_value;
75+
if next.is_some() {
76+
*this.[<$type:lower>] = next;
77+
}
78+
};
79+
)+
80+
let should_emit_next = did_update_value && $(this.[<$type:lower>].is_some())&&+;
7181

72-
match (all_done, should_emit_next) {
73-
(true, _) => Poll::Ready(None),
74-
(false, true) => Poll::Ready(Some((
82+
if should_emit_next {
83+
// maybe to_owned can be avoided? Event/Rc?
84+
Poll::Ready(Some((
7585
$(
7686
this.[<$type:lower>].as_ref().unwrap().to_owned()
7787
),+
78-
))),
79-
_ => Poll::Pending,
88+
)))
89+
} else if $(this.[<$stream:lower>].is_done())&&+ {
90+
Poll::Ready(None)
91+
} else {
92+
Poll::Pending
8093
}
8194
}
8295
}

0 commit comments

Comments
 (0)