diff --git a/canal/canal.go b/canal/canal.go index ca0638ce3..590d43556 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -51,7 +51,7 @@ type Canal struct { includeTableRegex []*regexp.Regexp excludeTableRegex []*regexp.Regexp - delay *uint32 + delay atomic.Uint32 ctx context.Context cancel context.CancelFunc @@ -85,8 +85,6 @@ func NewCanal(cfg *Config) (*Canal, error) { } c.master = &masterInfo{logger: c.cfg.Logger} - c.delay = new(uint32) - var err error if err = c.prepareDumper(); err != nil { @@ -195,7 +193,7 @@ func (c *Canal) prepareDumper() error { } func (c *Canal) GetDelay() uint32 { - return atomic.LoadUint32(c.delay) + return c.delay.Load() } // Run will first try to dump all data from MySQL master `mysqldump`, diff --git a/canal/sync.go b/canal/sync.go index e93826663..253cdf4c0 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -2,7 +2,6 @@ package canal import ( "log/slog" - "sync/atomic" "time" "github.com/go-mysql-org/go-mysql/mysql" @@ -259,7 +258,7 @@ func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) { if now >= ev.Header.Timestamp { newDelay = now - ev.Header.Timestamp } - atomic.StoreUint32(c.delay, newDelay) + c.delay.Store(newDelay) } func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { diff --git a/replication/parser.go b/replication/parser.go index 8d235d9ae..d413eef12 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -33,7 +33,7 @@ type BinlogParser struct { timestampStringLocation *time.Location // used to start/stop processing - stopProcessing uint32 + stopProcessing atomic.Bool useDecimal bool useFloatWithTrailingZero bool @@ -54,11 +54,11 @@ func NewBinlogParser() *BinlogParser { } func (p *BinlogParser) Stop() { - atomic.StoreUint32(&p.stopProcessing, 1) + p.stopProcessing.Store(true) } func (p *BinlogParser) Resume() { - atomic.StoreUint32(&p.stopProcessing, 0) + p.stopProcessing.Store(false) } func (p *BinlogParser) Reset() { @@ -166,7 +166,7 @@ func (p *BinlogParser) parseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, } func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error { - for atomic.LoadUint32(&p.stopProcessing) != 1 { + for !p.stopProcessing.Load() { done, err := p.parseSingleEvent(r, onEvent) if err != nil { if err == errMissingTableMapEvent {