-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtaskqueue.go
193 lines (160 loc) · 3.84 KB
/
taskqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package taskqueue
import (
"context"
"crypto/rand"
"encoding/json"
"errors"
"time"
"github.com/oklog/ulid/v2"
)
// DefaultNameSpace is the default namespace name, "taskqueue".
// NOTE(review): nothing in this file reads it; presumably queue backends fall
// back to it when no namespace is configured — confirm against callers.
const DefaultNameSpace = "taskqueue"
// QueueError is an integer-coded error for queue-level failures. It satisfies
// the error interface through its Error method, so the constants below can be
// returned and compared directly.
type QueueError int

// Declared QueueError codes. ErrUnknown is the zero value.
const (
	ErrUnknown QueueError = iota
	ErrQueueNotFound
	ErrQueueEmpty
)

// Error returns the message associated with the error code.
//
// Codes outside the declared constants (negative, or greater than
// ErrQueueEmpty) report the ErrUnknown message. Indexing the message table
// with such a value directly would panic with an index-out-of-range error.
func (e QueueError) Error() string {
	// Message texts are kept exactly as they were, including the capitalised
	// "Queue is empty", because callers may match on them.
	messages := [...]string{
		ErrUnknown:       "unknown error occurred",
		ErrQueueNotFound: "queue not found",
		ErrQueueEmpty:    "Queue is empty",
	}
	if e < 0 || int(e) >= len(messages) {
		return messages[ErrUnknown]
	}
	return messages[e]
}
// JobStatus identifies the state a Job is in.
type JobStatus int8

// Declared job statuses. Numbering starts at 1, so the zero JobStatus is not
// one of the named states.
const (
	JobStatusWaiting JobStatus = iota + 1
	JobStatusActive
	JobStatusCompleted
	JobStatusFailed
	JobStatusDead
)

// String returns the name of the status ("Waiting", "Active", "Completed",
// "Failed" or "Dead").
//
// The zero value yields the empty string, as it always has. Any other value
// outside the declared statuses — including the -1 that ParseJobStatus returns
// alongside an error — yields "Unknown" rather than panicking with an
// index-out-of-range error.
func (j JobStatus) String() string {
	names := [...]string{
		JobStatusWaiting:   "Waiting",
		JobStatusActive:    "Active",
		JobStatusCompleted: "Completed",
		JobStatusFailed:    "Failed",
		JobStatusDead:      "Dead",
	}
	if j < 0 || int(j) >= len(names) {
		return "Unknown"
	}
	return names[j]
}
// ErrInvalidJobStatus is the error ParseJobStatus returns when the text does
// not name a declared job status.
var ErrInvalidJobStatus = errors.New("invalid job status")

