-
Notifications
You must be signed in to change notification settings - Fork 35
Open
Description
Greetings!
Why Err
close stream? I can't use .filter(..)
in that case.
use async_stream::{stream, try_stream};
use futures_util::{pin_mut, Stream, StreamExt};
use std::future::ready;
fn error_or_ok(rcx: usize) -> Result<usize, String> {
if rcx % 2 == 0 {
Ok(rcx)
} else {
Err(format!("Error{rcx}"))
}
}
fn return_stream() -> impl Stream<Item = Result<usize, String>> {
//stream! {
try_stream! {
let mut rcx = 0usize;
loop {
//yield Ok(error_or_ok(rcx)?); // with stream! closes
//yield {move || { Ok(error_or_ok(rcx)?) }}(); // with stream! ok
yield error_or_ok(rcx)?; // with try_stream! closes
rcx += 1;
}
}
.filter(|data| {
let ret = if let Err(error) = data {
eprintln!("Got error: `{error}`.");
false
} else {
true
};
ready(ret)
})
}
#[tokio::main]
async fn main() {
let stream = return_stream();
pin_mut!(stream);
println!("Some: `{:?}`", stream.next().await);
println!("None: `{:?}`", stream.next().await);
println!("Some: `{:?}`", stream.next().await);
println!("None: `{:?}`", stream.next().await);
}
bruno-ortiz
Metadata
Metadata
Assignees
Labels
No labels