Skip to content

Commit 2d23a06

Browse files
author
ffffwh
committed
Merge branch 'master' into 4.23.04.x
2 parents af3b84f + aaf9459 commit 2d23a06

File tree

6 files changed

+145
-63
lines changed

6 files changed

+145
-63
lines changed

api/handler/v2/job.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,16 @@ import (
1111
"strings"
1212
"time"
1313

14-
mysql "github.com/actiontech/dtle/driver/mysql"
15-
14+
nomadApi "github.com/hashicorp/nomad/api"
1615
"github.com/hashicorp/nomad/nomad/structs"
16+
"github.com/labstack/echo/v4"
1717

18-
"github.com/actiontech/dtle/driver/kafka"
19-
18+
"github.com/actiontech/dtle/api/handler"
19+
"github.com/actiontech/dtle/api/models"
2020
"github.com/actiontech/dtle/driver/common"
21-
21+
"github.com/actiontech/dtle/driver/kafka"
22+
mysql "github.com/actiontech/dtle/driver/mysql"
2223
"github.com/actiontech/dtle/g"
23-
24-
"github.com/actiontech/dtle/api/models"
25-
26-
"github.com/actiontech/dtle/api/handler"
27-
nomadApi "github.com/hashicorp/nomad/api"
28-
"github.com/labstack/echo/v4"
2924
)
3025

3126
// @Id MigrationJobListV2
@@ -1955,6 +1950,11 @@ func ReverseJobV2(c echo.Context, filterJobType DtleJobType) error {
19551950
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(err))
19561951
}
19571952

