From aa59fea8958c403974906c69b273788fecd545ea Mon Sep 17 00:00:00 2001 From: John Roesler Date: Wed, 17 Jan 2024 11:31:43 -0600 Subject: [PATCH] wait for new job to be fully created before returning --- example_test.go | 6 +----- scheduler.go | 30 ++++++++++++++++++++++++------ scheduler_test.go | 11 +---------- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/example_test.go b/example_test.go index a704c399..ed77a330 100644 --- a/example_test.go +++ b/example_test.go @@ -367,8 +367,6 @@ func ExampleScheduler_removeByTags() { ) fmt.Println(len(s.Jobs())) - time.Sleep(20 * time.Millisecond) - s.RemoveByTags("tag1", "tag2") fmt.Println(len(s.Jobs())) @@ -391,7 +389,6 @@ func ExampleScheduler_removeJob() { ) fmt.Println(len(s.Jobs())) - time.Sleep(20 * time.Millisecond) _ = s.RemoveJob(j.ID()) @@ -664,8 +661,8 @@ func ExampleWithLimitedRuns() { s.Start() time.Sleep(100 * time.Millisecond) - fmt.Printf("no jobs in scheduler: %v\n", s.Jobs()) _ = s.StopJobs() + fmt.Printf("no jobs in scheduler: %v\n", s.Jobs()) // Output: // one, 2 // no jobs in scheduler: [] @@ -748,7 +745,6 @@ func ExampleWithStartAt() { ), ) s.Start() - time.Sleep(20 * time.Millisecond) next, _ := j.NextRun() fmt.Println(next) diff --git a/scheduler.go b/scheduler.go index eebb221e..7e734047 100644 --- a/scheduler.go +++ b/scheduler.go @@ -70,11 +70,17 @@ type scheduler struct { allJobsOutRequest chan allJobsOutRequest jobOutRequestCh chan jobOutRequest runJobRequestCh chan runJobRequest - newJobCh chan internalJob + newJobCh chan newJobIn removeJobCh chan uuid.UUID removeJobsByTagsCh chan []string } +type newJobIn struct { + ctx context.Context + cancel context.CancelFunc + job internalJob +} + type jobOutRequest struct { id uuid.UUID outChan chan internalJob @@ -118,7 +124,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { clock: clockwork.NewRealClock(), logger: &noOpLogger{}, - newJobCh: make(chan internalJob), + newJobCh: make(chan newJobIn), removeJobCh: make(chan uuid.UUID), removeJobsByTagsCh: make(chan []string), startCh: make(chan struct{}), @@ -144,8 +150,8 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { case id := <-s.exec.jobIDsOut: s.selectExecJobIDsOut(id) - case j := <-s.newJobCh: - s.selectNewJob(j) + case in := <-s.newJobCh: + s.selectNewJob(in) case id := <-s.removeJobCh: s.selectRemoveJob(id) @@ -346,7 +352,8 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) { close(out.outChan) } -func (s *scheduler) selectNewJob(j internalJob) { +func (s *scheduler) selectNewJob(in newJobIn) { + j := in.job if s.started { next := j.startTime if j.startImmediately { @@ -378,6 +385,7 @@ func (s *scheduler) selectNewJob(j internalJob) { } s.jobs[j.id] = j + in.cancel() } func (s *scheduler) selectRemoveJobsByTags(tags []string) { @@ -548,9 +556,19 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW return nil, err } + newJobCtx, newJobCancel := context.WithCancel(context.Background()) + select { + case <-s.shutdownCtx.Done(): + case s.newJobCh <- newJobIn{ + ctx: newJobCtx, + cancel: newJobCancel, + job: j, + }: + } + select { + case <-newJobCtx.Done(): case <-s.shutdownCtx.Done(): - case s.newJobCh <- j: } return &job{ diff --git a/scheduler_test.go b/scheduler_test.go index 3df33bb3..48ef1d11 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -301,9 +301,7 @@ func TestScheduler_StopTimeout(t *testing.T) { require.NoError(t, err) s.Start() - time.Sleep(time.Millisecond * 200) - err = s.Shutdown() - assert.ErrorIs(t, err, ErrStopJobsTimedOut) + assert.ErrorIs(t, err, s.Shutdown()) cancel() time.Sleep(2 * time.Second) }) @@ -332,15 +330,11 @@ func TestScheduler_Shutdown(t *testing.T) { require.NoError(t, err) s.Start() - time.Sleep(50 * time.Millisecond) require.NoError(t, s.StopJobs()) - time.Sleep(200 * time.Millisecond) s.Start() - time.Sleep(50 * time.Millisecond) require.NoError(t, s.Shutdown()) - time.Sleep(200 * time.Millisecond) }) t.Run("calling Job methods after shutdown errors", func(t *testing.T) { @@ -361,7 +355,6 @@ func TestScheduler_Shutdown(t *testing.T) { require.NoError(t, err) s.Start() - time.Sleep(50 * time.Millisecond) require.NoError(t, s.Shutdown()) _, err = j.LastRun() @@ -465,7 +458,6 @@ func TestScheduler_NewJob(t *testing.T) { s.Start() require.NoError(t, s.Shutdown()) - time.Sleep(50 * time.Millisecond) }) } } @@ -1303,7 +1295,6 @@ func TestScheduler_RemoveJob(t *testing.T) { id = uuid.New() } - time.Sleep(50 * time.Millisecond) err := s.RemoveJob(id) assert.ErrorIs(t, err, err) require.NoError(t, s.Shutdown())