Skip to content

Commit ef3c1f3

Browse files
combine_latest impl
1 parent b26484b commit ef3c1f3

File tree

2 files changed

+64
-46
lines changed

2 files changed

+64
-46
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ edition = "2021"
66
[dependencies]
77
futures = "0.3.31"
88
itertools = "0.13.0"
9+
paste = "1.0.15"

src/stream/rx/combine_latest.rs

+63-46
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,38 @@
11
use futures::stream::Stream;
2+
use paste::paste;
23
use std::pin::Pin;
34
use std::task::{Context, Poll};
45

5-
// todo: setup this macro to allow CombineLatest[2..n]
66
macro_rules! combine_latest {
77
($name:ident; $($stream:ident),+; $($type:ident),+) => {
8-
pub struct $name<S1: Stream<Item = T1>, S2: Stream<Item = T2>, T1, T2> {
9-
stream_1: Pin<Box<S1>>,
10-
stream_2: Pin<Box<S2>>,
11-
latest_value_1: Option<T1>,
12-
latest_value_2: Option<T2>,
8+
paste! {
9+
pub struct $name<$($stream: Stream<Item = $type>),+, $($type),+> {
10+
$(
11+
[<$stream:lower>]: Pin<Box<$stream>>,
12+
[<$type:lower>]: Option<$type>,
13+
)+
14+
}
1315
}
1416

15-
impl<S1: Stream<Item = T1>, S2: Stream<Item = T2>, T1, T2> $name<S1, S2, T1, T2> {
16-
pub fn new(stream_1: S1, stream_2: S2) -> Self {
17-
$name {
18-
stream_1: Box::pin(stream_1),
19-
stream_2: Box::pin(stream_2),
20-
latest_value_1: None,
21-
latest_value_2: None,
17+
impl<$($stream: Stream<Item = $type>),+, $($type),+> $name<$($stream),+, $($type),+> {
18+
paste! {
19+
#[allow(clippy::too_many_arguments)]
20+
pub fn new($(
21+
[<$stream:lower>]: $stream),+
22+
) -> Self {
23+
$name {
24+
$(
25+
[<$stream:lower>]: Box::pin([<$stream:lower>]),
26+
[<$type:lower>]: None,
27+
)+
28+
}
2229
}
2330
}
2431
}
2532

26-
impl<
27-
S1: Stream<Item = T1>,
28-
S2: Stream<Item = T2>,
29-
T1: Clone + Unpin,
30-
T2: Clone + Unpin,
31-
> Stream for $name<S1, S2, T1, T2>
33+
impl<$($stream: Stream<Item = $type>),+, $($type: Clone + Unpin),+> Stream for $name<$($stream),+, $($type),+>
3234
{
33-
type Item = (T1, T2);
35+
type Item = ($($type),+);
3436

3537
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3638
let this = self.get_mut();
@@ -46,36 +48,50 @@ macro_rules! combine_latest {
4648
}
4749
}
4850

49-
let (event_1, is_done_1) = poll_next(&mut this.stream_1, cx);
50-
let (event_2, is_done_2) = poll_next(&mut this.stream_2, cx);
51-
let did_update_value = event_1.is_some() || event_2.is_some();
51+
paste! {
52+
$(
53+
let ([<event_ $stream:lower>], [<is_done_ $stream:lower>]) = poll_next(&mut this.[<$stream:lower>], cx);
54+
)+
55+
let did_update_value = $(
56+
[<event_ $stream:lower>].is_some()
57+
)||+;
58+
$(
59+
if [<event_ $stream:lower>].is_some() {
60+
this.[<$type:lower>] = [<event_ $stream:lower>];
61+
}
62+
)+
63+
let all_done = $(
64+
[<is_done_ $stream:lower>]
65+
)&&+;
66+
let all_emitted = $(
67+
this.[<$type:lower>].is_some()
68+
)&&+;
5269

53-
if event_1.is_some() {
54-
this.latest_value_1 = event_1;
55-
}
56-
57-
if event_2.is_some() {
58-
this.latest_value_2 = event_2;
59-
}
70+
let should_emit_next = all_emitted && did_update_value;
6071

61-
let all_done = is_done_1 && is_done_2;
62-
let all_emitted = this.latest_value_1.is_some() && this.latest_value_2.is_some();
63-
let should_emit_next = all_emitted && did_update_value;
64-
65-
match (all_done, should_emit_next) {
66-
(true, _) => Poll::Ready(None),
67-
(false, true) => Poll::Ready(Some((
68-
this.latest_value_1.as_ref().unwrap().to_owned(),
69-
this.latest_value_2.as_ref().unwrap().to_owned(),
70-
))),
71-
_ => Poll::Pending,
72+
match (all_done, should_emit_next) {
73+
(true, _) => Poll::Ready(None),
74+
(false, true) => Poll::Ready(Some((
75+
$(
76+
this.[<$type:lower>].as_ref().unwrap().to_owned()
77+
),+
78+
))),
79+
_ => Poll::Pending,
80+
}
7281
}
7382
}
7483
}
7584
};
7685
}
7786

78-
combine_latest!(CombineLatest2; S1, S2; T1, T2);
87+
combine_latest!(CombineLatest2;S1,S2;T1,T2);
88+
combine_latest!(CombineLatest3;S1,S2,S3;T1,T2,T3);
89+
combine_latest!(CombineLatest4;S1,S2,S3,S4;T1,T2,T3,T4);
90+
combine_latest!(CombineLatest5;S1,S2,S3,S4,S5;T1,T2,T3,T4,T5);
91+
combine_latest!(CombineLatest6;S1,S2,S3,S4,S5,S6;T1,T2,T3,T4,T5,T6);
92+
combine_latest!(CombineLatest7;S1,S2,S3,S4,S5,S6,S7;T1,T2,T3,T4,T5,T6,T7);
93+
combine_latest!(CombineLatest8;S1,S2,S3,S4,S5,S6,S7,S8;T1,T2,T3,T4,T5,T6,T7,T8);
94+
combine_latest!(CombineLatest9;S1,S2,S3,S4,S5,S6,S7,S8,S9;T1,T2,T3,T4,T5,T6,T7,T8,T9);
7995

8096
#[test]
8197
fn test() {
@@ -85,11 +101,12 @@ fn test() {
85101

86102
let s1 = stream::iter([1, 2, 3]);
87103
let s2 = stream::iter([6, 7, 8, 9]);
88-
let mut stream = CombineLatest2::new(s1, s2);
104+
let s3 = stream::iter([0]);
105+
let stream = CombineLatest3::new(s1, s2, s3);
89106

90107
block_on(async {
91-
while let Some(it) = stream.next().await {
92-
println!("{:?}", it);
93-
}
108+
let res = stream.collect::<Vec<_>>().await;
109+
110+
assert_eq!(res, [(1, 6, 0), (2, 7, 0), (3, 8, 0), (3, 9, 0),]);
94111
});
95112
}

0 commit comments

Comments
 (0)