1953+
if originalJob.BasicTaskProfile.Configuration.SrcConfig.MysqlSrcTaskConfig.TwoWaySync {
1954+
return c.JSON(http.StatusConflict, models.BuildBaseResp(fmt.Errorf(
1955+
"job_id=%v; job can't be reversed with TwoWaySync = true", reqParam.JobId)))
1956+
}
1957+
19581958
reverseJobParam := new(models.CreateOrUpdateMysqlToMysqlJobParamV2)
19591959
reverseJobParam.JobId = fmt.Sprintf("%s-%s", "reverse", consulJobItem.JobId)
19601960
reverseJobParam.TaskStepName = mysql.JobIncrCopy
@@ -1982,6 +1982,15 @@ func ReverseJobV2(c echo.Context, filterJobType DtleJobType) error {
19821982
TwoWaySyncGtid: originalJob.BasicTaskProfile.Configuration.SrcConfig.MysqlSrcTaskConfig.TwoWaySyncGtid,
19831983
},
19841984
}
1985+
for _, dbItem := range reverseJobParam.SrcTask.ReplicateDoDb {
1986+
for _, tbItem := range dbItem.Tables {
1987+
if len(tbItem.ColumnMapFrom) > 0 && len(tbItem.ColumnMapTo) == 0 {
1988+
return c.JSON(http.StatusConflict, models.BuildBaseResp(fmt.Errorf(
1989+
"job_id=%v; job can't be reversed with ColumnMapFrom not matching ColumnMapTo", reqParam.JobId)))
1990+
}
1991+
tbItem.ColumnMapFrom, tbItem.ColumnMapTo = tbItem.ColumnMapTo, tbItem.ColumnMapFrom
1992+
}
1993+
}
19851994
reverseJobParam.DestTask = &models.DestTaskConfig{
19861995
TaskName: common.TaskTypeDest,
19871996
ConnectionConfig: &originalJob.BasicTaskProfile.ConnectionInfo.SrcDataBase,

driver/mysql/applier.go

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"bytes"
1111
gosql "database/sql"
1212
"fmt"
13+
mysqldriver "github.com/go-sql-driver/mysql"
1314
"math"
1415
"strconv"
1516
"strings"
@@ -304,9 +305,23 @@ func (a *Applier) Run() {
304305
Subject: a.subject + "_dtrev",
305306
StateDir: a.stateDir,
306307
}
307-
var cfg2 = *a.mysqlContext
308+
// cfg2 will be a deepcopy of a.mysqlContext
309+
cfg2, err := a.storeManager.GetConfig(a.subject)
310+
if err != nil {
311+
a.onError(common.TaskStateDead, errors.Wrap(err, "GetConfig"))
312+
return
313+
}
308314
cfg2.SrcConnectionConfig = a.mysqlContext.DestConnectionConfig
309315
cfg2.DestConnectionConfig = a.mysqlContext.SrcConnectionConfig
316+
for _, dbItem := range cfg2.ReplicateDoDb {
317+
for _, tbItem := range dbItem.Tables {
318+
if len(tbItem.ColumnMapFrom) > 0 && len(tbItem.ColumnMapTo) == 0 {
319+
a.onError(common.TaskStateDead, errors.Wrap(err, "GetConfig"))
320+
return
321+
}
322+
tbItem.ColumnMapFrom, tbItem.ColumnMapTo = tbItem.ColumnMapTo, tbItem.ColumnMapFrom
323+
}
324+
}
310325

311326
if strings.ToLower(a.mysqlContext.TwoWaySyncGtid) == "auto" {
312327
cfg2.AutoGtid = true
@@ -318,7 +333,7 @@ func (a *Applier) Run() {
318333
cfg2.TwoWaySync = false
319334
cfg2.TwoWaySyncGtid = ""
320335

321-
a.revExtractor, err = NewExtractor(execCtx2, &cfg2, a.logger, a.storeManager, a.waitCh, a.ctx)
336+
a.revExtractor, err = NewExtractor(execCtx2, cfg2, a.logger, a.storeManager, a.waitCh, a.ctx)
322337
if err != nil {
323338
a.onError(common.TaskStateDead, errors.Wrap(err, "reversed Extractor"))
324339
return
@@ -422,7 +437,7 @@ func (a *Applier) doFullCopy() {
422437
return
423438
case copyRows := <-a.dumpEntryQueue:
424439
//time.Sleep(20 * time.Second) // #348 stub
425-
if err = a.ApplyEventQueries(a.db, copyRows); err != nil {
440+
if err = a.ApplyEventQueries(copyRows); err != nil {
426441
return
427442
}
428443
atomic.AddInt64(a.memory1, -int64(copyRows.Size()))
@@ -825,7 +840,7 @@ func (a *Applier) ValidateGrants() error {
825840
return fmt.Errorf("user has insufficient privileges for applier. Needed:ALTER, CREATE, DROP, INDEX, REFERENCES, INSERT, DELETE, UPDATE, SELECT, TRIGGER ON *.*")
826841
}
827842

828-
func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *common.DumpEntry) (err error) {
843+
func (a *Applier) ApplyEventQueries(entry *common.DumpEntry) (err error) {
829844
a.logger.Debug("ApplyEventQueries", "schema", entry.TableSchema, "table", entry.TableName,
830845
"rows", len(entry.ValuesX))
831846

@@ -879,29 +894,18 @@ func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *common.DumpEntry) (err
879894

880895
queries = append(queries, entry.SqlMode, entry.DbSQL)
881896
queries = append(queries, entry.TbSQL...)
882-
tx, err := db.BeginTx(a.ctx, &gosql.TxOptions{})
883-
if err != nil {
884-
return err
885-
}
897+
898+
conn := a.dbs[0].Db
886899
nRows := int64(len(entry.ValuesX))
887-
defer func() {
888-
if err != nil {
889-
return
890-
}
891-
err = tx.Commit()
892-
if err == nil {
893-
atomic.AddInt64(&a.TotalRowsReplayed, nRows)
894-
}
895-
}()
896-
if _, err := tx.ExecContext(a.ctx, querySetFKChecksOff); err != nil {
900+
if _, err := conn.ExecContext(a.ctx, querySetFKChecksOff); err != nil {
897901
return err
898902
}
899903
execQuery := func(query string) error {
900904
a.logger.Debug("ApplyEventQueries. exec", "query", g.StrLim(query, 256))
901-
_, err := tx.ExecContext(a.ctx, query)
905+
_, err := conn.ExecContext(a.ctx, query)
902906
if err != nil {
903907
queryStart := g.StrLim(query, 10) // avoid printing sensitive information
904-
errCtx := errors.Wrapf(err, "tx.Exec. queryStart %v seq", queryStart)
908+
errCtx := errors.Wrapf(err, "tx.Exec. queryStart %v", queryStart)
905909
if !sql.IgnoreError(err) {
906910
a.logger.Error("ApplyEventQueries. exec error", "err", errCtx)
907911
return errCtx
@@ -958,13 +962,22 @@ func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *common.DumpEntry) (err
958962
// last rows or sql too large
959963

960964
if needInsert {
961-
err := execQuery(buf.String())
962-
buf.Reset()
963-
if err != nil {
964-
return err
965+
for iTry := 0; ; iTry++ {
966+
err := execQuery(buf.String())
967+
if errIsMysqlDeadlock(err) && iTry < a.mysqlContext.RetryTxLimit {
968+
a.logger.Info("found deadlock. will retry tx", "schema", entry.TableSchema, "table", entry.Table,
969+
"iTry", iTry)
970+
time.Sleep(retryTxDelay)
971+
continue
972+
} else if err != nil {
973+
return err
974+
}
975+
break
965976
}
977+
buf.Reset()
966978
}
967979
}
980+
atomic.AddInt64(&a.TotalRowsReplayed, nRows)
968981

969982
return nil
970983
}
@@ -1214,3 +1227,15 @@ func (a *Applier) updateDumpProgressLoop() {
12141227
time.Sleep(time.Duration(interval) * time.Second)
12151228
}
12161229
}
1230+
func errIsMysqlDeadlock(err error) bool {
1231+
if err != nil {
1232+
merr, isME := err.(*mysqldriver.MySQLError)
1233+
if isME {
1234+
return merr.Number ==sql.ErrLockDeadlock
1235+
} else {
1236+
return false
1237+
}
1238+
} else {
1239+
return false
1240+
}
1241+
}

driver/mysql/applier_gtid_executed.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,9 @@ func (a *GtidExecutedCreater) createTableGtidExecutedV4() error {
246246
}
247247

248248
func (a *ApplierIncr) cleanGtidExecuted(sid uuid.UUID, txSid string) error {
249-
a.logger.Debug("incr. cleanup before WaitForExecution")
250-
if !a.mtsManager.WaitForAllCommitted() {
249+
if !a.mtsManager.WaitForAllCommitted(a.logger.With("txSid", txSid)) {
251250
return nil // shutdown
252251
}
253-
a.logger.Debug("incr. cleanup after WaitForExecution")
254252

255253
intervalStr := func() string {
256254
a.gtidSetLock.RLock()

driver/mysql/applier_incr.go

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
gomysql "github.com/go-mysql-org/go-mysql/mysql"
1919
"github.com/pkg/errors"
2020
uuid "github.com/satori/go.uuid"
21-
mysqldriver "github.com/go-sql-driver/mysql"
2221
)
2322

2423
const (
@@ -236,19 +235,16 @@ func (a *ApplierIncr) MtsWorker(workerIndex int) {
236235
logger.Debug("a binlogEntry MTS dequeue", "gno", entryContext.Entry.Coordinates.GetGNO())
237236
for iTry := 0; ; iTry++ {
238237
err := a.ApplyBinlogEvent(workerIndex, entryContext)
239-
if err != nil {
240-
if merr, isME := err.(*mysqldriver.MySQLError); isME {
241-
if merr.Number == sql.ErrLockDeadlock && iTry < a.mysqlContext.RetryTxLimit {
242-
logger.Info("found deadlock. will retry tx", "gno", entryContext.Entry.Coordinates.GetGNO(),
243-
"iTry", iTry)
244-
time.Sleep(retryTxDelay)
245-
continue
246-
}
247-
}
238+
if errIsMysqlDeadlock(err) && iTry < a.mysqlContext.RetryTxLimit {
239+
logger.Info("found deadlock. will retry tx", "gno", entryContext.Entry.Coordinates.GetGNO(),
240+
"iTry", iTry)
241+
time.Sleep(retryTxDelay)
242+
continue
243+
} else if err != nil {
248244
a.OnError(common.TaskStateDead, err) // TODO coordinate with other goroutine
249245
keepLoop = false
250246
}
251-
break;
247+
break
252248
}
253249
logger.Debug("after ApplyBinlogEvent.", "gno", entryContext.Entry.Coordinates.GetGNO())
254250
case <-t.C:
@@ -351,11 +347,9 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.EntryContext) (err error) {
351347
} else {
352348
if binlogEntry.Index == 0 {
353349
if rotated {
354-
a.logger.Debug("binlog rotated. WaitForAllCommitted before", "file", a.replayingBinlogFile)
355-
if !a.mtsManager.WaitForAllCommitted() {
350+
if !a.mtsManager.WaitForAllCommitted(a.logger.With("rotate", a.replayingBinlogFile)) {
356351
return nil // TODO shutdown
357352
}
358-
a.logger.Debug("binlog rotated. WaitForAllCommitted after", "file", a.replayingBinlogFile)
359353
a.mtsManager.lastCommitted = 0
360354
a.mtsManager.lastEnqueue = 0
361355
a.wsManager.resetCommonParent(0)
@@ -380,12 +374,11 @@ func (a *ApplierIncr) handleEntry(entryCtx *common.EntryContext) (err error) {
380374
hasDDL := binlogEntry.HasDDL()
381375
inMiddleDDL := hasDDL || a.prevDDL // DDL must be executed separatedly
382376
if inMiddleDDL || isBig {
383-
a.logger.Info("WaitForAllCommitted",
384-
"gno", txGno, "seq", binlogEntry.Coordinates.GetSequenceNumber(),
385-
"lc", binlogEntry.Coordinates.GetLastCommit(), "leq", a.mtsManager.lastEnqueue,
377+
if !a.mtsManager.WaitForAllCommitted(a.logger.With("gno", txGno,
378+
"seq", binlogEntry.Coordinates.GetSequenceNumber(),
379+
"lc", binlogEntry.Coordinates.GetLastCommit(),
386380
"hasDDL", hasDDL, "prevDDL", a.prevDDL,
387-
"bigtx", isBig, "index", binlogEntry.Index)
388-
if !a.mtsManager.WaitForAllCommitted() {
381+
"bigtx", isBig, "index", binlogEntry.Index)) {
389382
return nil // shutdown
390383
}
391384
}

driver/mysql/applier_incr_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,44 @@
11
package mysql
22

3+
import (
4+
"github.com/hashicorp/go-hclog"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestMtsManager(t *testing.T) {
10+
logger := hclog.Default()
11+
shutdownCh := make(chan struct{})
12+
defer close(shutdownCh)
13+
14+
mm := NewMtsManager(shutdownCh, logger)
15+
go mm.LcUpdater()
16+
17+
doneCh := make(chan struct{})
18+
go func() {
19+
mm.WaitForExecution0(1, 0)
20+
logger.Info("wait 1 0")
21+
mm.Executed0(1)
22+
mm.WaitForExecution0(2, 1)
23+
logger.Info("wait 2 1")
24+
mm.Executed0(2)
25+
mm.WaitForExecution0(3, 2)
26+
logger.Info("wait 3 2")
27+
mm.Executed0(3)
28+
29+
// BinlogEntry was resent
30+
mm.WaitForExecution0(2, 1)
31+
mm.Executed0(2)
32+
33+
mm.WaitForAllCommitted(logger)
34+
close(doneCh)
35+
}()
36+
37+
tm := time.NewTimer(10 * time.Second)
38+
select {
39+
case <-doneCh:
40+
t.Log("case finished")
41+
case <-tm.C:
42+
t.Fatal("might be stuck")
43+
}
44+
}

driver/mysql/applier_mts.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"container/heap"
55
"hash/fnv"
66
"sync/atomic"
7+
"time"
78

89
"github.com/actiontech/dtle/driver/common"
910
"github.com/actiontech/dtle/g"
@@ -56,14 +57,18 @@ func NewMtsManager(shutdownCh chan struct{}, logger g.LoggerType) *MtsManager {
5657
}
5758

5859
// This function must be called sequentially.
59-
func (mm *MtsManager) WaitForAllCommitted() bool {
60-
g.Logger.Debug("WaitForAllCommitted", "lc", mm.lastCommitted, "le", mm.lastEnqueue)
60+
func (mm *MtsManager) WaitForAllCommitted(logger g.LoggerType) bool {
61+
t := time.NewTimer(30 * time.Second)
62+
defer t.Stop()
63+
6164
for {
62-
if mm.lastCommitted == mm.lastEnqueue {
65+
if atomic.LoadInt64(&mm.lastCommitted) >= mm.lastEnqueue {
6366
return true
6467
}
6568

6669
select {
70+
case <-t.C: // this will only be triggered once
71+
logger.Warn("WaitForAllCommitted has been stuck for 30s")
6772
case <-mm.shutdownCh:
6873
return false
6974
case <-mm.updated:
@@ -75,15 +80,21 @@ func (mm *MtsManager) WaitForAllCommitted() bool {
7580
// block for waiting. return true for can_execute, false for abortion.
7681
// This function must be called sequentially.
7782
func (mm *MtsManager) WaitForExecution(binlogEntry *common.DataEntry) bool {
78-
mm.lastEnqueue = binlogEntry.Coordinates.GetSequenceNumber()
83+
return mm.WaitForExecution0(
84+
binlogEntry.Coordinates.GetSequenceNumber(),
85+
binlogEntry.Coordinates.(*common.MySQLCoordinateTx).LastCommitted)
86+
}
87+
88+
func (mm *MtsManager) WaitForExecution0(seq int64, lc int64) bool {
89+
mm.lastEnqueue = seq
7990

8091
if mm.forceMts {
8192
return true
8293
}
8394

8495
for {
8596
currentLC := atomic.LoadInt64(&mm.lastCommitted)
86-
if currentLC >= binlogEntry.Coordinates.(*common.MySQLCoordinateTx).LastCommitted {
97+
if currentLC >= lc {
8798
return true
8899
}
89100

@@ -130,10 +141,14 @@ func (mm *MtsManager) LcUpdater() {
130141
}
131142

132143
func (mm *MtsManager) Executed(binlogEntry *common.DataEntry) {
144+
mm.Executed0(binlogEntry.Coordinates.GetSequenceNumber())
145+
}
146+
147+
func (mm *MtsManager) Executed0(seq int64) {
133148
select {
134149
case <-mm.shutdownCh:
135150
return
136-
case mm.chExecuted <- binlogEntry.Coordinates.GetSequenceNumber():
151+
case mm.chExecuted <- seq:
137152
}
138153
}
139154

0 commit comments

Comments
 (0)