-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunner.go
161 lines (138 loc) · 3.66 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package jobs
import (
"sync"
"time"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
// Try records a single attempt at running a job.
type Try struct {
	// Runner names the runner that made the attempt.
	Runner string `json:"runner" bson:"runner"`
	// When is the timestamp recorded for the attempt.
	When time.Time `json:"when" bson:"when"`
	// Err is the message of the error returned by the JobFunc. It is left
	// empty, and omitted when encoded, if the attempt succeeded.
	Err string `json:"err,omitempty" bson:"err,omitempty"`
}
// JobFunc is the callback a Runner invokes once for every job it picks up.
// Returning a non-nil error marks that attempt as failed.
type JobFunc func(job *J) error
// Runner polls a collection for jobs of a single kind and runs each one
// through a JobFunc.
type Runner struct {
	// c is the collection holding the job records.
	c *mgo.Collection
	// fn is called for every job the runner picks up.
	fn JobFunc
	// stop is created by Start and closed by Stop to end the polling loop.
	stop chan struct{}
	// Interval is how long the runner waits between polls.
	Interval time.Duration
	// err is the last error recorded by the polling loop.
	err error
	// stoponce makes sure the stop channel is closed only once per Start.
	stoponce sync.Once
	// name identifies this runner; kind selects which jobs it processes.
	name, kind string
}
// Start launches the polling loop in a new goroutine and returns immediately.
// The returned error is always nil; failures inside the loop are reported
// through Err.
//
// On every pass the runner queries the collection for jobs of its kind whose
// status is StatusNew or StatusWaiting and whose runat time is not in the
// future, sorted by the created field. Each job is first switched to
// StatusWorking (skipping it if another runner got there first), then handed
// to the JobFunc, and finally written back with the attempt appended to its
// Tries. After a pass the loop waits for Interval, or exits once the stop
// channel is closed. A database error is stored and stops the runner.
func (r *Runner) Start() error {
	r.stop = make(chan struct{})
	r.stoponce = sync.Once{}
	// Forget any error left over from a previous run. Without this a
	// restarted runner would see the stale error after its first pass and
	// immediately stop itself again.
	r.err = nil
	go func() {
		var job *J
	outside:
		for {
			iter := r.c.Find(bson.M{
				"status": bson.M{"$in": []interface{}{StatusNew, StatusWaiting}},
				"runat":  bson.M{"$lte": time.Now()},
				"kind":   r.kind,
			}).Sort("created").Iter()
			for iter.Next(&job) {
				var err error
				var changeInfo *mgo.ChangeInfo
				change := mgo.Change{
					Update:    bson.M{"$set": bson.M{"status": StatusWorking}},
					ReturnNew: true,
				}
				// Claim the job: the update only matches while the job is
				// still new or waiting.
				if changeInfo, err = r.c.Find(bson.M{
					"_id":    job.ID,
					"status": bson.M{"$in": []interface{}{StatusNew, StatusWaiting}},
				}).Apply(change, &job); err != nil {
					if err == mgo.ErrNotFound {
						// skip this one - someone else is dealing with it
						continue
					}
					r.err = err
					break
				}
				if changeInfo.Updated != 1 {
					// skip this one - someone else is dealing with it
					continue
				}

				jobErr := r.fn(job)

				// record this attempt
				try := &Try{
					When:   time.Now(),
					Runner: r.name,
				}
				job.Tries = append(job.Tries, try)
				if jobErr != nil {
					try.Err = jobErr.Error()
					job.RunAt = time.Now().Add(job.RetryInterval)
					if job.Retries > 1 {
						job.Retries--
						job.Status = StatusWaiting
					} else {
						// Out of retries. Clamping here (instead of
						// decrementing and comparing with zero) also covers
						// a job whose Retries was already zero or negative,
						// which would otherwise never be marked as failed.
						job.Retries = 0
						job.Status = StatusFailed
					}
				} else {
					// success
					job.Status = StatusSuccess
				}
				if err := r.c.UpdateId(job.ID, job); err != nil {
					r.err = err
					break
				}
			}
			if err := iter.Close(); err != nil {
				r.err = err
			}
			if r.err != nil {
				// Closing the stop channel makes the select below exit.
				r.Stop()
			}
			select {
			case <-r.stop:
				// stop
				break outside
			case <-time.After(r.Interval):
				// carry on
			}
		}
	}()
	return nil
}
// Stop asks the runner to stop by closing its stop channel. Callers should
// then block on StopChan() to be notified of the stop.
//
// Calling Stop more than once is safe. Calling it before Start is a no-op
// rather than a panic.
func (r *Runner) Stop() {
	r.stoponce.Do(func() {
		// The channel only exists once Start has run; closing a nil
		// channel would panic.
		if r.stop != nil {
			close(r.stop)
		}
	})
}
// StopChan returns the runner's stop channel as a receive-only channel.
// Callers block on it after calling Stop:
//
//	<-runner.StopChan()
//
// NOTE(review): the channel is closed by Stop itself, not by the polling
// goroutine when it exits, so a pass that is already in progress may still be
// running when the receive unblocks.
func (r *Runner) StopChan() <-chan struct{} {
	return r.stop
}
// Err returns the error most recently recorded by the runner, or nil if none
// has been recorded.
func (r *Runner) Err() error {
	return r.err
}
// Name returns the name this runner was created with.
func (r *Runner) Name() string {
	return r.name
}
// Kind returns the kind of job this runner processes.
func (r *Runner) Kind() string {
	return r.kind
}
// NewRunner builds a Runner that is ready to be started.
//
// name identifies the runner and should be unique across a system. c is the
// collection in which the job records live. kind should match the J.Kind
// value of the jobs this runner is meant to execute, and fn is called for
// each of those jobs. The polling interval defaults to 500ms and can be
// changed through the Interval field.
func NewRunner(name string, c *mgo.Collection, kind string, fn JobFunc) *Runner {
	r := new(Runner)
	r.name = name
	r.kind = kind
	r.c = c
	r.fn = fn
	r.Interval = 500 * time.Millisecond
	return r
}