// ParseJobStatus converts a status name into its JobStatus value.
//
// Matching is exact and case-sensitive against "Waiting", "Active",
// "Completed", "Failed" and "Dead". For any other text it returns -1 together
// with ErrInvalidJobStatus.
func ParseJobStatus(text string) (JobStatus, error) {
	var parsed JobStatus

	switch text {
	case "Waiting":
		parsed = JobStatusWaiting
	case "Active":
		parsed = JobStatusActive
	case "Completed":
		parsed = JobStatusCompleted
	case "Failed":
		parsed = JobStatusFailed
	case "Dead":
		parsed = JobStatusDead
	default:
		return -1, ErrInvalidJobStatus
	}

	return parsed, nil
}
// Job is a unit of work handled by a queue. NewJob creates one with ID,
// Status and CreatedAt populated; every other field starts at its zero value.
type Job struct {
// ID identifies the job. NewJob sets it to a newly generated ULID string.
ID string
// QueueName is not set in this file.
// NOTE(review): presumably the queue the job belongs to — confirm with backends.
QueueName string
// Payload holds the job data as raw bytes. JSONMarshalPayload writes JSON
// into it and JSONUnMarshalPayload decodes JSON from it.
Payload []byte
// CreatedAt is set by NewJob to the time the job was constructed.
CreatedAt time.Time
// StartedAt and UpdatedAt are not written anywhere in this file.
// NOTE(review): presumably maintained by queue backends — confirm.
StartedAt time.Time
UpdatedAt time.Time
// NOTE(review): presumably the number of processing attempts so far — confirm.
Attempts int
// NOTE(review): presumably the message from the most recent failure — confirm.
FailureReason string
// Status is the job's current JobStatus. NewJob starts it at JobStatusWaiting.
Status JobStatus
// NOTE(review): presumably identifies the worker that handled the job — confirm.
ProcessedBy string
}
// NewJob returns a new Job that is ready to be filled in and enqueued.
//
// The job receives a ULID string as its ID (built from the current time and
// entropy from crypto/rand), the JobStatusWaiting status, and the current time
// as CreatedAt. ulid.MustNew panics if the ID cannot be generated.
func NewJob() *Job {
	job := new(Job)
	job.ID = ulid.MustNew(ulid.Now(), rand.Reader).String()
	job.Status = JobStatusWaiting
	job.CreatedAt = time.Now()
	return job
}
// JSONMarshalPayload encodes v as JSON and stores the result in j.Payload.
//
// If encoding fails, the error from json.Marshal is returned unchanged and the
// existing Payload is left untouched. Assigning json.Marshal's result directly
// would replace the payload with nil in that case, because Marshal returns nil
// bytes alongside an error.
func (j *Job) JSONMarshalPayload(v any) error {
	encoded, err := json.Marshal(v)
	if err != nil {
		return err
	}
	j.Payload = encoded
	return nil
}
// JSONUnMarshalPayload decodes the JSON stored in j.Payload into v, which must
// be something json.Unmarshal can write to (typically a non-nil pointer).
// Any error from json.Unmarshal is returned unchanged.
func (j *Job) JSONUnMarshalPayload(v any) error {
	if err := json.Unmarshal(j.Payload, v); err != nil {
		return err
	}
	return nil
}
// EnqueueOptions carries the per-call settings passed to Enqueuer.Enqueue.
type EnqueueOptions struct {
// NOTE(review): presumably the name of the queue to add the job to — confirm.
QueueName string
// NOTE(review): presumably how long the job is held back before it becomes
// available; the handling lives in the implementations — confirm.
Delay time.Duration
}
// DequeueOptions carries the per-call settings passed to Dequeuer.Dequeue.
type DequeueOptions struct {
// NOTE(review): presumably the name of the queue to take jobs from — confirm.
QueueName string
// NOTE(review): presumably the time a dequeued job may stay unacknowledged
// before the backend treats it as timed out — confirm with implementations.
JobTimeout time.Duration
}
// Enqueuer is implemented by types that accept jobs for queuing.
type Enqueuer interface {
// Enqueue submits job with the settings in opts and reports any failure.
// NOTE(review): whether a nil opts is accepted is up to the implementation.
Enqueue(ctx context.Context, job *Job, opts *EnqueueOptions) error
}
// Dequeuer is implemented by types that hand out queued jobs. DequeueFunc
// adapts a plain function to this interface.
type Dequeuer interface {
// Dequeue returns a slice of jobs obtained with the settings in opts.
// NOTE(review): count is presumably the maximum number of jobs to return,
// and ErrQueueEmpty the result for an empty queue — confirm with implementations.
Dequeue(ctx context.Context, opts *DequeueOptions, count int) ([]*Job, error)
}
// DequeueFunc is an adapter that lets an ordinary function with the right
// signature be used wherever a Dequeuer is expected.
type DequeueFunc func(ctx context.Context, opts *DequeueOptions, count int) ([]*Job, error)

// Dequeue implements Dequeuer by invoking fn with the same arguments and
// passing its results through untouched.
func (fn DequeueFunc) Dequeue(ctx context.Context, opts *DequeueOptions, count int) ([]*Job, error) {
	jobs, err := fn(ctx, opts, count)
	return jobs, err
}
// AckOptions carries the per-call settings passed to Acker.Ack.
type AckOptions struct {
// NOTE(review): presumably the name of the queue the job was taken from — confirm.
QueueName string
}
// NackOptions carries the per-call settings passed to Acker.Nack.
type NackOptions struct {
// NOTE(review): presumably the name of the queue the job was taken from — confirm.
QueueName string
// NOTE(review): presumably the wait before the job is offered again when it
// is retried — confirm with implementations.
RetryAfter time.Duration
// NOTE(review): presumably selects between retrying the job and giving up
// on it (e.g. moving it to the dead queue) — confirm with implementations.
ShouldRetry bool
}
// Acker is implemented by types that record the outcome of a dequeued job.
type Acker interface {
// Ack acknowledges job using opts.
// NOTE(review): presumably marks the job as successfully processed — confirm.
Ack(ctx context.Context, job *Job, opts *AckOptions) error
// Nack negatively acknowledges job using opts (see NackOptions for the
// retry-related settings it receives).
// NOTE(review): presumably marks the job as failed — confirm.
Nack(ctx context.Context, job *Job, opts *NackOptions) error
}
// QueueStatus describes the state of a queue as reported in QueueInfo and
// QueueDetails.
type QueueStatus int

