From 0fcb89ad719b983d43623798cf0968b9797335ac Mon Sep 17 00:00:00 2001 From: Ren0503 Date: Mon, 20 Oct 2025 23:26:41 +0700 Subject: [PATCH] feat: change format logger based on logger from core --- .gitignore | 3 ++- constant.go | 3 ++- job.go | 24 ++++++++++++--------- logger.go | 8 +++++++ processor_test.go | 12 +++++++++-- queue.go | 55 ++++++++++++++++++++++++++--------------------- queue_test.go | 10 +++++++-- 7 files changed, 74 insertions(+), 41 deletions(-) create mode 100644 logger.go diff --git a/.gitignore b/.gitignore index 4db1828..6f108ad 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ *.env .vscode/ *.logs -coverage/ \ No newline at end of file +coverage/ +logs/ \ No newline at end of file diff --git a/constant.go b/constant.go index b3ee8ff..93c58c3 100644 --- a/constant.go +++ b/constant.go @@ -5,7 +5,8 @@ type LoggerType string const ( LoggerDefault LoggerType = "default" LoggerInfo LoggerType = "info" + LoggerWarn LoggerType = "warn" + LoggerError LoggerType = "error" LoggerFatal LoggerType = "fatal" - LoggerPanic LoggerType = "panic" LoggerDisabled LoggerType = "disabled" ) diff --git a/job.go b/job.go index 3e81f58..3e3d84f 100644 --- a/job.go +++ b/job.go @@ -3,6 +3,7 @@ package queue import ( "context" "fmt" + "strings" "time" ) @@ -69,21 +70,24 @@ type Callback func() error func (job *Job) Process(cb Callback) { job.Status = ActiveStatus job.ProcessedOn = time.Now() - job.queue.formatLog(LoggerInfo, "Running job %s progress\n\n", job.Id) + job.queue.formatLog(LoggerInfo, "Running job %s progress", job.Id) + defer func() { if r := recover(); r != nil { failedReason := fmt.Sprintf("%v", r) job.HandlerError(failedReason) } }() + err := cb() - if err == nil { - job.FinishedOn = time.Now() - job.Status = CompletedStatus - job.queue.formatLog(LoggerInfo, "Job %s done in %dms\n\n", job.Id, job.FinishedOn.Sub(job.ProcessedOn).Milliseconds()) - } else { + if err != nil { job.HandlerError(err.Error()) + return } + + job.FinishedOn = time.Now() + job.Status = CompletedStatus + job.queue.formatLog(LoggerInfo, "Job %s done in %dms", job.Id, job.FinishedOn.Sub(job.ProcessedOn).Milliseconds()) } func (job *Job) HandlerError(reasonError string) { @@ -91,16 +95,16 @@ func (job *Job) HandlerError(reasonError string) { job.Status = FailedStatus // Store error client := job.queue.client - _, err := client.HSet(context.Background(), job.queue.Name, job.Id, job.FailedReason).Result() + _, err := client.Set(context.Background(), fmt.Sprintf("%s:%s", strings.ToLower(job.queue.Name), job.Id), job.FailedReason, 0).Result() if err != nil { - job.queue.formatLog(LoggerInfo, "Failed to store error: %s\n\n", err.Error()) + job.queue.formatLog(LoggerFatal, "Failed to store error: %s", err.Error()) } if job.RetryFailures > 0 { job.Status = DelayedStatus job.RetryFailures-- - job.queue.formatLog(LoggerInfo, "Add job %s for retry (%d remains) \n\n", job.Id, job.RetryFailures) + job.queue.formatLog(LoggerWarn, "Add job %s for retry (%d remains) ", job.Id, job.RetryFailures) } else { - job.queue.formatLog(LoggerInfo, "Failed job %s \n\n", job.Id) + job.queue.formatLog(LoggerError, "Failed job %s ", job.Id) } } diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..83542bb --- /dev/null +++ b/logger.go @@ -0,0 +1,8 @@ +package queue + +type Logger interface { + Infof(msg string, metadata ...any) + Warnf(msg string, metadata ...any) + Errorf(msg string, metadata ...any) + Fatalf(msg string, metadata ...any) +} diff --git a/processor_test.go b/processor_test.go index 2c515dd..330355f 100644 --- a/processor_test.go +++ b/processor_test.go @@ -55,7 +55,7 @@ func Test_Proccessor(t *testing.T) { return ctrl } - appMoule := func() core.Module { + appModule := func() core.Module { return core.NewModule(core.NewModuleOptions{ Imports: []core.Modules{ queue.ForRootFactory(func(ref core.RefProvider) *queue.Options { @@ -76,7 +76,7 @@ func Test_Proccessor(t *testing.T) { }) } - server := core.CreateFactory(appMoule) + server := core.CreateFactory(appModule) server.SetGlobalPrefix("api") testServer := httptest.NewServer(server.PrepareBeforeListen()) @@ -89,3 +89,11 @@ func Test_Proccessor(t *testing.T) { time.Sleep(1 * time.Second) } + +func TestNil(t *testing.T) { + appModule := core.NewModule(core.NewModuleOptions{ + Imports: []core.Modules{}, + }) + + require.Nil(t, queue.Inject(appModule, "")) +} diff --git a/queue.go b/queue.go index 992fbe8..1eaafe1 100644 --- a/queue.go +++ b/queue.go @@ -3,7 +3,6 @@ package queue import ( "context" "fmt" - "log" "slices" "sort" "strconv" @@ -15,6 +14,7 @@ import ( "github.com/redis/go-redis/v9" "github.com/robfig/cron/v3" "github.com/tinh-tinh/tinhtinh/v2/common" + "github.com/tinh-tinh/tinhtinh/v2/middleware/logger" ) type JobFnc func(job *Job) @@ -30,6 +30,7 @@ type Queue struct { cronPattern string running bool config Options + Logger Logger } type RateLimiter struct { @@ -43,7 +44,8 @@ type Options struct { RetryFailures int Limiter *RateLimiter Pattern string - Logger LoggerType + Logger Logger + DisableLog bool RemoveOnComplete bool RemoveOnFail bool Delay time.Duration @@ -62,6 +64,10 @@ type Options struct { // // The returned queue is ready to use. func New(name string, opt *Options) *Queue { + if opt == nil { + panic("store missing config") + } + client := redis.NewClient(opt.Connect) pool := goredis.NewPool(client) rs := redsync.New(pool) @@ -75,8 +81,8 @@ func New(name string, opt *Options) *Queue { config: *opt, } - if opt.Logger == "" { - queue.config.Logger = LoggerDefault + if opt.Logger == nil { + queue.config.Logger = logger.Create(logger.Options{}) } if opt.Pattern != "" { @@ -96,10 +102,10 @@ func New(name string, opt *Options) *Queue { func (q *Queue) AddJob(opt AddJobOptions) { var job *Job if q.IsLimit() { - q.formatLog(LoggerInfo, "Add job %s to delay\n", opt.Id) + q.formatLog(LoggerInfo, "Add job %s to delay", opt.Id) job = q.delayJob(opt) } else { - q.formatLog(LoggerInfo, "Add job %s to waiting\n", opt.Id) + q.formatLog(LoggerInfo, "Add job %s to waiting", opt.Id) job = q.newJob(opt) } q.jobs = append(q.jobs, *job) @@ -121,10 +127,10 @@ func (q *Queue) BulkAddJob(options []AddJobOptions) { for _, option := range options { var job *Job if q.IsLimit() { - q.formatLog(LoggerInfo, "Add job %s to delay\n", option.Id) + q.formatLog(LoggerInfo, "Add job %s to delay", option.Id) job = q.delayJob(option) } else { - q.formatLog(LoggerInfo, "Add job %s to waiting\n", option.Id) + q.formatLog(LoggerInfo, "Add job %s to waiting", option.Id) job = q.newJob(option) } q.jobs = append(q.jobs, *job) @@ -141,7 +147,7 @@ func (q *Queue) Process(jobFnc JobFnc) { if q.scheduler != nil { _, err := q.scheduler.AddFunc(q.cronPattern, func() { q.Run() }) if err != nil { - q.formatLog(LoggerFatal, "failed to add job: %v\n", err) + q.formatLog(LoggerError, "failed to add job: %v", err) } q.scheduler.Start() } @@ -153,12 +159,12 @@ func (q *Queue) Process(jobFnc JobFnc) { // stored. func (q *Queue) Run() { if !q.running { - q.formatLog(LoggerInfo, "Queue is not running") + q.formatLog(LoggerWarn, "Queue is not running") return } // Lock the mutex if err := q.mutex.Lock(); err != nil { - q.formatLog(LoggerFatal, "Error when lock mutex: %v\n", err) + q.formatLog(LoggerError, "Error when lock mutex: %v", err) return } execJobs := []*Job{} @@ -192,7 +198,7 @@ func (q *Queue) Run() { defer func() { if r := recover(); r != nil { failedReason := fmt.Sprintf("%v", r) - q.formatLog(LoggerInfo, "Error when processing job: %v\n", failedReason) + q.formatLog(LoggerInfo, "Error when processing job: %v", failedReason) } }() q.jobFnc(job) @@ -206,7 +212,7 @@ func (q *Queue) Run() { select { case <-done: - q.formatLog(LoggerInfo, "All jobs done\n") + q.formatLog(LoggerInfo, "All jobs done") case <-ctx.Done(): q.MarkJobFailedTimeout(numJobs) } @@ -220,7 +226,7 @@ func (q *Queue) Run() { q.Retry() // Unlock the mutex if ok, err := q.mutex.Unlock(); !ok || err != nil { - q.formatLog(LoggerFatal, "Error when unlock mutex: %v\n", err) + q.formatLog(LoggerError, "Error when unlock mutex: %v", err) } } @@ -273,7 +279,7 @@ func (q *Queue) Retry() { select { case <-done: - q.formatLog(LoggerInfo, "All jobs done when retry\n") + q.formatLog(LoggerInfo, "All jobs done when retry") case <-ctx.Done(): q.MarkJobFailedTimeout(numJobs) } @@ -345,7 +351,7 @@ func (q *Queue) IsLimit() bool { } else { value, err := client.Incr(q.ctx, q.Name).Result() if err != nil { - q.formatLog(LoggerPanic, "Error when count redis: %v\n", err) + q.formatLog(LoggerError, "Error when count redis: %v", err) } if value == 1 { client.Expire(q.ctx, q.Name, q.config.Limiter.Duration) @@ -393,22 +399,21 @@ func (q *Queue) MarkJobFailedTimeout(numberJobs []*Job) { } func (q *Queue) formatLog(logType LoggerType, format string, v ...any) { - if q.config.Logger == LoggerDisabled { + if q.config.DisableLog { return - } else if q.config.Logger == LoggerDefault { - q.log(logType, format, v...) - } else if q.config.Logger == logType { - q.log(logType, format, v...) } + q.log(logType, format, v...) } func (q *Queue) log(logType LoggerType, format string, v ...any) { switch logType { case LoggerInfo: - log.Printf(format, v...) + q.config.Logger.Infof(format, v...) + case LoggerWarn: + q.config.Logger.Warnf(format, v...) + case LoggerError: + q.config.Logger.Errorf(format, v...) case LoggerFatal: - log.Fatalf(format, v...) - case LoggerPanic: - log.Panicf(format, v...) + q.config.Logger.Fatalf(format, v...) } } diff --git a/queue_test.go b/queue_test.go index 43775eb..e4bd69d 100644 --- a/queue_test.go +++ b/queue_test.go @@ -207,7 +207,7 @@ func TestDisableLog(t *testing.T) { }, Workers: 3, RetryFailures: 3, - Logger: queue.LoggerDisabled, + DisableLog: true, }) userQueue.Process(func(job *queue.Job) { @@ -232,7 +232,6 @@ func Test_LoggerInfo(t *testing.T) { }, Workers: 3, RetryFailures: 3, - Logger: queue.LoggerInfo, }) userQueue.Process(func(job *queue.Job) { @@ -468,3 +467,10 @@ func Test_Timeout(t *testing.T) { failedJob := userTimeoutQueue.CountJobs(queue.FailedStatus) require.Equal(t, 1, failedJob) } + +func TestPanic(t *testing.T) { + require.Panics(t, func() { + abc := queue.New("Abc", nil) + abc.Pause() + }) +}