Skip to content

Commit

Permalink
Added Distributed Locker to JobOptions (#711)
Browse files Browse the repository at this point in the history
  • Loading branch information
0x01F4 authored Apr 22, 2024
1 parent 3b653b9 commit 3faf525
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 15 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ other instances checking to see if a new leader needs to be elected.
(don't see what you need? request on slack to get a repo created to contribute it!)
- [**Locker**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedLocker):
A locker can be used to lock each run of a job to a single instance of gocron.
Locker can be at job or scheduler, if it is defined both at job and scheduler then locker of job will take precedence.
- Implementations: [go-co-op lockers](https://github.com/go-co-op?q=-lock&type=all&language=&sort=)
(don't see what you need? request on slack to get a repo created to contribute it!)

Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var (
ErrWithClockNil = fmt.Errorf("gocron: WithClock: clock must not be nil")
ErrWithDistributedElectorNil = fmt.Errorf("gocron: WithDistributedElector: elector must not be nil")
ErrWithDistributedLockerNil = fmt.Errorf("gocron: WithDistributedLocker: locker must not be nil")
ErrWithDistributedJobLockerNil = fmt.Errorf("gocron: WithDistributedJobLocker: locker must not be nil")
ErrWithLimitConcurrentJobsZero = fmt.Errorf("gocron: WithLimitConcurrentJobs: limit must be greater than 0")
ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil")
ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil")
Expand Down
7 changes: 7 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.sendOutForRescheduling(&jIn)
return
}
} else if j.locker != nil {
lock, err := j.locker.Lock(j.ctx, j.name)
if err != nil {
e.sendOutForRescheduling(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type internalJob struct {
afterJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRuns func(jobID uuid.UUID, jobName string)
afterJobRunsWithError func(jobID uuid.UUID, jobName string, err error)

locker Locker
}

// stop is used to stop the job's timer and cancel the context
Expand Down Expand Up @@ -485,6 +487,19 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition {
// JobOption defines the constructor for job options.
type JobOption func(*internalJob) error

// WithDistributedJobLocker sets the locker to be used by multiple
// Scheduler instances to ensure that only one instance of each
// job is run.
func WithDistributedJobLocker(locker Locker) JobOption {
return func(j *internalJob) error {
if locker == nil {
return ErrWithDistributedJobLockerNil
}
j.locker = locker
return nil
}
}

// WithEventListeners sets the event listeners that should be
// run for the job.
func WithEventListeners(eventListeners ...EventListener) JobOption {
Expand Down
66 changes: 51 additions & 15 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,14 @@ func TestScheduler_NewJobErrors(t *testing.T) {
nil,
ErrOneTimeJobStartDateTimePast,
},
{
"WithDistributedJobLocker is nil",
DurationJob(
time.Second,
),
[]JobOption{WithDistributedJobLocker(nil)},
ErrWithDistributedJobLockerNil,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -1199,17 +1207,19 @@ func TestScheduler_WithDistributed(t *testing.T) {

goleak.VerifyNone(t)
tests := []struct {
name string
count int
opt SchedulerOption
assertions func(*testing.T)
name string
count int
schedulerOpts []SchedulerOption
jobOpts []JobOption
assertions func(*testing.T)
}{
{
"3 schedulers with elector",
3,
WithDistributedElector(&testElector{
notLeader: notLeader,
}),
[]SchedulerOption{
WithDistributedElector(&testElector{notLeader: notLeader}),
},
nil,
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLeaderCount int
Expand All @@ -1229,9 +1239,32 @@ func TestScheduler_WithDistributed(t *testing.T) {
{
"3 schedulers with locker",
3,
WithDistributedLocker(&testLocker{
notLocked: notLocked,
}),
[]SchedulerOption{
WithDistributedLocker(&testLocker{notLocked: notLocked}),
},
nil,
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLockedCount int
for {
if time.Now().After(timeout) {
break
}
select {
case <-notLocked:
notLockedCount++
default:
}
}
},
},
{
"3 schedulers and job with Distributed locker",
3,
nil,
[]JobOption{
WithDistributedJobLocker(&testLocker{notLocked: notLocked}),
},
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLockedCount int
Expand All @@ -1257,12 +1290,17 @@ func TestScheduler_WithDistributed(t *testing.T) {

for i := tt.count; i > 0; i-- {
s := newTestScheduler(t,
tt.opt,
tt.schedulerOpts...,
)
jobOpts := []JobOption{
WithStartAt(
WithStartImmediately(),
),
}
jobOpts = append(jobOpts, tt.jobOpts...)

go func() {
s.Start()

_, err := s.NewJob(
DurationJob(
time.Second,
Expand All @@ -1273,9 +1311,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
jobsRan <- struct{}{}
},
),
WithStartAt(
WithStartImmediately(),
),
jobOpts...,
)
require.NoError(t, err)

Expand Down

0 comments on commit 3faf525

Please sign in to comment.