Skip to content

Commit d1c767b

Browse files
into_ref_stream to wrap into Ref Events
1 parent c9d29fe commit d1c767b

File tree

5 files changed

+159
-1
lines changed

5 files changed

+159
-1
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ edition = "2021"
77
futures = "0.3.31"
88
itertools = "0.13.0"
99
paste = "1.0.15"
10+
pin-project-lite = "0.2.15"

src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
pub mod stream;
22
pub mod subject;
3+
pub mod traits;
34

45
pub mod prelude {
56
pub use crate::{
67
stream::event::*,
78
stream::rx::combine_latest::*,
89
subject::{behavior_subject::*, publish_subject::*, replay_subject::*},
10+
traits::stream_ext::*,
911
};
1012
}
1113

src/stream/event.rs

+48-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
use std::rc::Rc;
1+
use std::{
2+
pin::Pin,
3+
rc::Rc,
4+
task::{Context, Poll},
5+
};
6+
7+
use futures::{Stream, StreamExt};
8+
9+
use super::controller::Controller;
210

311
#[derive(Debug)]
412
pub struct Event<T>(pub(crate) Rc<T>);
@@ -18,3 +26,42 @@ impl<T> Clone for Event<T> {
1826
Self(Rc::clone(&self.0))
1927
}
2028
}
29+
30+
pub struct EventStream<T, S: Stream<Item = T>> {
31+
stream: Pin<Box<S>>,
32+
controller: Controller<Event<T>>,
33+
}
34+
35+
impl<T, S: Stream<Item = T>> EventStream<T, S> {
36+
pub(crate) fn new(stream: S) -> Self {
37+
Self {
38+
stream: Box::pin(stream),
39+
controller: Controller::new(),
40+
}
41+
}
42+
43+
pub(crate) fn is_done(&self) -> bool {
44+
self.controller.is_done
45+
}
46+
}
47+
48+
impl<T, S: Stream<Item = T>> Stream for EventStream<T, S> {
49+
type Item = Event<T>;
50+
51+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52+
let this = self.get_mut();
53+
54+
match this.stream.poll_next_unpin(cx) {
55+
Poll::Ready(Some(it)) => {
56+
let event = Rc::new(it);
57+
this.controller.push(Event(Rc::clone(&event)));
58+
}
59+
Poll::Ready(None) => {
60+
this.controller.is_done = true;
61+
}
62+
_ => {}
63+
};
64+
65+
this.controller.pop()
66+
}
67+
}

src/traits.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod stream_ext;

src/traits/stream_ext.rs

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use std::{
2+
pin::Pin,
3+
task::{Context, Poll},
4+
};
5+
6+
use futures::{ready, stream::FusedStream, Stream};
7+
use pin_project_lite::pin_project;
8+
9+
use crate::events::{Event, EventStream};
10+
11+
macro_rules! delegate_access_inner {
12+
($field:ident, $inner:ty, ($($ind:tt)*)) => {
13+
/// Acquires a reference to the underlying sink or stream that this combinator is
14+
/// pulling from.
15+
pub fn get_ref(&self) -> &$inner {
16+
(&self.$field) $($ind get_ref())*
17+
}
18+
19+
/// Acquires a mutable reference to the underlying sink or stream that this
20+
/// combinator is pulling from.
21+
///
22+
/// Note that care must be taken to avoid tampering with the state of the
23+
/// sink or stream which may otherwise confuse this combinator.
24+
pub fn get_mut(&mut self) -> &mut $inner {
25+
(&mut self.$field) $($ind get_mut())*
26+
}
27+
28+
/// Acquires a pinned mutable reference to the underlying sink or stream that this
29+
/// combinator is pulling from.
30+
///
31+
/// Note that care must be taken to avoid tampering with the state of the
32+
/// sink or stream which may otherwise confuse this combinator.
33+
pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> {
34+
self.project().$field $($ind get_pin_mut())*
35+
}
36+
37+
/// Consumes this combinator, returning the underlying sink or stream.
38+
///
39+
/// Note that this may discard intermediate state of this combinator, so
40+
/// care should be taken to avoid losing resources when this is called.
41+
pub fn into_inner(self) -> $inner {
42+
self.$field $($ind into_inner())*
43+
}
44+
}
45+
}
46+
47+
impl<T: ?Sized> RxStreamExt for T where T: Stream {}
48+
pub trait RxStreamExt: Stream {
49+
fn into_ref_stream(self) -> IntoRefStream<Self>
50+
where
51+
Self: Sized,
52+
{
53+
assert_stream::<Event<Self::Item>, _>(IntoRefStream::new(self))
54+
}
55+
}
56+
57+
pin_project! {
58+
/// Stream for the [`into_ref_stream`](RxStreamExt::into_ref_stream) method.
59+
#[must_use = "streams do nothing unless polled"]
60+
pub struct IntoRefStream<S: Stream> {
61+
#[pin]
62+
stream: EventStream<S::Item, S>,
63+
}
64+
}
65+
66+
impl<S: Stream> IntoRefStream<S> {
67+
pub(crate) fn new(stream: S) -> Self {
68+
Self {
69+
stream: EventStream::new(stream),
70+
}
71+
}
72+
73+
delegate_access_inner!(stream, EventStream<S::Item, S>, ());
74+
}
75+
76+
impl<S> FusedStream for IntoRefStream<S>
77+
where
78+
S: FusedStream,
79+
{
80+
fn is_terminated(&self) -> bool {
81+
self.stream.is_done()
82+
}
83+
}
84+
85+
impl<S> Stream for IntoRefStream<S>
86+
where
87+
S: Stream,
88+
{
89+
type Item = Event<S::Item>;
90+
91+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
92+
let mut this = self.project();
93+
let res = ready!(this.stream.as_mut().poll_next(cx));
94+
Poll::Ready(res)
95+
}
96+
97+
fn size_hint(&self) -> (usize, Option<usize>) {
98+
self.stream.size_hint()
99+
}
100+
}
101+
102+
pub(crate) fn assert_stream<T, S>(stream: S) -> S
103+
where
104+
S: Stream<Item = T>,
105+
{
106+
stream
107+
}

0 commit comments

Comments
 (0)