diff --git a/go.mod b/go.mod index a7b5397..731d416 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ toolchain go1.24.1 require ( github.com/go-redsync/redsync/v4 v4.15.0 github.com/redis/go-redis/v9 v9.17.2 + github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.9.0 github.com/tinh-tinh/tinhtinh/v2 v2.5.0 golang.org/x/crypto v0.45.0 diff --git a/go.sum b/go.sum index 472140a..8304535 100644 --- a/go.sum +++ b/go.sum @@ -14,10 +14,12 @@ github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOr github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redsync/redsync/v4 v4.14.0 h1:zyxzFJsmQHIPBl8iBT7KFKohWsjsghgGLiP8TnFMLNc= +github.com/go-redsync/redsync/v4 v4.14.0/go.mod h1:twMlVd19upZ/juvJyJGlQOSQxor1oeHtjs62l4pRFzo= github.com/go-redsync/redsync/v4 v4.15.0 h1:KH/XymuxSV7vyKs6z1Cxxj+N+N18JlPxgXeP6x4JY54= github.com/go-redsync/redsync/v4 v4.15.0/go.mod h1:qNp+lLs3vkfZbtA/aM/OjlZHfEr5YTAYhRktFPKHC7s= -github.com/gomodule/redigo v1.9.3 h1:dNPSXeXv6HCq2jdyWfjgmhBdqnR6PRO3m/G05nvpPC8= -github.com/gomodule/redigo v1.9.3/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw= +github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s= +github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -25,22 +27,28 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ= +github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI= github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= -github.com/redis/rueidis v1.0.69 h1:WlUefRhuDekji5LsD387ys3UCJtSFeBVf0e5yI0B8b4= -github.com/redis/rueidis v1.0.69/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA= -github.com/redis/rueidis/rueidiscompat v1.0.69 h1:IWVYY9lXdjNO3do2VpJT7aDFi8zbCUuQxZB6E2Grahs= -github.com/redis/rueidis/rueidiscompat v1.0.69/go.mod h1:iC4Y8DoN0Uth0Uezg9e2trvNRC7QAgGeuP2OPLb5ccI= +github.com/redis/rueidis v1.0.64 h1:XqgbueDuNV3qFdVdQwAHJl1uNt90zUuAJuzqjH4cw6Y= +github.com/redis/rueidis v1.0.64/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA= +github.com/redis/rueidis/rueidiscompat v1.0.64 h1:M8JbLP4LyHQhBLBRsUQIzui8/LyTtdESNIMVveqm4RY= +github.com/redis/rueidis/rueidiscompat v1.0.64/go.mod h1:8pJVPhEjpw0izZFSxYwDziUiEYEkEklTSw/nZzga61M= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= +github.com/tinh-tinh/tinhtinh/v2 v2.3.4 h1:vxhaoPnp3pGNcdXKDG7nVai+V+lYoJHWtm7pzTNapJY= +github.com/tinh-tinh/tinhtinh/v2 v2.3.4/go.mod h1:4nppE7KAIswZKutI9ElMqAD9kyash7aea0Ewowsqj5g= github.com/tinh-tinh/tinhtinh/v2 v2.5.0 h1:SqCanZJKKgbVsDwoaPe136fZGYoXSKZ6fLciGO0KsoY= github.com/tinh-tinh/tinhtinh/v2 v2.5.0/go.mod h1:4nppE7KAIswZKutI9ElMqAD9kyash7aea0Ewowsqj5g= golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= -golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= -golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/job.go b/job.go index bad662d..0b06d88 100644 --- a/job.go +++ b/job.go @@ -109,10 +109,14 @@ func (job *Job) HandlerError(reasonError string) { job.queue.formatLog(LoggerWarn, "Add job %s for retry (%d remains) ", job.Id, job.RetryFailures) } -// IsReady returns true if the job is ready to be processed. -// Jobs are ready if they are waiting or active. +// IsReady returns true if the job is ready to be processed. If the job uses a +// scheduler, it will always be ready. Otherwise, the job is ready if it is +// waiting or active. func (job *Job) IsReady() bool { - return job.Status == WaitStatus || job.Status == ActiveStatus + if job.queue.scheduler == nil { + return job.Status == WaitStatus || job.Status == ActiveStatus + } + return true } // IsFinished returns true if the job has finished, either successfully or with an error. diff --git a/pattern_parser.go b/pattern_parser.go deleted file mode 100644 index 389a106..0000000 --- a/pattern_parser.go +++ /dev/null @@ -1,116 +0,0 @@ -package queue - -import ( - "fmt" - "strconv" - "strings" - "time" -) - -// parsePattern parses a cron-like pattern and returns the polling interval. -// Supports two formats: -// 1. @every format (e.g., "@every 1s", "@every 5m") -// 2. Cron expressions (e.g., "*/5 * * * *", "0 */2 * * *") -// Returns an error if the pattern is invalid or unsupported. -func parsePattern(pattern string) (time.Duration, error) { - if pattern == "" { - return 0, fmt.Errorf("pattern cannot be empty") - } - - // Trim whitespace - pattern = strings.TrimSpace(pattern) - - // Check for @every prefix - if strings.HasPrefix(pattern, "@every ") { - return parseEveryPattern(pattern) - } - - // Try to parse as cron expression - return parseCronPattern(pattern) -} - -// parseEveryPattern parses @every format patterns. -func parseEveryPattern(pattern string) (time.Duration, error) { - // Extract the duration part after "@every " - durationStr := strings.TrimSpace(strings.TrimPrefix(pattern, "@every ")) - if durationStr == "" { - return 0, fmt.Errorf("missing duration in pattern: %s", pattern) - } - - // Parse the duration using time.ParseDuration - duration, err := time.ParseDuration(durationStr) - if err != nil { - return 0, fmt.Errorf("invalid duration '%s': %w", durationStr, err) - } - - // Validate that duration is positive - if duration <= 0 { - return 0, fmt.Errorf("duration must be positive, got: %s", duration) - } - - return duration, nil -} - -// parseCronPattern parses cron expressions and calculates the polling interval. -// Supports standard 5-field cron format: minute hour day month weekday -// Examples: -// - "*/5 * * * *" → every 5 minutes -// - "0 * * * *" → every hour -// - "0 0 * * *" → every day (24 hours) -// - "0 0 * * 0" → every week (7 days) -func parseCronPattern(pattern string) (time.Duration, error) { - fields := strings.Fields(pattern) - if len(fields) != 5 { - return 0, fmt.Errorf("invalid cron expression: expected 5 fields, got %d in '%s'", len(fields), pattern) - } - - minute, hour, day, month, weekday := fields[0], fields[1], fields[2], fields[3], fields[4] - - // Parse minute field for */N patterns - if strings.HasPrefix(minute, "*/") { - intervalStr := strings.TrimPrefix(minute, "*/") - interval, err := strconv.Atoi(intervalStr) - if err != nil { - return 0, fmt.Errorf("invalid minute interval '%s': %w", intervalStr, err) - } - if interval <= 0 || interval > 59 { - return 0, fmt.Errorf("minute interval must be between 1 and 59, got %d", interval) - } - return time.Duration(interval) * time.Minute, nil - } - - // Parse hour field for */N patterns - if strings.HasPrefix(hour, "*/") { - intervalStr := strings.TrimPrefix(hour, "*/") - interval, err := strconv.Atoi(intervalStr) - if err != nil { - return 0, fmt.Errorf("invalid hour interval '%s': %w", intervalStr, err) - } - if interval <= 0 || interval > 23 { - return 0, fmt.Errorf("hour interval must be between 1 and 23, got %d", interval) - } - return time.Duration(interval) * time.Hour, nil - } - - // Hourly: "0 * * * *" or "N * * * *" - if hour == "*" && day == "*" && month == "*" && weekday == "*" { - return 1 * time.Hour, nil - } - - // Daily: "0 0 * * *" or "N N * * *" - if day == "*" && month == "*" && weekday == "*" { - return 24 * time.Hour, nil - } - - // Weekly: "0 0 * * N" (specific weekday) - if day == "*" && month == "*" && weekday != "*" { - return 7 * 24 * time.Hour, nil - } - - // Monthly: "0 0 N * *" (specific day of month) - if month == "*" && weekday == "*" && day != "*" { - return 30 * 24 * time.Hour, nil // Approximate as 30 days - } - - return 0, fmt.Errorf("unsupported cron pattern: %s (consider using @every format)", pattern) -} diff --git a/pattern_parser_test.go b/pattern_parser_test.go deleted file mode 100644 index a1c74c7..0000000 --- a/pattern_parser_test.go +++ /dev/null @@ -1,185 +0,0 @@ -package queue_test - -import ( - "testing" - "time" - - "github.com/redis/go-redis/v9" - "github.com/stretchr/testify/require" - "github.com/tinh-tinh/queue/v2" -) - -func Test_ParsePattern_Valid(t *testing.T) { - tests := []struct { - name string - pattern string - expected time.Duration - }{ - // @every patterns - { - name: "1 second", - pattern: "@every 1s", - expected: 1 * time.Second, - }, - { - name: "5 seconds", - pattern: "@every 5s", - expected: 5 * time.Second, - }, - { - name: "1 minute", - pattern: "@every 1m", - expected: 1 * time.Minute, - }, - { - name: "5 minutes", - pattern: "@every 5m", - expected: 5 * time.Minute, - }, - { - name: "1 hour", - pattern: "@every 1h", - expected: 1 * time.Hour, - }, - { - name: "complex duration", - pattern: "@every 1h30m45s", - expected: 1*time.Hour + 30*time.Minute + 45*time.Second, - }, - { - name: "with extra spaces", - pattern: "@every 5s ", - expected: 5 * time.Second, - }, - { - name: "milliseconds", - pattern: "@every 500ms", - expected: 500 * time.Millisecond, - }, - // Cron patterns - { - name: "every 5 minutes (cron)", - pattern: "*/5 * * * *", - expected: 5 * time.Minute, - }, - { - name: "every 15 minutes (cron)", - pattern: "*/15 * * * *", - expected: 15 * time.Minute, - }, - { - name: "every 2 hours (cron)", - pattern: "0 */2 * * *", - expected: 2 * time.Hour, - }, - { - name: "every 6 hours (cron)", - pattern: "0 */6 * * *", - expected: 6 * time.Hour, - }, - { - name: "hourly (cron)", - pattern: "0 * * * *", - expected: 1 * time.Hour, - }, - { - name: "daily (cron)", - pattern: "0 0 * * *", - expected: 24 * time.Hour, - }, - { - name: "weekly (cron)", - pattern: "0 0 * * 0", - expected: 7 * 24 * time.Hour, - }, - { - name: "monthly (cron)", - pattern: "0 0 1 * *", - expected: 30 * 24 * time.Hour, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Test indirectly through Queue creation - q := queue.New("test_pattern_"+tt.name, &queue.Options{ - Connect: &redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }, - Workers: 1, - RetryFailures: 0, - Pattern: tt.pattern, - // Don't set ScheduleInterval to force pattern parsing - }) - require.NotNil(t, q) - }) - } -} - -func Test_ParsePattern_Invalid(t *testing.T) { - tests := []struct { - name string - pattern string - }{ - { - name: "empty pattern", - pattern: "", - }, - { - name: "missing duration", - pattern: "@every ", - }, - { - name: "invalid duration", - pattern: "@every abc", - }, - { - name: "negative duration", - pattern: "@every -5s", - }, - { - name: "invalid cron - too few fields", - pattern: "*/5 * *", - }, - { - name: "invalid cron - too many fields", - pattern: "*/5 * * * * *", - }, - { - name: "invalid cron - bad minute interval", - pattern: "*/abc * * * *", - }, - { - name: "invalid cron - minute out of range", - pattern: "*/60 * * * *", - }, - { - name: "invalid cron - hour out of range", - pattern: "0 */24 * * *", - }, - { - name: "unsupported cron pattern", - pattern: "5,10,15 * * * *", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // These should fall back to default 5s interval with a warning log - q := queue.New("test_invalid_"+tt.name, &queue.Options{ - Connect: &redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }, - Workers: 1, - RetryFailures: 0, - Pattern: tt.pattern, - DisableLog: true, // Disable logs to avoid clutter in tests - }) - require.NotNil(t, q) - }) - } -} diff --git a/queue.go b/queue.go index c332514..886e939 100644 --- a/queue.go +++ b/queue.go @@ -13,6 +13,7 @@ import ( "github.com/go-redsync/redsync/v4" "github.com/go-redsync/redsync/v4/redis/goredis/v9" "github.com/redis/go-redis/v9" + "github.com/robfig/cron/v3" "github.com/tinh-tinh/tinhtinh/v2/common" "github.com/tinh-tinh/tinhtinh/v2/common/logger" ) @@ -20,20 +21,18 @@ import ( type JobFnc func(job *Job) type Queue struct { - Name string - client *redis.Client - mutex *redsync.Mutex - jobFnc JobFnc - jobs []Job - ctx context.Context - schedulerTicker *time.Ticker - schedulerDone chan struct{} - schedulerRunning bool // Track if scheduler is currently running - schedulerKey string - running bool - config Options - Logger Logger - cachedKey string // Cache the computed key to avoid repeated string operations + Name string + client *redis.Client + mutex *redsync.Mutex + jobFnc JobFnc + jobs []Job + ctx context.Context + scheduler *cron.Cron + cronPattern string + running bool + config Options + Logger Logger + cachedKey string // Cache the computed key to avoid repeated string operations } type RateLimiter struct { @@ -46,7 +45,6 @@ type Options struct { RetryFailures int Limiter *RateLimiter Pattern string - ScheduleInterval time.Duration // Polling interval for distributed scheduler (default: 5s) Logger Logger DisableLog bool RemoveOnComplete bool @@ -86,9 +84,15 @@ func New(name string, opt *Options) *Queue { } if opt.Logger == nil { - queue.config.Logger = logger.Create(logger.Options{ - Console: !opt.DisableLog, - }) + queue.config.Logger = logger.Create(logger.Options{}) + } + + if opt.Pattern != "" { + queue.scheduler = cron.New() + queue.cronPattern = opt.Pattern + } + if opt.Timeout == 0 { + queue.config.Timeout = 1 * time.Minute } // Pre-compute and cache the key @@ -98,30 +102,6 @@ func New(name string, opt *Options) *Queue { queue.cachedKey = strings.ToLower(name) } - // Initialize scheduler key for distributed scheduling - queue.schedulerKey = queue.cachedKey + ":scheduled" - - // Start distributed scheduler if Pattern is configured - if opt.Pattern != "" { - interval := opt.ScheduleInterval - if interval == 0 { - // Try to parse the pattern to get interval - parsedInterval, err := parsePattern(opt.Pattern) - if err != nil { - // Log warning and fall back to default - queue.formatLog(LoggerWarn, "Failed to parse pattern '%s': %v, using default 5s interval", opt.Pattern, err) - interval = 5 * time.Second - } else { - interval = parsedInterval - } - } - queue.startScheduler(interval) - } - - if opt.Timeout == 0 { - queue.config.Timeout = 1 * time.Minute - } - return queue } @@ -205,10 +185,18 @@ func mergeSortedJobs(jobs1, jobs2 []Job) []Job { return result } -// Process sets the callback for the queue to process jobs. -// The distributed scheduler (if configured) is already running from New(). +// Process sets the callback for the queue to process jobs. If the queue has a +// scheduler, it will be started with the given cron pattern. Otherwise, the +// callback is simply stored. func (q *Queue) Process(jobFnc JobFnc) { q.jobFnc = jobFnc + if q.scheduler != nil { + _, err := q.scheduler.AddFunc(q.cronPattern, func() { q.Run() }) + if err != nil { + q.formatLog(LoggerError, "failed to add job: %v", err) + } + q.scheduler.Start() + } } // Run runs all ready jobs in the queue. It locks the mutex, runs all ready jobs @@ -428,32 +416,15 @@ func (q *Queue) IsLimit() bool { // Pause stops the queue from running. When paused, the queue will not accept new // jobs and will not run any jobs in the queue. It will resume when Resume is -// called. The scheduler is also stopped if active. +// called. func (q *Queue) Pause() { q.running = false - q.stopScheduler() } // Resume resumes the queue from a paused state. When resumed, the queue will -// accept new jobs and run any jobs in the queue. The scheduler is also restarted -// if it was previously configured. +// accept new jobs and run any jobs in the queue. func (q *Queue) Resume() { q.running = true - if q.config.Pattern != "" { - interval := q.config.ScheduleInterval - if interval == 0 { - // Try to parse the pattern to get interval - parsedInterval, err := parsePattern(q.config.Pattern) - if err != nil { - // Log warning and fall back to default - q.formatLog(LoggerWarn, "Failed to parse pattern '%s': %v, using default 5s interval", q.config.Pattern, err) - interval = 5 * time.Second - } else { - interval = parsedInterval - } - } - q.startScheduler(interval) - } q.Run() } diff --git a/scheduler.go b/scheduler.go deleted file mode 100644 index 0a387a9..0000000 --- a/scheduler.go +++ /dev/null @@ -1,162 +0,0 @@ -package queue - -import ( - "fmt" - "time" - - "github.com/redis/go-redis/v9" -) - -// startScheduler starts the background scheduler loop that checks for scheduled jobs. -// It polls Redis at the specified interval to find jobs ready to run. -// If a scheduler is already running, this function returns early to prevent goroutine leaks. -func (q *Queue) startScheduler(interval time.Duration) { - if interval == 0 { - interval = 5 * time.Second // Default polling interval - } - - // Prevent starting scheduler if already running - if q.schedulerRunning { - return - } - - // Create new ticker and done channel - q.schedulerTicker = time.NewTicker(interval) - q.schedulerDone = make(chan struct{}) - q.schedulerRunning = true - - go func() { - ticker := q.schedulerTicker - doneChan := q.schedulerDone - for { - select { - case <-ticker.C: - q.processScheduledJobs() - case <-doneChan: - return - } - } - }() - - q.formatLog(LoggerInfo, "Scheduler started with %v interval", interval) -} - -// stopScheduler stops the scheduler gracefully. -func (q *Queue) stopScheduler() { - if !q.schedulerRunning { - return - } - - if q.schedulerDone != nil { - close(q.schedulerDone) - // Small delay to allow goroutine to exit - time.Sleep(10 * time.Millisecond) - } - if q.schedulerTicker != nil { - q.schedulerTicker.Stop() - q.schedulerTicker = nil - } - q.schedulerDone = nil - q.schedulerRunning = false - q.formatLog(LoggerInfo, "Scheduler stopped") -} - -// ScheduleJob adds a job to the scheduled set with the given run time. -// The job will be executed when the current time reaches or exceeds runAt. -func (q *Queue) ScheduleJob(jobId string, runAt time.Time) error { - score := float64(runAt.Unix()) - _, err := q.client.ZAdd(q.ctx, q.schedulerKey, redis.Z{ - Score: score, - Member: jobId, - }).Result() - if err != nil { - return fmt.Errorf("failed to schedule job: %w", err) - } - q.formatLog(LoggerInfo, "Scheduled job %s to run at %s", jobId, runAt.Format(time.RFC3339)) - return nil -} - -// GetScheduledJobs retrieves all scheduled jobs with their scheduled times. -func (q *Queue) GetScheduledJobs() ([]ScheduledJobInfo, error) { - // Get all jobs with scores - results, err := q.client.ZRangeWithScores(q.ctx, q.schedulerKey, 0, -1).Result() - if err != nil { - return nil, fmt.Errorf("failed to get scheduled jobs: %w", err) - } - - scheduledJobs := make([]ScheduledJobInfo, 0, len(results)) - for _, z := range results { - jobId, ok := z.Member.(string) - if !ok { - continue - } - scheduledJobs = append(scheduledJobs, ScheduledJobInfo{ - JobId: jobId, - RunAt: time.Unix(int64(z.Score), 0), - Timestamp: int64(z.Score), - }) - } - - return scheduledJobs, nil -} - -// RemoveScheduledJob removes a job from the scheduled set. -func (q *Queue) RemoveScheduledJob(jobId string) error { - _, err := q.client.ZRem(q.ctx, q.schedulerKey, jobId).Result() - if err != nil { - return fmt.Errorf("failed to remove scheduled job: %w", err) - } - q.formatLog(LoggerInfo, "Removed scheduled job %s", jobId) - return nil -} - -// processScheduledJobs checks for jobs ready to run and moves them to the waiting list. -// This method is called periodically by the scheduler loop. -func (q *Queue) processScheduledJobs() { - now := float64(time.Now().Unix()) - - // Find all jobs with score <= current timestamp - results, err := q.client.ZRangeByScoreWithScores(q.ctx, q.schedulerKey, &redis.ZRangeBy{ - Min: "-inf", - Max: fmt.Sprintf("%f", now), - }).Result() - - if err != nil { - q.formatLog(LoggerError, "Failed to get ready scheduled jobs: %v", err) - return - } - - if len(results) == 0 { - return - } - - // Process each ready job - for _, z := range results { - jobId, ok := z.Member.(string) - if !ok { - continue - } - - // Atomically remove from scheduled set (only one instance will succeed) - removed, err := q.client.ZRem(q.ctx, q.schedulerKey, jobId).Result() - if err != nil || removed == 0 { - // Another instance already processed this job - continue - } - - // Add job to the queue - q.AddJob(AddJobOptions{ - Id: jobId, - Data: nil, // Scheduled jobs don't have data in this implementation - }) - - q.formatLog(LoggerInfo, "Moved scheduled job %s to waiting list", jobId) - } -} - -// ScheduledJobInfo contains information about a scheduled job. -type ScheduledJobInfo struct { - JobId string - RunAt time.Time - Timestamp int64 -} diff --git a/scheduler_leak_test.go b/scheduler_leak_test.go deleted file mode 100644 index 3035544..0000000 --- a/scheduler_leak_test.go +++ /dev/null @@ -1,167 +0,0 @@ -package queue_test - -import ( - "runtime" - "testing" - "time" - - "github.com/redis/go-redis/v9" - "github.com/stretchr/testify/require" - "github.com/tinh-tinh/queue/v2" -) - -// Test_NoGoroutineLeakOnMultipleResume verifies that calling Resume multiple times -// does not leak goroutines by ensuring the old scheduler is stopped before starting a new one. -func Test_NoGoroutineLeakOnMultipleResume(t *testing.T) { - q := queue.New("goroutine_leak_test", &queue.Options{ - Connect: &redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }, - Workers: 1, - RetryFailures: 0, - Pattern: "@every 1s", - ScheduleInterval: 1 * time.Second, - }) - - // Get initial goroutine count - runtime.GC() - time.Sleep(100 * time.Millisecond) - initialGoroutines := runtime.NumGoroutine() - - // Pause and resume multiple times - for i := 0; i < 10; i++ { - q.Pause() - time.Sleep(50 * time.Millisecond) - q.Resume() - time.Sleep(50 * time.Millisecond) - } - - // Final pause to stop scheduler - q.Pause() - - // Allow time for goroutines to clean up - runtime.GC() - time.Sleep(200 * time.Millisecond) - - finalGoroutines := runtime.NumGoroutine() - - // The number of goroutines should not have increased significantly - // Allow for some variance (±2) due to runtime behavior - goroutineDiff := finalGoroutines - initialGoroutines - require.LessOrEqual(t, goroutineDiff, 2, - "Goroutine leak detected: initial=%d, final=%d, diff=%d", - initialGoroutines, finalGoroutines, goroutineDiff) -} - -// Test_SchedulerRestartOnResume verifies that the scheduler properly restarts -// with the correct interval when Resume is called. -func Test_SchedulerRestartOnResume(t *testing.T) { - q := queue.New("scheduler_restart_test", &queue.Options{ - Connect: &redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }, - Workers: 1, - RetryFailures: 0, - Pattern: "@every 1s", - ScheduleInterval: 1 * time.Second, - }) - - // Schedule a job - runAt := time.Now().Add(2 * time.Second) - err := q.ScheduleJob("test_job", runAt) - require.Nil(t, err) - - // Pause and resume - q.Pause() - time.Sleep(500 * time.Millisecond) - q.Resume() - - // Verify scheduler is working by checking if job gets processed - processedJobs := make(map[string]bool) - q.Process(func(job *queue.Job) { - job.Process(func() error { - processedJobs[job.Id] = true - return nil - }) - }) - - // Wait for job to be processed - time.Sleep(3 * time.Second) - - // Verify job was processed - require.True(t, processedJobs["test_job"], "Job should have been processed after resume") -} - -// Test_MultiplePauseCalls verifies that calling Pause multiple times doesn't panic. -func Test_MultiplePauseCalls(t *testing.T) { - q := queue.New("multiple_pause_test", &queue.Options{ - Connect: &redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }, - Workers: 1, - RetryFailures: 0, - Pattern: "@every 1s", - }) - - // Call Pause multiple times - should not panic - q.Pause() - q.Pause() - q.Pause() - - // Verify no panic occurred - require.True(t, true, "Multiple Pause calls should not panic") -} - -// Test_MultipleResumeCalls verifies that calling Resume multiple times doesn't -// start duplicate scheduler goroutines. -func Test_MultipleResumeCalls(t *testing.T) { - q := queue.New("multiple_resume_test", &queue.Options{ - Connect: &redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }, - Workers: 1, - RetryFailures: 0, - Pattern: "@every 1s", - }) - - // Get initial goroutine count - runtime.GC() - time.Sleep(100 * time.Millisecond) - initialGoroutines := runtime.NumGoroutine() - - // Pause once - q.Pause() - time.Sleep(50 * time.Millisecond) - - // Call Resume multiple times without Pause in between - q.Resume() - time.Sleep(50 * time.Millisecond) - q.Resume() // Should be a no-op due to schedulerRunning flag - time.Sleep(50 * time.Millisecond) - q.Resume() // Should be a no-op due to schedulerRunning flag - time.Sleep(50 * time.Millisecond) - - // Final pause to stop scheduler - q.Pause() - - // Allow time for goroutines to clean up - runtime.GC() - time.Sleep(200 * time.Millisecond) - - finalGoroutines := runtime.NumGoroutine() - - // The number of goroutines should not have increased significantly - // Only ONE scheduler goroutine should have been created despite multiple Resume calls - goroutineDiff := finalGoroutines - initialGoroutines - require.LessOrEqual(t, goroutineDiff, 2, - "Goroutine leak detected from multiple Resume calls: initial=%d, final=%d, diff=%d", - initialGoroutines, finalGoroutines, goroutineDiff) -} diff --git a/scheduler_test.go b/scheduler_test.go deleted file mode 100644 index 282243e..0000000 --- a/scheduler_test.go +++ /dev/null @@ -1,271 +0,0 @@ -package queue_test - -import ( - "sync" - "testing" - "time" - - "github.com/redis/go-redis/v9" - "github.com/stretchr/testify/require" - "github.com/tinh-tinh/queue/v2" -) - -func Test_ScheduleJob(t *testing.T) { - schedulerQueue := queue.New("scheduler_test", &queue.Options{ - Connect: &redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }, - Workers: 3, - RetryFailures: 0, - Pattern: "@every 1s", // Enable scheduler - ScheduleInterval: 1 * time.Second, - }) - - // Track processed jobs - processedJobs := make(map[string]bool) - var mu sync.Mutex - - schedulerQueue.Process(func(job *queue.Job) { - job.Process(func() error { - mu.Lock() - processedJobs[job.Id] = true - mu.Unlock() - return nil - }) - }) - - // Schedule a job to run 2 seconds from now - runAt := time.Now().Add(2 * time.Second) - err := schedulerQueue.ScheduleJob("scheduled_job_1", runAt) - require.Nil(t, err) - - // Verify job is in scheduled set - scheduledJobs, err := schedulerQueue.GetScheduledJobs() - require.Nil(t, err) - require.Equal(t, 1, len(scheduledJobs)) - require.Equal(t, "scheduled_job_1", scheduledJobs[0].JobId) - - // Wait for job to be processed (2s + 1s buffer) - time.Sleep(3 * time.Second) - - // Verify job was processed - mu.Lock() - require.True(t, processedJobs["scheduled_job_1"]) - mu.Unlock() - - // Verify job is no longer in scheduled set - scheduledJobs, err = schedulerQueue.GetScheduledJobs() - require.Nil(t, err) - require.Equal(t, 0, len(scheduledJobs)) -} - -func Test_RemoveScheduledJob(t *testing.T) { - schedulerQueue := queue.New("remove_scheduled_test", &queue.Options{ - Connect: &redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }, - Workers: 3, - RetryFailures: 0, - Pattern: "@every 1s", - ScheduleInterval: 1 * time.Second, - }) - - // Schedule a job for 5 seconds from now - runAt := time.Now().Add(5 * time.Second) - err := schedulerQueue.ScheduleJob("job_to_remove", runAt) - require.Nil(t, err) - - // Verify job is scheduled - scheduledJobs, err := schedulerQueue.GetScheduledJobs() - require.Nil(t, err) - require.Equal(t, 1, len(scheduledJobs)) - - // Remove the scheduled job - err = schedulerQueue.RemoveScheduledJob("job_to_remove") - require.Nil(t, err) - - // Verify job is no longer scheduled - scheduledJobs, err = schedulerQueue.GetScheduledJobs() - require.Nil(t, err) - require.Equal(t, 0, len(scheduledJobs)) -} - -func Test_PauseScheduler(t *testing.T) { - pauseQueue := queue.New("pause_scheduler_test", &queue.Options{ - Connect: &redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }, - Workers: 3, - RetryFailures: 0, - Pattern: "@every 1s", - ScheduleInterval: 1 * time.Second, - }) - - // Track processed jobs - processedJobs := make(map[string]bool) - var mu sync.Mutex - - pauseQueue.Process(func(job *queue.Job) { - job.Process(func() error { - mu.Lock() - processedJobs[job.Id] = true - mu.Unlock() - return nil - }) - }) - - // Schedule a job to run 2 seconds from now - runAt := time.Now().Add(2 * time.Second) - err := pauseQueue.ScheduleJob("paused_job", runAt) - require.Nil(t, err) - - // Pause the queue immediately - pauseQueue.Pause() - - // Wait for when the job should have been processed - time.Sleep(3 * time.Second) - - // Verify job was NOT processed (scheduler stopped) - mu.Lock() - require.False(t, processedJobs["paused_job"]) - mu.Unlock() - - // Verify job is still in scheduled set - scheduledJobs, err := pauseQueue.GetScheduledJobs() - require.Nil(t, err) - require.Equal(t, 1, len(scheduledJobs)) - require.Equal(t, "paused_job", scheduledJobs[0].JobId) -} - -func Test_ResumeScheduler(t *testing.T) { - resumeQueue := queue.New("resume_scheduler_test", &queue.Options{ - Connect: &redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }, - Workers: 3, - RetryFailures: 0, - Pattern: "@every 1s", - ScheduleInterval: 1 * time.Second, - }) - - // Track processed jobs - processedJobs := make(map[string]bool) - var mu sync.Mutex - - resumeQueue.Process(func(job *queue.Job) { - job.Process(func() error { - mu.Lock() - processedJobs[job.Id] = true - mu.Unlock() - return nil - }) - }) - - // Pause the queue - resumeQueue.Pause() - - // Schedule a job to run 3 seconds from now - runAt := time.Now().Add(3 * time.Second) - err := resumeQueue.ScheduleJob("resume_job", runAt) - require.Nil(t, err) - - // Resume the queue - resumeQueue.Resume() - - // Wait for job to be processed (3s + 2s buffer) - time.Sleep(5 * time.Second) - - // Verify job was processed (scheduler restarted) - mu.Lock() - require.True(t, processedJobs["resume_job"]) - mu.Unlock() - - // Verify job is no longer in scheduled set - scheduledJobs, err := resumeQueue.GetScheduledJobs() - require.Nil(t, err) - require.Equal(t, 0, len(scheduledJobs)) -} - -func Test_PauseResumeMultipleScheduledJobs(t *testing.T) { - multiQueue := queue.New("multi_pause_resume_test", &queue.Options{ - Connect: &redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }, - Workers: 3, - RetryFailures: 0, - Pattern: "@every 1s", - ScheduleInterval: 1 * time.Second, - }) - - // Track processed jobs - processedJobs := make(map[string]bool) - var mu sync.Mutex - - multiQueue.Process(func(job *queue.Job) { - job.Process(func() error { - mu.Lock() - processedJobs[job.Id] = true - mu.Unlock() - return nil - }) - }) - - // Schedule multiple jobs - runAt1 := time.Now().Add(2 * time.Second) - runAt2 := time.Now().Add(3 * time.Second) - runAt3 := time.Now().Add(4 * time.Second) - - err := multiQueue.ScheduleJob("job1", runAt1) - require.Nil(t, err) - err = multiQueue.ScheduleJob("job2", runAt2) - require.Nil(t, err) - err = multiQueue.ScheduleJob("job3", runAt3) - require.Nil(t, err) - - // Verify all jobs are scheduled - scheduledJobs, err := multiQueue.GetScheduledJobs() - require.Nil(t, err) - require.Equal(t, 3, len(scheduledJobs)) - - // Pause after 2.5 seconds (job1 should have been processed) - time.Sleep(2500 * time.Millisecond) - multiQueue.Pause() - - // Wait a bit more - time.Sleep(2 * time.Second) - - // Verify only job1 was processed - mu.Lock() - require.True(t, processedJobs["job1"]) - require.False(t, processedJobs["job2"]) - require.False(t, processedJobs["job3"]) - mu.Unlock() - - // Resume the queue - multiQueue.Resume() - - // Wait for remaining jobs to be processed - time.Sleep(2 * time.Second) - - // Verify all jobs were eventually processed - mu.Lock() - require.True(t, processedJobs["job1"]) - require.True(t, processedJobs["job2"]) - require.True(t, processedJobs["job3"]) - mu.Unlock() - - // Verify all jobs are removed from scheduled set - scheduledJobs, err = multiQueue.GetScheduledJobs() - require.Nil(t, err) - require.Equal(t, 0, len(scheduledJobs)) -}