Skip to content

Commit b81e98b

Browse files
basic CI
1 parent 4c5d4d9 commit b81e98b

File tree

8 files changed

+168
-51
lines changed

8 files changed

+168
-51
lines changed

.github/workflows/main.yml

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
on:
2+
merge_group:
3+
pull_request:
4+
push:
5+
branches:
6+
- main
7+
8+
name: base check
9+
jobs:
10+
check:
11+
name: Check
12+
runs-on: ubuntu-latest
13+
steps:
14+
- name: Checkout sources
15+
uses: actions/checkout@v1
16+
17+
- name: Install toolchain
18+
uses: actions-rs/toolchain@v1
19+
with:
20+
profile: minimal
21+
toolchain: stable
22+
override: true
23+
- name: Run cargo check
24+
uses: actions-rs/cargo@v1
25+
with:
26+
command: check
27+
28+
test:
29+
name: Test Suite
30+
runs-on: ubuntu-latest
31+
steps:
32+
- name: Checkout sources
33+
uses: actions/checkout@v1
34+
35+
- name: Install toolchain
36+
uses: actions-rs/toolchain@v1
37+
with:
38+
profile: minimal
39+
toolchain: stable
40+
override: true
41+
42+
- name: Run cargo test
43+
uses: actions-rs/cargo@v1
44+
with:
45+
command: test
46+
args: --all-features
47+
48+
rustfmt:
49+
name: rust code format style check
50+
runs-on: ubuntu-latest
51+
steps:
52+
- name: Checkout sources
53+
uses: actions/checkout@v1
54+
55+
- name: Install toolchain
56+
uses: actions-rs/toolchain@v1
57+
with:
58+
profile: minimal
59+
toolchain: stable
60+
override: true
61+
components: rustfmt
62+
63+
- name: Run cargo fmt
64+
uses: actions-rs/cargo@v1
65+
with:
66+
command: fmt
67+
args: --all -- --check
68+
69+
clippy_check:
70+
runs-on: ubuntu-latest
71+
steps:
72+
- uses: actions/checkout@v1
73+
- uses: actions-rs/toolchain@v1
74+
with:
75+
toolchain: stable
76+
components: clippy
77+
override: true
78+
- uses: actions-rs/clippy-check@v1
79+
with:
80+
token: ${{ secrets.GITHUB_TOKEN }}
81+
args: --all-targets --all-features -- -D warnings

src/main.rs

-45
This file was deleted.

src/stream/controller.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ impl<T> Controller<T> {
1818
self.buffer.push_back(value);
1919
}
2020

21-
pub(crate) fn next(&mut self) -> Poll<Option<T>> {
21+
pub(crate) fn pop(&mut self) -> Poll<Option<T>> {
2222
match self.buffer.pop_front() {
2323
Some(it) => Poll::Ready(Some(it)),
2424
None => {

src/stream/observable.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ impl<T> Stream for Observable<T> {
2323
type Item = Event<T>;
2424

2525
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
26-
self.as_mut().inner.borrow_mut().next()
26+
self.as_mut().inner.borrow_mut().pop()
2727
}
2828
}

src/subject.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ pub trait Subject {
99

1010
fn subscribe(&mut self) -> Observable<Self::Item>;
1111
fn close(&mut self);
12-
fn push(&mut self, value: Self::Item);
12+
fn next(&mut self, value: Self::Item);
1313
}

src/subject/behavior_subject.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ impl<T> Subject for BehaviorSubject<T> {
4343
}
4444
}
4545

46-
fn push(&mut self, value: Self::Item) {
46+
fn next(&mut self, value: Self::Item) {
4747
let rc = Rc::new(value);
4848

4949
self.latest_event = Some(Rc::clone(&rc));

src/subject/publish_subject.rs

+82-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl<T> Subject for PublishSubject<T> {
3838
}
3939
}
4040

41-
fn push(&mut self, value: Self::Item) {
41+
fn next(&mut self, value: Self::Item) {
4242
let rc = Rc::new(value);
4343

4444
for sub in &mut self.subscriptions.iter().flat_map(|it| it.upgrade()) {
@@ -55,3 +55,84 @@ impl<T> PublishSubject<T> {
5555
}
5656
}
5757
}
58+
59+
#[cfg(test)]
60+
mod test {
61+
use futures::{executor::block_on, StreamExt};
62+
63+
use super::*;
64+
65+
#[test]
66+
fn subscribe_before_events() {
67+
let mut subject = PublishSubject::new();
68+
let obs = subject.subscribe();
69+
70+
subject.next(1);
71+
subject.next(2);
72+
subject.next(3);
73+
subject.close();
74+
75+
block_on(async {
76+
let res = obs.map(|it| *it.as_inner_ref()).collect::<Vec<i32>>().await;
77+
78+
assert_eq!(res, [1, 2, 3]);
79+
});
80+
}
81+
82+
#[test]
83+
fn subscribe_after_events() {
84+
let mut subject = PublishSubject::new();
85+
86+
subject.next(1);
87+
subject.next(2);
88+
subject.next(3);
89+
subject.close();
90+
91+
let obs = subject.subscribe();
92+
93+
block_on(async {
94+
let res = obs.map(|it| *it.as_inner_ref()).collect::<Vec<i32>>().await;
95+
96+
assert_eq!(res, []);
97+
});
98+
}
99+
100+
#[test]
101+
fn ok_event_ownership() {
102+
let mut subject = PublishSubject::new();
103+
let obs = subject.subscribe();
104+
105+
subject.next(1);
106+
subject.next(2);
107+
subject.next(3);
108+
subject.close();
109+
110+
block_on(async {
111+
let res = obs.map(|it| it.try_unwrap()).collect::<Vec<_>>().await;
112+
113+
assert_eq!(res, [Ok(1), Ok(2), Ok(3)]);
114+
});
115+
}
116+
117+
#[test]
118+
fn err_event_ownership() {
119+
let mut subject = PublishSubject::new();
120+
let obs = subject.subscribe();
121+
let some_other_obs = subject.subscribe();
122+
123+
subject.next(1);
124+
subject.next(2);
125+
subject.next(3);
126+
subject.close();
127+
128+
block_on(async {
129+
let res = obs.map(|it| it.try_unwrap()).collect::<Vec<_>>().await;
130+
131+
for it in res {
132+
assert!(it.is_err(), "Event was not Err()");
133+
}
134+
});
135+
136+
drop(some_other_obs);
137+
}
138+
}

src/subject/replay_subject.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl<T> Subject for ReplaySubject<T> {
5050
}
5151
}
5252

53-
fn push(&mut self, value: Self::Item) {
53+
fn next(&mut self, value: Self::Item) {
5454
let rc = Rc::new(value);
5555

5656
if let ReplayStrategy::BufferSize(size) = &self.replay_strategy {

0 commit comments

Comments
 (0)