Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
*.env
.vscode/
*.logs
coverage/
coverage/
logs/
3 changes: 2 additions & 1 deletion constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ type LoggerType string
const (
LoggerDefault LoggerType = "default"
LoggerInfo LoggerType = "info"
LoggerWarn LoggerType = "warn"
LoggerError LoggerType = "error"
LoggerFatal LoggerType = "fatal"
LoggerPanic LoggerType = "panic"
LoggerDisabled LoggerType = "disabled"
)
24 changes: 14 additions & 10 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"context"
"fmt"
"strings"
"time"
)

Expand Down Expand Up @@ -69,38 +70,41 @@ type Callback func() error
func (job *Job) Process(cb Callback) {
job.Status = ActiveStatus
job.ProcessedOn = time.Now()
job.queue.formatLog(LoggerInfo, "Running job %s progress\n\n", job.Id)
job.queue.formatLog(LoggerInfo, "Running job %s progress", job.Id)

defer func() {
if r := recover(); r != nil {
failedReason := fmt.Sprintf("%v", r)
job.HandlerError(failedReason)
}
}()

err := cb()
if err == nil {
job.FinishedOn = time.Now()
job.Status = CompletedStatus
job.queue.formatLog(LoggerInfo, "Job %s done in %dms\n\n", job.Id, job.FinishedOn.Sub(job.ProcessedOn).Milliseconds())
} else {
if err != nil {
job.HandlerError(err.Error())
return
}

job.FinishedOn = time.Now()
job.Status = CompletedStatus
job.queue.formatLog(LoggerInfo, "Job %s done in %dms", job.Id, job.FinishedOn.Sub(job.ProcessedOn).Milliseconds())
}

func (job *Job) HandlerError(reasonError string) {
job.FailedReason = reasonError
job.Status = FailedStatus
// Store error
client := job.queue.client
_, err := client.HSet(context.Background(), job.queue.Name, job.Id, job.FailedReason).Result()
_, err := client.Set(context.Background(), fmt.Sprintf("%s:%s", strings.ToLower(job.queue.Name), job.Id), job.FailedReason, 0).Result()
if err != nil {
job.queue.formatLog(LoggerInfo, "Failed to store error: %s\n\n", err.Error())
job.queue.formatLog(LoggerFatal, "Failed to store error: %s", err.Error())

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Log level may be too severe for storage failure.

Using LoggerFatal for a Redis storage failure could be overly harsh. While the failure to persist error details is serious, it may not warrant a fatal-level log that typically implies program termination. Consider using LoggerError instead, which was suggested in a previous review.

Apply this diff to use error level:

-		job.queue.formatLog(LoggerFatal, "Failed to store error: %s", err.Error())
+		job.queue.formatLog(LoggerError, "Failed to store error: %s", err.Error())
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
job.queue.formatLog(LoggerFatal, "Failed to store error: %s", err.Error())
job.queue.formatLog(LoggerError, "Failed to store error: %s", err.Error())
🤖 Prompt for AI Agents
In job.go around line 100, the log call using LoggerFatal for a Redis storage
failure should be changed to use LoggerError; replace LoggerFatal with
LoggerError in the job.queue.formatLog(...) call so the failure is logged at
error level (consistent with the prior review suggestion).

}
if job.RetryFailures > 0 {
job.Status = DelayedStatus
job.RetryFailures--
job.queue.formatLog(LoggerInfo, "Add job %s for retry (%d remains) \n\n", job.Id, job.RetryFailures)
job.queue.formatLog(LoggerWarn, "Add job %s for retry (%d remains) ", job.Id, job.RetryFailures)
} else {
job.queue.formatLog(LoggerInfo, "Failed job %s \n\n", job.Id)
job.queue.formatLog(LoggerError, "Failed job %s ", job.Id)
}
}

Expand Down
8 changes: 8 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package queue

type Logger interface {
Infof(msg string, metadata ...any)
Warnf(msg string, metadata ...any)
Errorf(msg string, metadata ...any)
Fatalf(msg string, metadata ...any)
}
12 changes: 10 additions & 2 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Test_Proccessor(t *testing.T) {
return ctrl
}

appMoule := func() core.Module {
appModule := func() core.Module {
return core.NewModule(core.NewModuleOptions{
Imports: []core.Modules{
queue.ForRootFactory(func(ref core.RefProvider) *queue.Options {
Expand All @@ -76,7 +76,7 @@ func Test_Proccessor(t *testing.T) {
})
}

server := core.CreateFactory(appMoule)
server := core.CreateFactory(appModule)
server.SetGlobalPrefix("api")

testServer := httptest.NewServer(server.PrepareBeforeListen())
Expand All @@ -89,3 +89,11 @@ func Test_Proccessor(t *testing.T) {

time.Sleep(1 * time.Second)
}

func TestNil(t *testing.T) {
appModule := core.NewModule(core.NewModuleOptions{
Imports: []core.Modules{},
})

require.Nil(t, queue.Inject(appModule, ""))
}
55 changes: 30 additions & 25 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package queue
import (
"context"
"fmt"
"log"
"slices"
"sort"
"strconv"
Expand All @@ -15,6 +14,7 @@ import (
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
"github.com/tinh-tinh/tinhtinh/v2/common"
"github.com/tinh-tinh/tinhtinh/v2/middleware/logger"
)

type JobFnc func(job *Job)
Expand All @@ -30,6 +30,7 @@ type Queue struct {
cronPattern string
running bool
config Options
Logger Logger
}

type RateLimiter struct {
Expand All @@ -43,7 +44,8 @@ type Options struct {
RetryFailures int
Limiter *RateLimiter
Pattern string
Logger LoggerType
Logger Logger
DisableLog bool
RemoveOnComplete bool
RemoveOnFail bool
Delay time.Duration
Expand All @@ -62,6 +64,10 @@ type Options struct {
//
// The returned queue is ready to use.
func New(name string, opt *Options) *Queue {
if opt == nil {
panic("store missing config")
}

client := redis.NewClient(opt.Connect)
pool := goredis.NewPool(client)
rs := redsync.New(pool)
Expand All @@ -75,8 +81,8 @@ func New(name string, opt *Options) *Queue {
config: *opt,
}

if opt.Logger == "" {
queue.config.Logger = LoggerDefault
if opt.Logger == nil {
queue.config.Logger = logger.Create(logger.Options{})
}

if opt.Pattern != "" {
Expand All @@ -96,10 +102,10 @@ func New(name string, opt *Options) *Queue {
func (q *Queue) AddJob(opt AddJobOptions) {
var job *Job
if q.IsLimit() {
q.formatLog(LoggerInfo, "Add job %s to delay\n", opt.Id)
q.formatLog(LoggerInfo, "Add job %s to delay", opt.Id)
job = q.delayJob(opt)
} else {
q.formatLog(LoggerInfo, "Add job %s to waiting\n", opt.Id)
q.formatLog(LoggerInfo, "Add job %s to waiting", opt.Id)
job = q.newJob(opt)
}
q.jobs = append(q.jobs, *job)
Expand All @@ -121,10 +127,10 @@ func (q *Queue) BulkAddJob(options []AddJobOptions) {
for _, option := range options {
var job *Job
if q.IsLimit() {
q.formatLog(LoggerInfo, "Add job %s to delay\n", option.Id)
q.formatLog(LoggerInfo, "Add job %s to delay", option.Id)
job = q.delayJob(option)
} else {
q.formatLog(LoggerInfo, "Add job %s to waiting\n", option.Id)
q.formatLog(LoggerInfo, "Add job %s to waiting", option.Id)
job = q.newJob(option)
}
q.jobs = append(q.jobs, *job)
Expand All @@ -141,7 +147,7 @@ func (q *Queue) Process(jobFnc JobFnc) {
if q.scheduler != nil {
_, err := q.scheduler.AddFunc(q.cronPattern, func() { q.Run() })
if err != nil {
q.formatLog(LoggerFatal, "failed to add job: %v\n", err)
q.formatLog(LoggerError, "failed to add job: %v", err)
}
q.scheduler.Start()
}
Expand All @@ -153,12 +159,12 @@ func (q *Queue) Process(jobFnc JobFnc) {
// stored.
func (q *Queue) Run() {
if !q.running {
q.formatLog(LoggerInfo, "Queue is not running")
q.formatLog(LoggerWarn, "Queue is not running")
return
}
// Lock the mutex
if err := q.mutex.Lock(); err != nil {
q.formatLog(LoggerFatal, "Error when lock mutex: %v\n", err)
q.formatLog(LoggerError, "Error when lock mutex: %v", err)
return
}
execJobs := []*Job{}
Expand Down Expand Up @@ -192,7 +198,7 @@ func (q *Queue) Run() {
defer func() {
if r := recover(); r != nil {
failedReason := fmt.Sprintf("%v", r)
q.formatLog(LoggerInfo, "Error when processing job: %v\n", failedReason)
q.formatLog(LoggerInfo, "Error when processing job: %v", failedReason)
}
}()
Comment on lines 199 to 203

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Recover path does not mark job failed; severity should be error.

Jobs that panic before entering job.Process remain stuck as “ready”. Mark them failed and record the reason.

-                defer func() {
-                    if r := recover(); r != nil {
-                        failedReason := fmt.Sprintf("%v", r)
-                        q.formatLog(LoggerInfo, "Error when processing job: %v", failedReason)
-                    }
-                }()
+                defer func() {
+                    if r := recover(); r != nil {
+                        failedReason := fmt.Sprintf("%v", r)
+                        q.formatLog(LoggerError, "Error when processing job: %v", failedReason)
+                        job.HandlerError(failedReason)
+                    }
+                }()

q.jobFnc(job)
Expand All @@ -206,7 +212,7 @@ func (q *Queue) Run() {

select {
case <-done:
q.formatLog(LoggerInfo, "All jobs done\n")
q.formatLog(LoggerInfo, "All jobs done")
case <-ctx.Done():
q.MarkJobFailedTimeout(numJobs)
}
Comment on lines 213 to 218

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Call cancel() after select to release resources.

Add cancel() in both cases.

-        case <-done:
-            q.formatLog(LoggerInfo, "All jobs done")
+        case <-done:
+            q.formatLog(LoggerInfo, "All jobs done")
+            cancel()
         case <-ctx.Done():
             q.MarkJobFailedTimeout(numJobs)
+            cancel()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
select {
case <-done:
q.formatLog(LoggerInfo, "All jobs done\n")
q.formatLog(LoggerInfo, "All jobs done")
case <-ctx.Done():
q.MarkJobFailedTimeout(numJobs)
}
select {
case <-done:
q.formatLog(LoggerInfo, "All jobs done")
cancel()
case <-ctx.Done():
q.MarkJobFailedTimeout(numJobs)
cancel()
}
🤖 Prompt for AI Agents
In queue.go around lines 213 to 218, the select branches that handle <-done and
<-ctx.Done() currently don't call cancel(), leaking the cancel function's
resources; update both branches to call cancel() after q.formatLog(LoggerInfo,
"All jobs done") and after q.MarkJobFailedTimeout(numJobs) respectively so the
context cancel function is always invoked and resources are released.

Expand All @@ -220,7 +226,7 @@ func (q *Queue) Run() {
q.Retry()
// Unlock the mutex
if ok, err := q.mutex.Unlock(); !ok || err != nil {
q.formatLog(LoggerFatal, "Error when unlock mutex: %v\n", err)
q.formatLog(LoggerError, "Error when unlock mutex: %v", err)
}
}

Expand Down Expand Up @@ -273,7 +279,7 @@ func (q *Queue) Retry() {

select {
case <-done:
q.formatLog(LoggerInfo, "All jobs done when retry\n")
q.formatLog(LoggerInfo, "All jobs done when retry")
case <-ctx.Done():
q.MarkJobFailedTimeout(numJobs)
}
Expand Down Expand Up @@ -345,7 +351,7 @@ func (q *Queue) IsLimit() bool {
} else {
value, err := client.Incr(q.ctx, q.Name).Result()
if err != nil {
q.formatLog(LoggerPanic, "Error when count redis: %v\n", err)
q.formatLog(LoggerError, "Error when count redis: %v", err)
}
if value == 1 {
client.Expire(q.ctx, q.Name, q.config.Limiter.Duration)
Expand Down Expand Up @@ -393,22 +399,21 @@ func (q *Queue) MarkJobFailedTimeout(numberJobs []*Job) {
}

func (q *Queue) formatLog(logType LoggerType, format string, v ...any) {
if q.config.Logger == LoggerDisabled {
if q.config.DisableLog {
return
} else if q.config.Logger == LoggerDefault {
q.log(logType, format, v...)
} else if q.config.Logger == logType {
q.log(logType, format, v...)
}
q.log(logType, format, v...)
}

func (q *Queue) log(logType LoggerType, format string, v ...any) {
switch logType {
case LoggerInfo:
log.Printf(format, v...)
q.config.Logger.Infof(format, v...)
case LoggerWarn:
q.config.Logger.Warnf(format, v...)
case LoggerError:
q.config.Logger.Errorf(format, v...)
case LoggerFatal:
log.Fatalf(format, v...)
case LoggerPanic:
log.Panicf(format, v...)
q.config.Logger.Fatalf(format, v...)
}
}
10 changes: 8 additions & 2 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func TestDisableLog(t *testing.T) {
},
Workers: 3,
RetryFailures: 3,
Logger: queue.LoggerDisabled,
DisableLog: true,
})

userQueue.Process(func(job *queue.Job) {
Expand All @@ -232,7 +232,6 @@ func Test_LoggerInfo(t *testing.T) {
},
Workers: 3,
RetryFailures: 3,
Logger: queue.LoggerInfo,
})

userQueue.Process(func(job *queue.Job) {
Expand Down Expand Up @@ -468,3 +467,10 @@ func Test_Timeout(t *testing.T) {
failedJob := userTimeoutQueue.CountJobs(queue.FailedStatus)
require.Equal(t, 1, failedJob)
}

func TestPanic(t *testing.T) {
require.Panics(t, func() {
abc := queue.New("Abc", nil)
abc.Pause()
})
}