Skip to content

Commit 2ffdc80

Browse files
authored
added start_on_thread and send_message_on (#60)
* Added spawn_on_thread and send_message_on * Improved error handling * Corrected comment * Moved a log message from error to debug and updated version
1 parent 7ec21e9 commit 2ffdc80

File tree

5 files changed

+74
-12
lines changed

5 files changed

+74
-12
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ members = [
1616
]
1717

1818
[workspace.dependencies]
19-
spawned-rt = { path = "rt", version = "0.4.1" }
20-
spawned-concurrency = { path = "concurrency", version = "0.4.1" }
19+
spawned-rt = { path = "rt", version = "0.4.2" }
20+
spawned-concurrency = { path = "concurrency", version = "0.4.2" }
2121
tracing = { version = "0.1.41", features = ["log"] }
2222
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
2323

2424
[workspace.package]
25-
version = "0.4.1"
25+
version = "0.4.2"
2626
license = "MIT"
2727
edition = "2021"

concurrency/src/tasks/gen_server.rs

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@ use crate::{
44
error::GenServerError,
55
tasks::InitResult::{NoSuccess, Success},
66
};
7-
use futures::future::FutureExt as _;
8-
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
7+
use core::pin::pin;
8+
use futures::future::{self, FutureExt as _};
9+
use spawned_rt::{
10+
tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken, JoinHandle},
11+
threads,
12+
};
913
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};
1014

1115
const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5);
@@ -27,7 +31,7 @@ impl<G: GenServer> Clone for GenServerHandle<G> {
2731
}
2832

2933
impl<G: GenServer> GenServerHandle<G> {
30-
pub(crate) fn new(gen_server: G) -> Self {
34+
fn new(gen_server: G) -> Self {
3135
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
3236
let cancellation_token = CancellationToken::new();
3337
let handle = GenServerHandle {
@@ -51,7 +55,7 @@ impl<G: GenServer> GenServerHandle<G> {
5155
handle_clone
5256
}
5357

54-
pub(crate) fn new_blocking(gen_server: G) -> Self {
58+
fn new_blocking(gen_server: G) -> Self {
5559
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
5660
let cancellation_token = CancellationToken::new();
5761
let handle = GenServerHandle {
@@ -70,6 +74,25 @@ impl<G: GenServer> GenServerHandle<G> {
7074
handle_clone
7175
}
7276

77+
fn new_on_thread(gen_server: G) -> Self {
78+
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
79+
let cancellation_token = CancellationToken::new();
80+
let handle = GenServerHandle {
81+
tx,
82+
cancellation_token,
83+
};
84+
let handle_clone = handle.clone();
85+
// Ignore the JoinHandle for now. Maybe we'll use it in the future
86+
let _join_handle = threads::spawn(|| {
87+
threads::block_on(async move {
88+
if let Err(error) = gen_server.run(&handle, &mut rx).await {
89+
tracing::trace!(%error, "GenServer crashed")
90+
};
91+
})
92+
});
93+
handle_clone
94+
}
95+
7396
pub fn sender(&self) -> mpsc::Sender<GenServerInMsg<G>> {
7497
self.tx.clone()
7598
}
@@ -153,6 +176,15 @@ pub trait GenServer: Send + Sized {
153176
GenServerHandle::new_blocking(self)
154177
}
155178

179+
/// For some "singleton" GenServers that run througout the whole execution of the
180+
/// program, it makes sense to run in their own dedicated thread to avoid interference
181+
/// with the rest of the tasks' runtime.
182+
/// The use of tokio::task::spawm_blocking is not recommended for these scenarios
183+
/// as it is a limited thread pool better suited for blocking IO tasks that eventually end
184+
fn start_on_thread(self) -> GenServerHandle<Self> {
185+
GenServerHandle::new_on_thread(self)
186+
}
187+
156188
fn run(
157189
self,
158190
handle: &GenServerHandle<Self>,
@@ -300,6 +332,36 @@ pub trait GenServer: Send + Sized {
300332
}
301333
}
302334

335+
/// Spawns a task that awaits on a future and sends a message to a GenServer
336+
/// on completion.
337+
/// This function returns a handle to the spawned task.
338+
pub fn send_message_on<T, U>(
339+
handle: GenServerHandle<T>,
340+
future: U,
341+
message: T::CastMsg,
342+
) -> JoinHandle<()>
343+
where
344+
T: GenServer,
345+
U: Future + Send + 'static,
346+
<U as Future>::Output: Send,
347+
{
348+
let cancelation_token = handle.cancellation_token();
349+
let mut handle_clone = handle.clone();
350+
let join_handle = rt::spawn(async move {
351+
let is_cancelled = pin!(cancelation_token.cancelled());
352+
let signal = pin!(future);
353+
match future::select(is_cancelled, signal).await {
354+
future::Either::Left(_) => tracing::debug!("GenServer stopped"),
355+
future::Either::Right(_) => {
356+
if let Err(e) = handle_clone.cast(message).await {
357+
tracing::error!("Failed to send message: {e:?}")
358+
}
359+
}
360+
}
361+
});
362+
join_handle
363+
}
364+
303365
#[cfg(debug_assertions)]
304366
mod warn_on_block {
305367
use super::*;

concurrency/src/tasks/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ mod stream_tests;
1212
mod timer_tests;
1313

1414
pub use gen_server::{
15-
CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, InitResult,
16-
InitResult::NoSuccess, InitResult::Success,
15+
send_message_on, CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg,
16+
InitResult, InitResult::NoSuccess, InitResult::Success,
1717
};
1818
pub use process::{send, Process, ProcessInfo};
1919
pub use stream::spawn_listener;

examples/blocking_genserver/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl GenServer for WellBehavedTask {
9999
pub fn main() {
100100
rt::run(async move {
101101
// If we change BadlyBehavedTask to start instead, it can stop the entire program
102-
let mut badboy = BadlyBehavedTask::new().start_blocking();
102+
let mut badboy = BadlyBehavedTask::new().start_on_thread();
103103
let _ = badboy.cast(()).await;
104104
let mut goodboy = WellBehavedTask::new(0).start();
105105
let _ = goodboy.cast(()).await;

0 commit comments

Comments
 (0)