11package mysql
22
33import (
4+ "context"
45 "fmt"
56 "sync"
67 "time"
@@ -32,15 +33,16 @@ type taskHandle struct {
3233
3334 runner DriverHandle
3435
36+ ctx context.Context
37+ cancelFunc context.CancelFunc
3538 waitCh chan * drivers.ExitResult
36- doneCh chan struct {}
3739 stats * common.TaskStatistics
3840
3941 driverConfig * common.MySQLDriverConfig
4042 shutdown bool
4143}
4244
43- func newDtleTaskHandle (logger g.LoggerType , cfg * drivers.TaskConfig , state drivers.TaskState , started time.Time ) * taskHandle {
45+ func newDtleTaskHandle (ctx context. Context , logger g.LoggerType , cfg * drivers.TaskConfig , state drivers.TaskState , started time.Time ) * taskHandle {
4446 h := & taskHandle {
4547 logger : logger ,
4648 stateLock : sync.RWMutex {},
@@ -50,8 +52,8 @@ func newDtleTaskHandle(logger g.LoggerType, cfg *drivers.TaskConfig, state drive
5052 completedAt : time.Time {},
5153 exitResult : nil ,
5254 waitCh : make (chan * drivers.ExitResult ),
53- doneCh : make (chan struct {}),
5455 }
56+ h .ctx , h .cancelFunc = context .WithCancel (ctx )
5557 go h .watchWaitCh ()
5658 return h
5759}
@@ -61,9 +63,9 @@ func (h *taskHandle) watchWaitCh() {
6163 case r := <- h .waitCh :
6264 h .stateLock .Lock ()
6365 h .exitResult = r
66+ h .cancelFunc ()
6467 h .stateLock .Unlock ()
65- close (h .doneCh )
66- case <- h .doneCh :
68+ case <- h .ctx .Done ():
6769 }
6870}
6971
@@ -149,7 +151,7 @@ func (h *taskHandle) run(d *Driver) {
149151 t := time .NewTimer (0 )
150152 for {
151153 select {
152- case <- h .doneCh :
154+ case <- h .ctx . Done () :
153155 if ! t .Stop () { <- t .C }
154156 return
155157 case <- t .C :
@@ -181,12 +183,12 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
181183 case common .TaskTypeSrc :
182184 if h .driverConfig .OracleConfig != nil {
183185 h .logger .Debug ("found oracle src" , "OracleConfig" , h .driverConfig .OracleConfig )
184- runner , err = extractor .NewExtractorOracle (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh , d .ctx )
186+ runner , err = extractor .NewExtractorOracle (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh , h .ctx )
185187 if err != nil {
186188 return nil , errors .Wrap (err , "NewExtractor" )
187189 }
188190 } else {
189- runner , err = mysql .NewExtractor (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh , d .ctx )
191+ runner , err = mysql .NewExtractor (ctx , h .driverConfig , h .logger , d .storeManager , h .waitCh , h .ctx )
190192 if err != nil {
191193 return nil , errors .Wrap (err , "NewOracleExtractor" )
192194 }
@@ -196,13 +198,13 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
196198 if h .driverConfig .KafkaConfig != nil {
197199 h .logger .Debug ("found kafka" , "KafkaConfig" , h .driverConfig .KafkaConfig )
198200 runner , err = kafka .NewKafkaRunner (ctx , h .driverConfig .KafkaConfig , h .logger ,
199- d .storeManager , d .config .NatsAdvertise , h .waitCh , d .ctx )
201+ d .storeManager , d .config .NatsAdvertise , h .waitCh , h .ctx )
200202 if err != nil {
201203 return nil , errors .Wrap (err , "NewKafkaRunner" )
202204 }
203205 } else {
204206 runner , err = mysql .NewApplier (ctx , h .driverConfig , h .logger , d .storeManager ,
205- d .config .NatsAdvertise , h .waitCh , d .eventer , h .taskConfig , d .ctx )
207+ d .config .NatsAdvertise , h .waitCh , d .eventer , h .taskConfig , h .ctx )
206208 if err != nil {
207209 return nil , errors .Wrap (err , "NewApplier" )
208210 }
@@ -301,6 +303,21 @@ func (h *taskHandle) Destroy() {
301303 }
302304}
303305
306+ func (h * taskHandle ) GetExitResult () * drivers.ExitResult {
307+ h .stateLock .Lock ()
308+ defer h .stateLock .Unlock ()
309+ if h .exitResult == nil {
310+ return & drivers.ExitResult {
311+ ExitCode : 0 ,
312+ Signal : 0 ,
313+ OOMKilled : false ,
314+ Err : nil ,
315+ }
316+ } else {
317+ return h .exitResult .Copy ()
318+ }
319+ }
320+
304321type DriverHandle interface {
305322 Run ()
306323
0 commit comments