Skip to content

Commit 90ec75b

Browse files
basic combine_latest_2
1 parent f023dc3 commit 90ec75b

File tree

3 files changed

+86
-0
lines changed

3 files changed

+86
-0
lines changed

src/stream.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod controller;
22
pub mod event;
33
pub mod observable;
4+
pub mod rx;

src/stream/rx.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod combine_latest;

src/stream/rx/combine_latest.rs

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use futures::stream::Stream;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
5+
pub struct CombineLatest<S1: Stream<Item = T1>, S2: Stream<Item = T2>, T1, T2> {
6+
stream_1: Pin<Box<S1>>,
7+
stream_2: Pin<Box<S2>>,
8+
latest_value_1: Option<T1>,
9+
latest_value_2: Option<T2>,
10+
}
11+
12+
impl<S1: Stream<Item = T1>, S2: Stream<Item = T2>, T1, T2> CombineLatest<S1, S2, T1, T2> {
13+
pub fn new(stream_1: S1, stream_2: S2) -> Self {
14+
CombineLatest {
15+
stream_1: Box::pin(stream_1),
16+
stream_2: Box::pin(stream_2),
17+
latest_value_1: None,
18+
latest_value_2: None,
19+
}
20+
}
21+
}
22+
23+
impl<S1: Stream<Item = T1>, S2: Stream<Item = T2>, T1: Clone + Unpin, T2: Clone + Unpin> Stream
24+
for CombineLatest<S1, S2, T1, T2>
25+
{
26+
type Item = (T1, T2);
27+
28+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
29+
let this = self.get_mut();
30+
31+
fn poll_next<S: Stream<Item = T>, T>(
32+
stream: &mut Pin<Box<S>>,
33+
cx: &mut Context<'_>,
34+
) -> (Option<T>, bool) {
35+
match stream.as_mut().poll_next(cx) {
36+
Poll::Ready(Some(it)) => (Some(it), false),
37+
Poll::Ready(None) => (None, true),
38+
Poll::Pending => (None, false),
39+
}
40+
}
41+
42+
let (event_1, is_done_1) = poll_next(&mut this.stream_1, cx);
43+
let (event_2, is_done_2) = poll_next(&mut this.stream_2, cx);
44+
let did_update_value = event_1.is_some() || event_2.is_some();
45+
46+
if event_1.is_some() {
47+
this.latest_value_1 = event_1;
48+
}
49+
50+
if event_2.is_some() {
51+
this.latest_value_2 = event_2;
52+
}
53+
54+
let all_done = is_done_1 && is_done_2;
55+
let all_emitted = this.latest_value_1.is_some() && this.latest_value_2.is_some();
56+
let should_emit_next = all_emitted && did_update_value;
57+
58+
match (all_done, should_emit_next) {
59+
(true, _) => Poll::Ready(None),
60+
(false, true) => Poll::Ready(Some((
61+
this.latest_value_1.as_ref().unwrap().to_owned(),
62+
this.latest_value_2.as_ref().unwrap().to_owned(),
63+
))),
64+
_ => Poll::Pending,
65+
}
66+
}
67+
}
68+
69+
#[test]
70+
fn test() {
71+
use futures::executor::block_on;
72+
use futures::stream::{self};
73+
use futures::StreamExt;
74+
75+
let s1 = stream::iter([1, 2, 3]);
76+
let s2 = stream::iter([6, 7, 8, 9]);
77+
let mut stream = CombineLatest::new(s1, s2);
78+
79+
block_on(async {
80+
while let Some(it) = stream.next().await {
81+
println!("{:?}", it);
82+
}
83+
});
84+
}

0 commit comments

Comments
 (0)