Skip to content

Commit b576988

Browse files
committed
Add a builder for arbiter
1 parent 95ca8f0 commit b576988

File tree

4 files changed

+139
-27
lines changed

4 files changed

+139
-27
lines changed

actix-rt/CHANGES.md

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## Unreleased
44

5+
- Add `actix_rt::ArbiterBuilder` to allow user to configure the thread spawned for the arbiter.
6+
57
## 2.9.0
68

79
- Add `actix_rt::System::runtime()` method to retrieve the underlying `actix_rt::Runtime` runtime.

actix-rt/src/arbiter.rs

+114-26
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55
pin::Pin,
66
sync::atomic::{AtomicUsize, Ordering},
77
task::{Context, Poll},
8-
thread,
8+
thread, usize,
99
};
1010

1111
use futures_core::ready;
@@ -80,42 +80,79 @@ impl ArbiterHandle {
8080
}
8181
}
8282

83-
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
84-
/// and functions.
85-
///
86-
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
87-
#[derive(Debug)]
88-
pub struct Arbiter {
89-
tx: mpsc::UnboundedSender<ArbiterCommand>,
90-
thread_handle: thread::JoinHandle<()>,
83+
/// A builder for configuring and spawning a new [Arbiter] thread.
84+
pub struct ArbiterBuilder {
85+
name_factory: Option<Box<dyn Fn(usize, usize) -> String + 'static>>,
86+
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
87+
runtime_factory: Option<Box<dyn Fn() -> tokio::runtime::Runtime + Send + 'static>>,
9188
}
9289

93-
impl Arbiter {
94-
/// Spawn a new Arbiter thread and start its event loop.
95-
///
96-
/// # Panics
97-
/// Panics if a [System] is not registered on the current thread.
98-
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
90+
impl ArbiterBuilder {
91+
/// Create a new [ArbiterBuilder].
9992
#[allow(clippy::new_without_default)]
100-
pub fn new() -> Arbiter {
101-
Self::with_tokio_rt(|| {
102-
crate::runtime::default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
103-
})
93+
pub fn new() -> Self {
94+
Self {
95+
name_factory: None,
96+
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
97+
runtime_factory: None,
98+
}
10499
}
105100

106-
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
101+
/// Specify a factory function for generating the name of the Arbiter thread.
102+
///
103+
/// Defaults to `actix-rt|system:<system_id>|arbiter:<arb_id>`
104+
///
105+
/// # Example
106+
///
107+
/// ```no_run
108+
/// let _ = actix_rt::System::new();
109+
/// actix_rt::ArbiterBuilder::new()
110+
/// .name(|system_id, arb_id| {
111+
/// format!("some-prefix|system:{}|arbiter:{}", system_id, arb_id)
112+
/// })
113+
/// .build();
114+
/// ```
115+
pub fn name<N>(mut self, name_factory: N) -> Self
116+
where
117+
N: Fn(usize, usize) -> String + 'static,
118+
{
119+
self.name_factory = Some(Box::new(name_factory));
120+
self
121+
}
122+
123+
/// Specify a factory function for generating the [Tokio Runtime](tokio-runtime) used by the Arbiter.
107124
///
108125
/// [tokio-runtime]: tokio::runtime::Runtime
109126
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
110-
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
127+
pub fn runtime<R>(mut self, runtime_factory: R) -> Self
111128
where
112-
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
129+
R: Fn() -> tokio::runtime::Runtime + Send + 'static,
113130
{
131+
self.runtime_factory = Some(Box::new(runtime_factory));
132+
self
133+
}
134+
135+
/// Spawn a new Arbiter thread and start its event loop.
136+
///
137+
/// # Panics
138+
/// Panics if a [System] is not registered on the current thread.
139+
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
140+
pub fn build(self) -> Arbiter {
114141
let sys = System::current();
115142
let system_id = sys.id();
116143
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
117144

118-
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
145+
let name = self.name_factory.unwrap_or_else(|| {
146+
Box::new(|system_id, arb_id| {
147+
format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id)
148+
})
149+
})(system_id, arb_id);
150+
let runtime_factory = self.runtime_factory.unwrap_or_else(|| {
151+
Box::new(|| {
152+
crate::runtime::default_tokio_runtime()
153+
.expect("Cannot create new Arbiter's Runtime.")
154+
})
155+
});
119156
let (tx, rx) = mpsc::unbounded_channel();
120157

121158
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
@@ -160,13 +197,16 @@ impl Arbiter {
160197
/// # Panics
161198
/// Panics if a [System] is not registered on the current thread.
162199
#[cfg(all(target_os = "linux", feature = "io-uring"))]
163-
#[allow(clippy::new_without_default)]
164-
pub fn new() -> Arbiter {
200+
pub fn build(self) -> Arbiter {
165201
let sys = System::current();
166202
let system_id = sys.id();
167203
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
168204

169-
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
205+
let name = self.name_factory.unwrap_or_else(|| {
206+
Box::new(|system_id, arb_id| {
207+
format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id)
208+
})
209+
})(system_id, arb_id);
170210
let (tx, rx) = mpsc::unbounded_channel();
171211

172212
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
@@ -204,6 +244,54 @@ impl Arbiter {
204244

205245
Arbiter { tx, thread_handle }
206246
}
247+
}
248+
249+
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
250+
/// and functions.
251+
///
252+
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
253+
#[derive(Debug)]
254+
pub struct Arbiter {
255+
tx: mpsc::UnboundedSender<ArbiterCommand>,
256+
thread_handle: thread::JoinHandle<()>,
257+
}
258+
259+
impl Arbiter {
260+
/// Create an [ArbiterBuilder] to configure and spawn a new Arbiter thread.
261+
pub fn builder() -> ArbiterBuilder {
262+
ArbiterBuilder::new()
263+
}
264+
265+
/// Spawn a new Arbiter thread and start its event loop.
266+
///
267+
/// # Panics
268+
/// Panics if a [System] is not registered on the current thread.
269+
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
270+
#[allow(clippy::new_without_default)]
271+
pub fn new() -> Arbiter {
272+
ArbiterBuilder::new().build()
273+
}
274+
275+
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
276+
///
277+
/// [tokio-runtime]: tokio::runtime::Runtime
278+
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
279+
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
280+
where
281+
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
282+
{
283+
ArbiterBuilder::new().runtime(runtime_factory).build()
284+
}
285+
286+
/// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
287+
///
288+
/// # Panics
289+
/// Panics if a [System] is not registered on the current thread.
290+
#[cfg(all(target_os = "linux", feature = "io-uring"))]
291+
#[allow(clippy::new_without_default)]
292+
pub fn new() -> Arbiter {
293+
ArbiterBuilder::new().build()
294+
}
207295

208296
/// Sets up an Arbiter runner in a new System using the environment's local set.
209297
pub(crate) fn in_new_system() -> ArbiterHandle {

actix-rt/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ pub use tokio::pin;
6767
use tokio::task::JoinHandle;
6868

6969
pub use self::{
70-
arbiter::{Arbiter, ArbiterHandle},
70+
arbiter::{Arbiter, ArbiterBuilder, ArbiterHandle},
7171
runtime::Runtime,
7272
system::{System, SystemRunner},
7373
};

actix-rt/tests/tests.rs

+22
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,28 @@ fn new_arbiter_with_tokio() {
301301
assert!(!counter.load(Ordering::SeqCst));
302302
}
303303

304+
#[test]
305+
fn arbiter_builder_name() {
306+
let _ = System::new();
307+
308+
let arbiter = Arbiter::builder()
309+
.name(|_, _| "test_thread".to_string())
310+
.build();
311+
312+
let (tx, rx) = std::sync::mpsc::channel();
313+
arbiter.spawn(async move {
314+
let current_thread = std::thread::current();
315+
let thread_name = current_thread.name().unwrap().to_string();
316+
tx.send(thread_name).unwrap();
317+
});
318+
319+
let name = rx.recv().unwrap();
320+
assert_eq!(name, "test_thread");
321+
322+
arbiter.stop();
323+
arbiter.join().unwrap();
324+
}
325+
304326
#[test]
305327
#[should_panic]
306328
fn no_system_current_panic() {

0 commit comments

Comments
 (0)