// Declared queue statuses. QueueStatusUnknown is the zero value.
const (
	QueueStatusUnknown QueueStatus = iota
	QueueStatusPaused
	QueueStatusRunning
)

// String returns the name of the status: "unknown", "Paused" or "Running".
//
// Values outside the declared constants report "unknown", the same text as
// QueueStatusUnknown. Indexing the name table with such a value directly would
// panic with an index-out-of-range error.
func (s QueueStatus) String() string {
	names := [...]string{
		QueueStatusUnknown: "unknown",
		QueueStatusPaused:  "Paused",
		QueueStatusRunning: "Running",
	}
	if s < 0 || int(s) >= len(names) {
		return names[QueueStatusUnknown]
	}
	return names[s]
}
// QueueDetails is the result type of QueueManager.PagePendingQueue and
// QueueManager.PageDeadQueue: a queue's summary fields plus a slice of jobs.
type QueueDetails struct {
// NameSpace and Name identify the queue.
// NOTE(review): presumably NameSpace defaults to DefaultNameSpace — confirm.
NameSpace string
Name string
// NOTE(review): presumably the total number of jobs in the queue, not just
// the number in Jobs — confirm with implementations.
JobCount int
// Status is the queue's QueueStatus.
Status QueueStatus
// NOTE(review): presumably echoes the Pagination the caller requested — confirm.
Pagination Pagination
// Jobs holds the jobs included in this result.
Jobs []*Job
}
// Pagination is the paging argument taken by the QueueManager Page* methods
// and is also embedded in QueueDetails.
// NOTE(review): Page is presumably the page number and Rows the page size;
// whether Page is zero- or one-based is not visible in this file — confirm.
type Pagination struct {
Page int
Rows int
}
// QueueInfo is the per-queue summary returned by
// QueueManager.ListPendingQueues and QueueManager.ListDeadQueues.
type QueueInfo struct {
// NameSpace and Name identify the queue.
NameSpace string
Name string
// NOTE(review): presumably the number of jobs currently in the queue — confirm.
JobCount int
// Status is the queue's QueueStatus.
Status QueueStatus
}
// QueueManager groups the administrative operations on queues: listing and
// paging through pending and dead queues, pausing and resuming pending
// queues, and deleting jobs from dead queues. The exact semantics of each
// method are defined by the implementations, which are not part of this file.
type QueueManager interface {
// DeleteJobFromDeadQueue removes the job with jobID from the named dead queue.
// NOTE(review): presumably returns ErrJobNotFound for an unknown ID — confirm.
DeleteJobFromDeadQueue(ctx context.Context, queueName string, jobID string) error
// PausePendingQueue pauses the named pending queue.
PausePendingQueue(ctx context.Context, queueName string) error
// ResumePendingQueue resumes the named pending queue.
ResumePendingQueue(ctx context.Context, queueName string) error
// ListPendingQueues returns a QueueInfo for each pending queue.
ListPendingQueues(ctx context.Context) ([]*QueueInfo, error)
// ListDeadQueues returns a QueueInfo for each dead queue.
ListDeadQueues(ctx context.Context) ([]*QueueInfo, error)
// PagePendingQueue returns the details of the named pending queue for the
// page selected by p.
PagePendingQueue(ctx context.Context, queueName string, p Pagination) (*QueueDetails, error)
// PageDeadQueue returns the details of the named dead queue for the page
// selected by p.
PageDeadQueue(ctx context.Context, queueName string, p Pagination) (*QueueDetails, error)
}
// Queue is the full queue interface: it combines Enqueuer, Dequeuer, Acker
// and QueueManager, so an implementation must provide every method of all four.
type Queue interface {
Enqueuer
Dequeuer
Acker
QueueManager
}
// ErrJobNotFound is a sentinel error with the message "job not found".
// NOTE(review): nothing in this file returns it; presumably queue backends use
// it when a job ID cannot be located — compare with errors.Is.
var ErrJobNotFound = errors.New("job not found")