Skip to content

Commit

Permalink
Fix #837
Browse files Browse the repository at this point in the history
  • Loading branch information
apocelipes committed Feb 26, 2025
1 parent d719317 commit c6808ca
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type executor struct {

// used by the executor to receive a stop signal from the scheduler
stopCh chan struct{}
// ensure that stop runs before the next call to start and only runs once
stopOnce *sync.Once
// the timeout value when stopping
stopTimeout time.Duration
// used to signal that the executor has completed shutdown
Expand Down Expand Up @@ -88,6 +90,7 @@ func (e *executor) start() {
// any other uses within the executor should create a context
// using the executor context as parent.
e.ctx, e.cancel = context.WithCancel(context.Background())
e.stopOnce = &sync.Once{}

// the standardJobsWg tracks
standardJobsWg := &waitGroupWithMutex{}
Expand All @@ -111,7 +114,9 @@ func (e *executor) start() {
case jIn := <-e.jobsIn:
select {
case <-e.stopCh:
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
e.stopOnce.Do(func() {
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
})
return
default:
}
Expand All @@ -131,7 +136,7 @@ func (e *executor) start() {

// spin off into a goroutine to unblock the executor and
// allow for processing for more work
go func() {
go func(executorCtx context.Context) {
// make sure to cancel the above context per the docs
// // Canceling this context releases resources associated with it, so code should
// // call cancel as soon as the operations running in this Context complete.
Expand Down Expand Up @@ -211,8 +216,7 @@ func (e *executor) start() {
}
} else {
select {
case <-e.stopCh:
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
case <-executorCtx.Done():
return
default:
}
Expand All @@ -228,9 +232,11 @@ func (e *executor) start() {
}(*j)
}
}
}()
}(e.ctx)
case <-e.stopCh:
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
e.stopOnce.Do(func() {
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
})
return
}
}
Expand Down

0 comments on commit c6808ca

Please sign in to comment.