11package mysql
22
33import (
4- "context"
54 "fmt"
65 "sync"
76 "time"
@@ -33,12 +32,12 @@ type taskHandle struct {
3332
3433 runner DriverHandle
3534
36- ctx context.Context
37- cancelFunc context.CancelFunc
38- waitCh chan * drivers.ExitResult
39- stats * common.TaskStatistics
35+ waitCh chan * drivers.ExitResult
36+ doneCh chan struct {}
37+ stats * common.TaskStatistics
4038
4139 driverConfig * common.MySQLDriverConfig
40+ shutdown bool
4241}
4342
4443func newDtleTaskHandle (logger g.LoggerType , cfg * drivers.TaskConfig , state drivers.TaskState , started time.Time ) * taskHandle {
@@ -50,12 +49,24 @@ func newDtleTaskHandle(logger g.LoggerType, cfg *drivers.TaskConfig, state drive
5049 startedAt : started ,
5150 completedAt : time.Time {},
5251 exitResult : nil ,
53- waitCh : make (chan * drivers.ExitResult , 1 ),
52+ waitCh : make (chan * drivers.ExitResult ),
53+ doneCh : make (chan struct {}),
5454 }
55- h . ctx , h .cancelFunc = context . WithCancel ( context . TODO () )
55+ go h .watchWaitCh ( )
5656 return h
5757}
5858
59+ func (h * taskHandle ) watchWaitCh () {
60+ select {
61+ case r := <- h .waitCh :
62+ h .stateLock .Lock ()
63+ h .exitResult = r
64+ h .stateLock .Unlock ()
65+ close (h .doneCh )
66+ case <- h .doneCh :
67+ }
68+ }
69+
5970func (h * taskHandle ) TaskStatus () (* drivers.TaskStatus , error ) {
6071 h .stateLock .RLock ()
6172 defer h .stateLock .RUnlock ()
@@ -83,13 +94,14 @@ func (h *taskHandle) TaskStatus() (*drivers.TaskStatus, error) {
8394 }, nil
8495}
8596
97+ // used when h.runner has not been setup
8698func (h * taskHandle ) onError (err error ) {
87- h .waitCh <- & drivers.ExitResult {
99+ common . WriteWaitCh ( h .waitCh , & drivers.ExitResult {
88100 ExitCode : common .TaskStateDead ,
89101 Signal : 0 ,
90102 OOMKilled : false ,
91103 Err : err ,
92- }
104+ })
93105}
94106
95107func (h * taskHandle ) IsRunning () bool {
@@ -137,7 +149,8 @@ func (h *taskHandle) run(d *Driver) {
137149 t := time .NewTimer (0 )
138150 for {
139151 select {
140- case <- h .ctx .Done ():
152+ case <- h .doneCh :
153+ if ! t .Stop () { <- t .C }
141154 return
142155 case <- t .C :
143156 if h .runner != nil {
@@ -168,12 +181,12 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
168181 case common .TaskTypeSrc :
169182 if h .driverConfig .OracleConfig != nil {
170183 h .logger .Debug ("found oracle src" , "OracleConfig" , h .driverConfig .OracleConfig )
171- runner , err = extractor .NewExtractorOracle (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh )
184+ runner , err = extractor .NewExtractorOracle (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh , d . ctx )
172185 if err != nil {
173186 return nil , errors .Wrap (err , "NewExtractor" )
174187 }
175188 } else {
176- runner , err = mysql .NewExtractor (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh , h .ctx )
189+ runner , err = mysql .NewExtractor (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh , d .ctx )
177190 if err != nil {
178191 return nil , errors .Wrap (err , "NewOracleExtractor" )
179192 }
@@ -183,13 +196,13 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
183196 if h .driverConfig .KafkaConfig != nil {
184197 h .logger .Debug ("found kafka" , "KafkaConfig" , h .driverConfig .KafkaConfig )
185198 runner , err = kafka .NewKafkaRunner (ctx , h .driverConfig .KafkaConfig , h .logger ,
186- d .storeManager , d .config .NatsAdvertise , h .waitCh , h .ctx )
199+ d .storeManager , d .config .NatsAdvertise , h .waitCh , d .ctx )
187200 if err != nil {
188201 return nil , errors .Wrap (err , "NewKafkaRunner" )
189202 }
190203 } else {
191204 runner , err = mysql .NewApplier (ctx , h .driverConfig , h .logger , d .storeManager ,
192- d .config .NatsAdvertise , h .waitCh , d .eventer , h .taskConfig , h .ctx )
205+ d .config .NatsAdvertise , h .waitCh , d .eventer , h .taskConfig , d .ctx )
193206 if err != nil {
194207 return nil , errors .Wrap (err , "NewApplier" )
195208 }
@@ -265,23 +278,35 @@ func (h *taskHandle) emitStats(ru *common.TaskStatistics) {
265278 }
266279}
267280
268- func (h * taskHandle ) Destroy () bool {
269- h .stateLock .RLock ()
270- //driver.des
271- h .cancelFunc ()
281+ func (h * taskHandle ) Destroy () {
282+ if h .shutdown {
283+ return
284+ }
285+ h .stateLock .Lock ()
286+ h .shutdown = true
287+ h .stateLock .Unlock ()
288+
289+ common .WriteWaitCh (h .waitCh , & drivers.ExitResult {
290+ ExitCode : 0 ,
291+ Signal : 0 ,
292+ OOMKilled : false ,
293+ Err : nil ,
294+ })
295+
272296 if h .runner != nil {
273297 err := h .runner .Shutdown ()
274298 if err != nil {
275299 h .logger .Error ("error in h.runner.Shutdown" , "err" , err )
276300 }
277301 }
278- return h .procState == drivers .TaskStateExited
279302}
280303
281304type DriverHandle interface {
282305 Run ()
283306
284- // Shutdown is used to stop the task
307+ // Shutdown is used to stop the task.
308+ // Do not send ExitResult in Shutdown().
309+ // pause API will call Shutdown and the task should not exit.
285310 Shutdown () error
286311
287312 // Stats returns aggregated stats of the driver
0 commit comments