From ab61a8f408df34d13812d55a9b23daff74c06901 Mon Sep 17 00:00:00 2001 From: Spencer Ferris <3319370+spencewenski@users.noreply.github.com> Date: Tue, 31 Dec 2024 01:42:27 -0800 Subject: [PATCH] feat: Add queue fetch balance strategy The Redis API used to fetch jobs (([brpop](https://redis.io/docs/latest/commands/brpop/)) checks queues for jobs in the order the queues are provided. This means that if the first queue in the list provided to `Processor::new` always has an item, the other queues will never have their jobs run. Add a `BalanceStrategy` to allow ensuring that no queue is starved indefinitely. The initial (and default) algorithm implemented is a basic round robin algorithm. Before each fetch from Redis, the `Processor#queues` list is rotated by 1, ensuring that every queue has a chance to have its jobs run. --- src/lib.rs | 2 +- src/processor.rs | 49 ++++++++++++++++++++++++++++++++++--- tests/process_async_test.rs | 5 ++-- 3 files changed, 50 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ec945d1..15a5254 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ pub use crate::redis::{ }; pub use ::redis as redis_rs; pub use middleware::{ChainIter, ServerMiddleware}; -pub use processor::{Processor, ProcessorConfig, QueueConfig, WorkFetcher}; +pub use processor::{BalanceStrategy, Processor, ProcessorConfig, QueueConfig, WorkFetcher}; pub use scheduled::Scheduled; pub use stats::{Counter, StatsPublisher}; diff --git a/src/processor.rs b/src/processor.rs index 2ec2ec2..53ad707 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -3,7 +3,7 @@ use crate::{ periodic::PeriodicJob, Chain, Counter, Job, RedisPool, Scheduled, ServerMiddleware, StatsPublisher, UnitOfWork, Worker, WorkerRef, }; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use tokio::select; use tokio::task::JoinSet; @@ -19,7 +19,7 @@ pub enum WorkFetcher { #[derive(Clone)] pub struct Processor { redis: RedisPool, - queues: Vec, + queues: VecDeque, human_readable_queues: Vec, periodic_jobs: Vec, workers: BTreeMap>, @@ -45,11 +45,34 @@ pub struct ProcessorConfig { /// waiting for responses, etc), this can probably be quite a bit higher than your CPU count. pub num_workers: usize, + /// The strategy for balancing the priority of fetching queues' jobs from Redis. Defaults + /// to [`BalanceStrategy::RoundRobin`]. + /// + /// The Redis API used to fetch jobs ([brpop](https://redis.io/docs/latest/commands/brpop/)) + /// checks queues for jobs in the order the queues are provided. This means that if the first + /// queue in the list provided to [`Processor::new`] always has an item, the other queues + /// will never have their jobs run. To mitigate this, a [`BalanceStrategy`] can be provided + /// to allow ensuring that no queue is starved indefinitely. + pub balance_strategy: BalanceStrategy, + /// Queue-specific configurations. The queues specified in this field do not need to match /// the list of queues provided to [`Processor::new`]. pub queue_configs: BTreeMap, } +#[derive(Default, Clone)] +#[non_exhaustive] +pub enum BalanceStrategy { + /// Rotate the list of queues by 1 every time jobs are fetched from Redis. This allows each + /// queue in the list to have an equal opportunity to have its jobs run. + #[default] + RoundRobin, + /// Do not modify the list of queues. Warning: This can lead to queue starvation! For example, + /// if the first queue in the list provided to [`Processor::new`] is heavily used and always + /// has a job available to run, then the jobs in the other queues will never run. + None, +} + #[derive(Default, Clone)] #[non_exhaustive] pub struct QueueConfig { @@ -66,6 +89,12 @@ impl ProcessorConfig { self } + #[must_use] + pub fn balance_strategy(mut self, balance_strategy: BalanceStrategy) -> Self { + self.balance_strategy = balance_strategy; + self + } + #[must_use] pub fn queue_config(mut self, queue: String, config: QueueConfig) -> Self { self.queue_configs.insert(queue, config); @@ -109,11 +138,13 @@ impl Processor { } pub async fn fetch(&mut self) -> Result> { + self.run_balance_strategy(); + let response: Option<(String, String)> = self .redis .get() .await? - .brpop(self.queues.clone(), 2) + .brpop(self.queues.clone().into(), 2) .await?; if let Some((queue, job_raw)) = response { @@ -124,6 +155,18 @@ impl Processor { Ok(None) } + /// Re-order the `Processor#queues` based on the `ProcessorConfig#balance_strategy`. + fn run_balance_strategy(&mut self) { + if self.queues.is_empty() { + return; + } + + match self.config.balance_strategy { + BalanceStrategy::RoundRobin => self.queues.rotate_right(1), + BalanceStrategy::None => {} + } + } + pub async fn process_one(&mut self) -> Result<()> { loop { if self.cancellation_token.is_cancelled() { diff --git a/tests/process_async_test.rs b/tests/process_async_test.rs index 81deab5..a0e58d1 100644 --- a/tests/process_async_test.rs +++ b/tests/process_async_test.rs @@ -3,8 +3,8 @@ mod test { use async_trait::async_trait; use bb8::Pool; use sidekiq::{ - Processor, ProcessorConfig, QueueConfig, RedisConnectionManager, RedisPool, Result, - WorkFetcher, Worker, + BalanceStrategy, Processor, ProcessorConfig, QueueConfig, RedisConnectionManager, + RedisPool, Result, WorkFetcher, Worker, }; use std::sync::{Arc, Mutex}; @@ -34,6 +34,7 @@ mod test { let p = Processor::new(redis.clone(), vec![queue]).with_config( ProcessorConfig::default() .num_workers(1) + .balance_strategy(BalanceStrategy::RoundRobin) .queue_config( "dedicated queue 1".to_string(), QueueConfig::default().num_workers(10),