Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #835 and #837 #836

Merged
merged 2 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 79 additions & 75 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type executor struct {

// used by the executor to receive a stop signal from the scheduler
stopCh chan struct{}
// ensure that stop runs before the next call to start and only runs once
stopOnce *sync.Once
// the timeout value when stopping
stopTimeout time.Duration
// used to signal that the executor has completed shutdown
Expand Down Expand Up @@ -88,6 +90,7 @@ func (e *executor) start() {
// any other uses within the executor should create a context
// using the executor context as parent.
e.ctx, e.cancel = context.WithCancel(context.Background())
e.stopOnce = &sync.Once{}

// the standardJobsWg tracks
standardJobsWg := &waitGroupWithMutex{}
Expand Down Expand Up @@ -131,7 +134,7 @@ func (e *executor) start() {

// spin off into a goroutine to unblock the executor and
// allow for processing for more work
go func() {
go func(executorCtx context.Context) {
// make sure to cancel the above context per the docs
// // Canceling this context releases resources associated with it, so code should
// // call cancel as soon as the operations running in this Context complete.
Expand Down Expand Up @@ -211,8 +214,7 @@ func (e *executor) start() {
}
} else {
select {
case <-e.stopCh:
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
case <-executorCtx.Done():
return
default:
}
Expand All @@ -228,7 +230,7 @@ func (e *executor) start() {
}(*j)
}
}
}()
}(e.ctx)
case <-e.stopCh:
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
return
Expand Down Expand Up @@ -473,86 +475,88 @@ func (e *executor) incrementJobCounter(j internalJob, status JobStatus) {
}

func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
e.logger.Debug("gocron: stopping executor")
// we've been asked to stop. This is either because the scheduler has been told
// to stop all jobs or the scheduler has been asked to completely shutdown.
//
// cancel tells all the functions to stop their work and send in a done response
e.cancel()

// the wait for job channels are used to report back whether we successfully waited
// for all jobs to complete or if we hit the configured timeout.
waitForJobs := make(chan struct{}, 1)
waitForSingletons := make(chan struct{}, 1)
waitForLimitMode := make(chan struct{}, 1)

// the waiter context is used to cancel the functions waiting on jobs.
// this is done to avoid goroutine leaks.
waiterCtx, waiterCancel := context.WithCancel(context.Background())

// wait for standard jobs to complete
go func() {
e.logger.Debug("gocron: waiting for standard jobs to complete")
e.stopOnce.Do(func() {
e.logger.Debug("gocron: stopping executor")
// we've been asked to stop. This is either because the scheduler has been told
// to stop all jobs or the scheduler has been asked to completely shutdown.
//
// cancel tells all the functions to stop their work and send in a done response
e.cancel()

// the wait for job channels are used to report back whether we successfully waited
// for all jobs to complete or if we hit the configured timeout.
waitForJobs := make(chan struct{}, 1)
waitForSingletons := make(chan struct{}, 1)
waitForLimitMode := make(chan struct{}, 1)

// the waiter context is used to cancel the functions waiting on jobs.
// this is done to avoid goroutine leaks.
waiterCtx, waiterCancel := context.WithCancel(context.Background())

// wait for standard jobs to complete
go func() {
// this is done in a separate goroutine, so we aren't
// blocked by the WaitGroup's Wait call in the event
// that the waiter context is cancelled.
// This particular goroutine could leak in the event that
// some long-running standard job doesn't complete.
standardJobsWg.Wait()
e.logger.Debug("gocron: standard jobs completed")
waitForJobs <- struct{}{}
e.logger.Debug("gocron: waiting for standard jobs to complete")
go func() {
// this is done in a separate goroutine, so we aren't
// blocked by the WaitGroup's Wait call in the event
// that the waiter context is cancelled.
// This particular goroutine could leak in the event that
// some long-running standard job doesn't complete.
standardJobsWg.Wait()
e.logger.Debug("gocron: standard jobs completed")
waitForJobs <- struct{}{}
}()
<-waiterCtx.Done()
}()
<-waiterCtx.Done()
}()

// wait for per job singleton limit mode runner jobs to complete
go func() {
e.logger.Debug("gocron: waiting for singleton jobs to complete")
// wait for per job singleton limit mode runner jobs to complete
go func() {
singletonJobsWg.Wait()
e.logger.Debug("gocron: singleton jobs completed")
waitForSingletons <- struct{}{}
e.logger.Debug("gocron: waiting for singleton jobs to complete")
go func() {
singletonJobsWg.Wait()
e.logger.Debug("gocron: singleton jobs completed")
waitForSingletons <- struct{}{}
}()
<-waiterCtx.Done()
}()
<-waiterCtx.Done()
}()

// wait for limit mode runners to complete
go func() {
e.logger.Debug("gocron: waiting for limit mode jobs to complete")
// wait for limit mode runners to complete
go func() {
limitModeJobsWg.Wait()
e.logger.Debug("gocron: limitMode jobs completed")
waitForLimitMode <- struct{}{}
e.logger.Debug("gocron: waiting for limit mode jobs to complete")
go func() {
limitModeJobsWg.Wait()
e.logger.Debug("gocron: limitMode jobs completed")
waitForLimitMode <- struct{}{}
}()
<-waiterCtx.Done()
}()
<-waiterCtx.Done()
}()

// now either wait for all the jobs to complete,
// or hit the timeout.
var count int
timeout := time.Now().Add(e.stopTimeout)
for time.Now().Before(timeout) && count < 3 {
select {
case <-waitForJobs:
count++
case <-waitForSingletons:
count++
case <-waitForLimitMode:
count++
default:
// now either wait for all the jobs to complete,
// or hit the timeout.
var count int
timeout := time.Now().Add(e.stopTimeout)
for time.Now().Before(timeout) && count < 3 {
select {
case <-waitForJobs:
count++
case <-waitForSingletons:
count++
case <-waitForLimitMode:
count++
default:
}
}
}
if count < 3 {
e.done <- ErrStopJobsTimedOut
e.logger.Debug("gocron: executor stopped - timed out")
} else {
e.done <- nil
e.logger.Debug("gocron: executor stopped")
}
waiterCancel()
if count < 3 {
e.done <- ErrStopJobsTimedOut
e.logger.Debug("gocron: executor stopped - timed out")
} else {
e.done <- nil
e.logger.Debug("gocron: executor stopped")
}
waiterCancel()

if e.limitMode != nil {
e.limitMode.started = false
}
if e.limitMode != nil {
e.limitMode.started = false
}
})
}
2 changes: 1 addition & 1 deletion scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
jobUpdateNextRuns: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
done: make(chan error, 1),
}

s := &scheduler{
Expand Down