Skip to content

Commit

Permalink
Merge pull request #55 from spencewenski/dedicated-queue-workers
Browse files Browse the repository at this point in the history
feat: Enable spawning workers dedicated to a specific queue
  • Loading branch information
film42 authored Jan 3, 2025
2 parents 80c6ea8 + b8733e7 commit c35bb46
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 31 deletions.
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use crate::redis::{
};
pub use ::redis as redis_rs;
pub use middleware::{ChainIter, ServerMiddleware};
pub use processor::{Processor, ProcessorConfig, WorkFetcher};
pub use processor::{Processor, ProcessorConfig, QueueConfig, WorkFetcher};
pub use scheduled::Scheduled;
pub use stats::{Counter, StatsPublisher};

Expand Down Expand Up @@ -456,7 +456,7 @@ impl<'de> Deserialize<'de> for RetryOpts {
{
struct RetryOptsVisitor;

impl<'de> Visitor<'de> for RetryOptsVisitor {
impl Visitor<'_> for RetryOptsVisitor {
type Value = RetryOpts;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
Expand Down
101 changes: 74 additions & 27 deletions src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,34 @@ pub struct Processor {
config: ProcessorConfig,
}

#[derive(Clone)]
#[allow(clippy::manual_non_exhaustive)]
#[derive(Default, Clone)]
#[non_exhaustive]
pub struct ProcessorConfig {
/// The number of Sidekiq workers that can run at the same time. Adjust as needed based on
/// your workload and resource (cpu/memory/etc) usage.
///
/// This config value controls how many workers are spawned to handle the queues provided
/// to [`Processor::new`]. These workers will be shared across all of these queues.
///
/// If your workload is largely CPU-bound (computationally expensive), this should probably
/// match your CPU count. This is the default.
///
/// If your workload is largely IO-bound (e.g. reading from a DB, making web requests and
/// waiting for responses, etc), this can probably be quite a bit higher than your CPU count.
pub num_workers: usize,
// Disallow consumers from directly creating a ProcessorConfig object.
_private: (),

/// Queue-specific configurations. The queues specified in this field do not need to match
/// the list of queues provided to [`Processor::new`].
pub queue_configs: BTreeMap<String, QueueConfig>,
}

/// Configuration for workers dedicated to a single queue.
///
/// Registered via [`ProcessorConfig::queue_config`]; the queue named there does not need to
/// appear in the queue list provided to `Processor::new`.
#[derive(Default, Clone)]
#[non_exhaustive]
pub struct QueueConfig {
    /// Similar to [`ProcessorConfig::num_workers`], except this configures the number of
    /// additional workers to dedicate to a specific queue. If provided, `num_workers`
    /// additional workers will be created for this specific queue.
    pub num_workers: usize,
}

impl ProcessorConfig {
Expand All @@ -51,14 +65,19 @@ impl ProcessorConfig {
self.num_workers = num_workers;
self
}

#[must_use]
pub fn queue_config(mut self, queue: String, config: QueueConfig) -> Self {
self.queue_configs.insert(queue, config);
self
}
}

impl Default for ProcessorConfig {
fn default() -> Self {
Self {
num_workers: num_cpus::get(),
_private: Default::default(),
}
impl QueueConfig {
    /// Builder-style setter: returns `self` with the dedicated-worker count replaced.
    #[must_use]
    pub fn num_workers(mut self, count: usize) -> Self {
        self.num_workers = count;
        self
    }
}

Expand Down Expand Up @@ -205,26 +224,54 @@ impl Processor {
pub async fn run(self) {
let mut join_set: JoinSet<()> = JoinSet::new();

// Start worker routines.
for i in 0..self.config.num_workers {
join_set.spawn({
let mut processor = self.clone();
let cancellation_token = self.cancellation_token.clone();

async move {
loop {
if let Err(err) = processor.process_one().await {
error!("Error leaked out the bottom: {:?}", err);
}

if cancellation_token.is_cancelled() {
break;
}
// Logic for spawning shared workers (workers that handle multiple queues) and dedicated
// workers (workers that handle a single queue).
let spawn_worker = |mut processor: Processor,
cancellation_token: CancellationToken,
num: usize,
dedicated_queue_name: Option<String>| {
async move {
loop {
if let Err(err) = processor.process_one().await {
error!("Error leaked out the bottom: {:?}", err);
}

debug!("Broke out of loop for worker {}", i);
if cancellation_token.is_cancelled() {
break;
}
}
});

let dedicated_queue_str = dedicated_queue_name
.map(|name| format!(" dedicated to queue '{name}'"))
.unwrap_or_default();
debug!("Broke out of loop for worker {num}{dedicated_queue_str}");
}
};

// Start worker routines.
for i in 0..self.config.num_workers {
join_set.spawn(spawn_worker(
self.clone(),
self.cancellation_token.clone(),
i,
None,
));
}

// Start dedicated worker routines.
for (queue, config) in &self.config.queue_configs {
for i in 0..config.num_workers {
join_set.spawn({
let mut processor = self.clone();
processor.queues = [queue.clone()].into();
spawn_worker(
processor,
self.cancellation_token.clone(),
i,
Some(queue.clone()),
)
});
}
}

// Start sidekiq-web metrics publisher.
Expand Down
18 changes: 16 additions & 2 deletions tests/process_async_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
mod test {
use async_trait::async_trait;
use bb8::Pool;
use sidekiq::{Processor, RedisConnectionManager, RedisPool, Result, WorkFetcher, Worker};
use sidekiq::{
Processor, ProcessorConfig, QueueConfig, RedisConnectionManager, RedisPool, Result,
WorkFetcher, Worker,
};
use std::sync::{Arc, Mutex};

#[async_trait]
Expand All @@ -28,7 +31,18 @@ mod test {
redis.flushall().await;

// Sidekiq server
let p = Processor::new(redis.clone(), vec![queue]);
let p = Processor::new(redis.clone(), vec![queue]).with_config(
ProcessorConfig::default()
.num_workers(1)
.queue_config(
"dedicated queue 1".to_string(),
QueueConfig::default().num_workers(10),
)
.queue_config(
"dedicated queue 2".to_string(),
QueueConfig::default().num_workers(100),
),
);

(p, redis)
}
Expand Down

0 comments on commit c35bb46

Please sign in to comment.