Skip to content

Commit b26484b

Browse files
initial macro attempt
1 parent 90ec75b commit b26484b

File tree

1 file changed

+63
-52
lines changed

1 file changed

+63
-52
lines changed

src/stream/rx/combine_latest.rs

+63-52
Original file line numberDiff line numberDiff line change
@@ -2,70 +2,81 @@ use futures::stream::Stream;
22
use std::pin::Pin;
33
use std::task::{Context, Poll};
44

5-
pub struct CombineLatest<S1: Stream<Item = T1>, S2: Stream<Item = T2>, T1, T2> {
6-
stream_1: Pin<Box<S1>>,
7-
stream_2: Pin<Box<S2>>,
8-
latest_value_1: Option<T1>,
9-
latest_value_2: Option<T2>,
10-
}
5+
// todo: setup this macro to allow CombineLatest[2..n]
6+
macro_rules! combine_latest {
7+
($name:ident; $($stream:ident),+; $($type:ident),+) => {
8+
pub struct $name<S1: Stream<Item = T1>, S2: Stream<Item = T2>, T1, T2> {
9+
stream_1: Pin<Box<S1>>,
10+
stream_2: Pin<Box<S2>>,
11+
latest_value_1: Option<T1>,
12+
latest_value_2: Option<T2>,
13+
}
1114

12-
impl<S1: Stream<Item = T1>, S2: Stream<Item = T2>, T1, T2> CombineLatest<S1, S2, T1, T2> {
13-
pub fn new(stream_1: S1, stream_2: S2) -> Self {
14-
CombineLatest {
15-
stream_1: Box::pin(stream_1),
16-
stream_2: Box::pin(stream_2),
17-
latest_value_1: None,
18-
latest_value_2: None,
15+
impl<S1: Stream<Item = T1>, S2: Stream<Item = T2>, T1, T2> $name<S1, S2, T1, T2> {
16+
pub fn new(stream_1: S1, stream_2: S2) -> Self {
17+
$name {
18+
stream_1: Box::pin(stream_1),
19+
stream_2: Box::pin(stream_2),
20+
latest_value_1: None,
21+
latest_value_2: None,
22+
}
23+
}
1924
}
20-
}
21-
}
2225

23-
impl<S1: Stream<Item = T1>, S2: Stream<Item = T2>, T1: Clone + Unpin, T2: Clone + Unpin> Stream
24-
for CombineLatest<S1, S2, T1, T2>
25-
{
26-
type Item = (T1, T2);
26+
impl<
27+
S1: Stream<Item = T1>,
28+
S2: Stream<Item = T2>,
29+
T1: Clone + Unpin,
30+
T2: Clone + Unpin,
31+
> Stream for $name<S1, S2, T1, T2>
32+
{
33+
type Item = (T1, T2);
2734

28-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
29-
let this = self.get_mut();
35+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36+
let this = self.get_mut();
3037

31-
fn poll_next<S: Stream<Item = T>, T>(
32-
stream: &mut Pin<Box<S>>,
33-
cx: &mut Context<'_>,
34-
) -> (Option<T>, bool) {
35-
match stream.as_mut().poll_next(cx) {
36-
Poll::Ready(Some(it)) => (Some(it), false),
37-
Poll::Ready(None) => (None, true),
38-
Poll::Pending => (None, false),
39-
}
40-
}
38+
fn poll_next<S: Stream<Item = T>, T>(
39+
stream: &mut Pin<Box<S>>,
40+
cx: &mut Context<'_>,
41+
) -> (Option<T>, bool) {
42+
match stream.as_mut().poll_next(cx) {
43+
Poll::Ready(Some(it)) => (Some(it), false),
44+
Poll::Ready(None) => (None, true),
45+
Poll::Pending => (None, false),
46+
}
47+
}
4148

42-
let (event_1, is_done_1) = poll_next(&mut this.stream_1, cx);
43-
let (event_2, is_done_2) = poll_next(&mut this.stream_2, cx);
44-
let did_update_value = event_1.is_some() || event_2.is_some();
49+
let (event_1, is_done_1) = poll_next(&mut this.stream_1, cx);
50+
let (event_2, is_done_2) = poll_next(&mut this.stream_2, cx);
51+
let did_update_value = event_1.is_some() || event_2.is_some();
4552

46-
if event_1.is_some() {
47-
this.latest_value_1 = event_1;
48-
}
53+
if event_1.is_some() {
54+
this.latest_value_1 = event_1;
55+
}
4956

50-
if event_2.is_some() {
51-
this.latest_value_2 = event_2;
52-
}
57+
if event_2.is_some() {
58+
this.latest_value_2 = event_2;
59+
}
5360

54-
let all_done = is_done_1 && is_done_2;
55-
let all_emitted = this.latest_value_1.is_some() && this.latest_value_2.is_some();
56-
let should_emit_next = all_emitted && did_update_value;
61+
let all_done = is_done_1 && is_done_2;
62+
let all_emitted = this.latest_value_1.is_some() && this.latest_value_2.is_some();
63+
let should_emit_next = all_emitted && did_update_value;
5764

58-
match (all_done, should_emit_next) {
59-
(true, _) => Poll::Ready(None),
60-
(false, true) => Poll::Ready(Some((
61-
this.latest_value_1.as_ref().unwrap().to_owned(),
62-
this.latest_value_2.as_ref().unwrap().to_owned(),
63-
))),
64-
_ => Poll::Pending,
65+
match (all_done, should_emit_next) {
66+
(true, _) => Poll::Ready(None),
67+
(false, true) => Poll::Ready(Some((
68+
this.latest_value_1.as_ref().unwrap().to_owned(),
69+
this.latest_value_2.as_ref().unwrap().to_owned(),
70+
))),
71+
_ => Poll::Pending,
72+
}
73+
}
6574
}
66-
}
75+
};
6776
}
6877

78+
combine_latest!(CombineLatest2; S1, S2; T1, T2);
79+
6980
#[test]
7081
fn test() {
7182
use futures::executor::block_on;
@@ -74,7 +85,7 @@ fn test() {
7485

7586
let s1 = stream::iter([1, 2, 3]);
7687
let s2 = stream::iter([6, 7, 8, 9]);
77-
let mut stream = CombineLatest::new(s1, s2);
88+
let mut stream = CombineLatest2::new(s1, s2);
7889

7990
block_on(async {
8091
while let Some(it) = stream.next().await {

0 commit comments

Comments
 (0)