diff --git a/executor.go b/executor.go index 3f48a1e..37fccb4 100644 --- a/executor.go +++ b/executor.go @@ -36,6 +36,8 @@ type executor struct { // used by the executor to receive a stop signal from the scheduler stopCh chan struct{} + // ensure that stop runs before the next call to start and only runs once + stopOnce *sync.Once // the timeout value when stopping stopTimeout time.Duration // used to signal that the executor has completed shutdown @@ -88,6 +90,7 @@ func (e *executor) start() { // any other uses within the executor should create a context // using the executor context as parent. e.ctx, e.cancel = context.WithCancel(context.Background()) + e.stopOnce = &sync.Once{} // the standardJobsWg tracks standardJobsWg := &waitGroupWithMutex{} @@ -131,7 +134,7 @@ func (e *executor) start() { // spin off into a goroutine to unblock the executor and // allow for processing for more work - go func() { + go func(executorCtx context.Context) { // make sure to cancel the above context per the docs // // Canceling this context releases resources associated with it, so code should // // call cancel as soon as the operations running in this Context complete. @@ -211,8 +214,7 @@ func (e *executor) start() { } } else { select { - case <-e.stopCh: - e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg) + case <-executorCtx.Done(): return default: } @@ -228,7 +230,7 @@ func (e *executor) start() { }(*j) } } - }() + }(e.ctx) case <-e.stopCh: e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg) return @@ -473,86 +475,88 @@ func (e *executor) incrementJobCounter(j internalJob, status JobStatus) { } func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) { - e.logger.Debug("gocron: stopping executor") - // we've been asked to stop. This is either because the scheduler has been told - // to stop all jobs or the scheduler has been asked to completely shutdown. - // - // cancel tells all the functions to stop their work and send in a done response - e.cancel() - - // the wait for job channels are used to report back whether we successfully waited - // for all jobs to complete or if we hit the configured timeout. - waitForJobs := make(chan struct{}, 1) - waitForSingletons := make(chan struct{}, 1) - waitForLimitMode := make(chan struct{}, 1) - - // the waiter context is used to cancel the functions waiting on jobs. - // this is done to avoid goroutine leaks. - waiterCtx, waiterCancel := context.WithCancel(context.Background()) - - // wait for standard jobs to complete - go func() { - e.logger.Debug("gocron: waiting for standard jobs to complete") + e.stopOnce.Do(func() { + e.logger.Debug("gocron: stopping executor") + // we've been asked to stop. This is either because the scheduler has been told + // to stop all jobs or the scheduler has been asked to completely shutdown. + // + // cancel tells all the functions to stop their work and send in a done response + e.cancel() + + // the wait for job channels are used to report back whether we successfully waited + // for all jobs to complete or if we hit the configured timeout. + waitForJobs := make(chan struct{}, 1) + waitForSingletons := make(chan struct{}, 1) + waitForLimitMode := make(chan struct{}, 1) + + // the waiter context is used to cancel the functions waiting on jobs. + // this is done to avoid goroutine leaks. + waiterCtx, waiterCancel := context.WithCancel(context.Background()) + + // wait for standard jobs to complete go func() { - // this is done in a separate goroutine, so we aren't - // blocked by the WaitGroup's Wait call in the event - // that the waiter context is cancelled. - // This particular goroutine could leak in the event that - // some long-running standard job doesn't complete. - standardJobsWg.Wait() - e.logger.Debug("gocron: standard jobs completed") - waitForJobs <- struct{}{} + e.logger.Debug("gocron: waiting for standard jobs to complete") + go func() { + // this is done in a separate goroutine, so we aren't + // blocked by the WaitGroup's Wait call in the event + // that the waiter context is cancelled. + // This particular goroutine could leak in the event that + // some long-running standard job doesn't complete. + standardJobsWg.Wait() + e.logger.Debug("gocron: standard jobs completed") + waitForJobs <- struct{}{} + }() + <-waiterCtx.Done() }() - <-waiterCtx.Done() - }() - // wait for per job singleton limit mode runner jobs to complete - go func() { - e.logger.Debug("gocron: waiting for singleton jobs to complete") + // wait for per job singleton limit mode runner jobs to complete go func() { - singletonJobsWg.Wait() - e.logger.Debug("gocron: singleton jobs completed") - waitForSingletons <- struct{}{} + e.logger.Debug("gocron: waiting for singleton jobs to complete") + go func() { + singletonJobsWg.Wait() + e.logger.Debug("gocron: singleton jobs completed") + waitForSingletons <- struct{}{} + }() + <-waiterCtx.Done() }() - <-waiterCtx.Done() - }() - // wait for limit mode runners to complete - go func() { - e.logger.Debug("gocron: waiting for limit mode jobs to complete") + // wait for limit mode runners to complete go func() { - limitModeJobsWg.Wait() - e.logger.Debug("gocron: limitMode jobs completed") - waitForLimitMode <- struct{}{} + e.logger.Debug("gocron: waiting for limit mode jobs to complete") + go func() { + limitModeJobsWg.Wait() + e.logger.Debug("gocron: limitMode jobs completed") + waitForLimitMode <- struct{}{} + }() + <-waiterCtx.Done() }() - <-waiterCtx.Done() - }() - // now either wait for all the jobs to complete, - // or hit the timeout. - var count int - timeout := time.Now().Add(e.stopTimeout) - for time.Now().Before(timeout) && count < 3 { - select { - case <-waitForJobs: - count++ - case <-waitForSingletons: - count++ - case <-waitForLimitMode: - count++ - default: + // now either wait for all the jobs to complete, + // or hit the timeout. + var count int + timeout := time.Now().Add(e.stopTimeout) + for time.Now().Before(timeout) && count < 3 { + select { + case <-waitForJobs: + count++ + case <-waitForSingletons: + count++ + case <-waitForLimitMode: + count++ + default: + } } - } - if count < 3 { - e.done <- ErrStopJobsTimedOut - e.logger.Debug("gocron: executor stopped - timed out") - } else { - e.done <- nil - e.logger.Debug("gocron: executor stopped") - } - waiterCancel() + if count < 3 { + e.done <- ErrStopJobsTimedOut + e.logger.Debug("gocron: executor stopped - timed out") + } else { + e.done <- nil + e.logger.Debug("gocron: executor stopped") + } + waiterCancel() - if e.limitMode != nil { - e.limitMode.started = false - } + if e.limitMode != nil { + e.limitMode.started = false + } + }) } diff --git a/scheduler.go b/scheduler.go index f2c31d4..1cbc98d 100644 --- a/scheduler.go +++ b/scheduler.go @@ -141,7 +141,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { jobUpdateNextRuns: make(chan uuid.UUID), jobsOutCompleted: make(chan uuid.UUID), jobOutRequest: make(chan jobOutRequest, 1000), - done: make(chan error), + done: make(chan error, 1), } s := &scheduler{