Skip to content

Commit

Permalink
feat: Add queue fetch balance strategy
Browse files Browse the repository at this point in the history
The Redis API used to fetch jobs
(([brpop](https://redis.io/docs/latest/commands/brpop/)) checks queues
for jobs in the order the queues are provided. This means that if the
first queue in the list provided to `Processor::new` always has an
item, the other queues will never have their jobs run.

Add a `BalanceStrategy` to allow ensuring that no queue is starved
indefinitely. The initial (and default) algorithm implemented is a
basic round robin algorithm. Before each fetch from Redis, the
`Processor#queues` list is rotated by 1, ensuring that every queue
has a chance to have its jobs run.
  • Loading branch information
spencewenski committed Jan 3, 2025
1 parent c35bb46 commit ab61a8f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use crate::redis::{
};
pub use ::redis as redis_rs;
pub use middleware::{ChainIter, ServerMiddleware};
pub use processor::{Processor, ProcessorConfig, QueueConfig, WorkFetcher};
pub use processor::{BalanceStrategy, Processor, ProcessorConfig, QueueConfig, WorkFetcher};
pub use scheduled::Scheduled;
pub use stats::{Counter, StatsPublisher};

Expand Down
49 changes: 46 additions & 3 deletions src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
periodic::PeriodicJob, Chain, Counter, Job, RedisPool, Scheduled, ServerMiddleware,
StatsPublisher, UnitOfWork, Worker, WorkerRef,
};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use tokio::select;
use tokio::task::JoinSet;
Expand All @@ -19,7 +19,7 @@ pub enum WorkFetcher {
#[derive(Clone)]
pub struct Processor {
redis: RedisPool,
queues: Vec<String>,
queues: VecDeque<String>,
human_readable_queues: Vec<String>,
periodic_jobs: Vec<PeriodicJob>,
workers: BTreeMap<String, Arc<WorkerRef>>,
Expand All @@ -45,11 +45,34 @@ pub struct ProcessorConfig {
/// waiting for responses, etc), this can probably be quite a bit higher than your CPU count.
pub num_workers: usize,

/// The strategy for balancing the priority of fetching queues' jobs from Redis. Defaults
/// to [`BalanceStrategy::RoundRobin`].
///
/// The Redis API used to fetch jobs ([brpop](https://redis.io/docs/latest/commands/brpop/))
/// checks queues for jobs in the order the queues are provided. This means that if the first
/// queue in the list provided to [`Processor::new`] always has an item, the other queues
/// will never have their jobs run. To mitigate this, a [`BalanceStrategy`] can be provided
/// to allow ensuring that no queue is starved indefinitely.
pub balance_strategy: BalanceStrategy,

/// Queue-specific configurations. The queues specified in this field do not need to match
/// the list of queues provided to [`Processor::new`].
pub queue_configs: BTreeMap<String, QueueConfig>,
}

#[derive(Default, Clone)]
#[non_exhaustive]
pub enum BalanceStrategy {
/// Rotate the list of queues by 1 every time jobs are fetched from Redis. This allows each
/// queue in the list to have an equal opportunity to have its jobs run.
#[default]
RoundRobin,
/// Do not modify the list of queues. Warning: This can lead to queue starvation! For example,
/// if the first queue in the list provided to [`Processor::new`] is heavily used and always
/// has a job available to run, then the jobs in the other queues will never run.
None,
}

#[derive(Default, Clone)]
#[non_exhaustive]
pub struct QueueConfig {
Expand All @@ -66,6 +89,12 @@ impl ProcessorConfig {
self
}

#[must_use]
pub fn balance_strategy(mut self, balance_strategy: BalanceStrategy) -> Self {
self.balance_strategy = balance_strategy;
self
}

#[must_use]
pub fn queue_config(mut self, queue: String, config: QueueConfig) -> Self {
self.queue_configs.insert(queue, config);
Expand Down Expand Up @@ -109,11 +138,13 @@ impl Processor {
}

pub async fn fetch(&mut self) -> Result<Option<UnitOfWork>> {
self.run_balance_strategy();

let response: Option<(String, String)> = self
.redis
.get()
.await?
.brpop(self.queues.clone(), 2)
.brpop(self.queues.clone().into(), 2)
.await?;

if let Some((queue, job_raw)) = response {
Expand All @@ -124,6 +155,18 @@ impl Processor {
Ok(None)
}

/// Re-order the `Processor#queues` based on the `ProcessorConfig#balance_strategy`.
fn run_balance_strategy(&mut self) {
if self.queues.is_empty() {
return;
}

match self.config.balance_strategy {
BalanceStrategy::RoundRobin => self.queues.rotate_right(1),
BalanceStrategy::None => {}
}
}

pub async fn process_one(&mut self) -> Result<()> {
loop {
if self.cancellation_token.is_cancelled() {
Expand Down
5 changes: 3 additions & 2 deletions tests/process_async_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ mod test {
use async_trait::async_trait;
use bb8::Pool;
use sidekiq::{
Processor, ProcessorConfig, QueueConfig, RedisConnectionManager, RedisPool, Result,
WorkFetcher, Worker,
BalanceStrategy, Processor, ProcessorConfig, QueueConfig, RedisConnectionManager,
RedisPool, Result, WorkFetcher, Worker,
};
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -34,6 +34,7 @@ mod test {
let p = Processor::new(redis.clone(), vec![queue]).with_config(
ProcessorConfig::default()
.num_workers(1)
.balance_strategy(BalanceStrategy::RoundRobin)
.queue_config(
"dedicated queue 1".to_string(),
QueueConfig::default().num_workers(10),
Expand Down

0 comments on commit ab61a8f

Please sign in to comment.