diff --git a/go.mod b/go.mod index dd3c41c..37b9597 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/pingcap/mysql-tester +module github.com/bb7133/mysql-tester go 1.21 @@ -13,6 +13,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect golang.org/x/sys v0.5.0 // indirect diff --git a/go.sum b/go.sum index 77d729c..de86ac2 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/defined2014/mysql v0.0.0-20231121061906-fcfacaa39f49 h1:Q3Ri7Ycix4T+Ig7I896I6w0WuCajid2SgyierI16NSo= github.com/defined2014/mysql v0.0.0-20231121061906-fcfacaa39f49/go.mod h1:5GYlY+PrT+c8FHAJTMIsyOuHUNf62KAQuRPMGssbixo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8= github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/r/example.result b/r/example.result index 6e5cb8d..e009bb0 100644 --- a/r/example.result +++ b/r/example.result @@ -6,8 +6,9 @@ a b SELECT 1 FROM NON_EXISTING_TABLE; Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist SELECT 2 FROM NON_EXISTING_TABLE; +Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist SELECT 3 FROM NON_EXISTING_TABLE; -Got one of the listed errors +Error 1146 (42S02): Table 'example.NON_EXISTING_TABLE' doesn't exist SELECT 4; 4 4 @@ -20,16 +21,17 @@ SELECT 6; 1 SELECT; Error 1064 (42000): You have an 
error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "1 SELECT;" 2 SELECT; +Error 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "2 SELECT;" 3 SELECT; -Got one of the listed errors +Error 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 1 near "3 SELECT;" explain analyze format='brief' select * from t; id estRows actRows task access object execution info operator info memory disk -TableReader 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, rpc_num:, rpc_time:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:} data:TableFullScan Bytes N/A -└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:} keep order:false, stats:pseudo N/A N/A +TableReader 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, tot_proc:, tot_wait:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:, rpc_info:{Cop:{num_rpc:, total_time:}} data:TableFullScan Bytes N/A +└─TableFullScan 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {delete_skipped_count:, key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A explain analyze select * from t; id estRows actRows task access object execution info operator info memory disk -TableReader_5 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, rpc_num:, rpc_time:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:} data:TableFullScan_4 Bytes N/A -└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:} keep order:false, 
stats:pseudo N/A N/A +TableReader_5 10000.00 5 root NULL time:, loops:, RU:, cop_task: {num:, max:, proc_keys:, tot_proc:, tot_wait:, copr_cache_hit_ratio:, build_task_duration:, max_distsql_concurrency:, rpc_info:{Cop:{num_rpc:, total_time:}} data:TableFullScan_4 Bytes N/A +└─TableFullScan_4 10000.00 5 cop[tikv] table:t tikv_task:{time:, loops:, scan_detail: {total_process_keys:, total_process_keys_size:, total_keys:, get_snapshot_time:, rocksdb: {delete_skipped_count:, key_skipped_count:, block: {}}}, time_detail: {total_process_time:, total_wait_time:, tikv_wall_time:} keep order:false, stats:pseudo N/A N/A insert into t values (6, 6); affected rows: 1 info: @@ -47,3 +49,8 @@ affected rows: 3 info: Records: 2 Duplicates: 1 Warnings: 0 1 use `test`;; +use example; +select * from t1; +f1 f2 +1 1 +2 2 diff --git a/src/main.go b/src/main.go index 3da256a..1682edd 100644 --- a/src/main.go +++ b/src/main.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "os" + "os/exec" "path/filepath" "regexp" "sort" @@ -29,6 +30,7 @@ import ( "time" "github.com/defined2014/mysql" + "github.com/google/uuid" "github.com/pingcap/errors" log "github.com/sirupsen/logrus" ) @@ -47,6 +49,17 @@ var ( retryConnCount int collationDisable bool checkErr bool + pathBR string + pathDumpling string + pathCDC string + addressCDC string + downstream string + + downStreamHost string + downStreamPort string + downStreamUser string + downStreamPassword string + downStreamDB string ) func init() { @@ -63,6 +76,11 @@ func init() { flag.IntVar(&retryConnCount, "retry-connection-count", 120, "The max number to retry to connect to the database.") flag.BoolVar(&checkErr, "check-error", false, "if --error ERR does not match, return error instead of just warn") flag.BoolVar(&collationDisable, "collation-disable", false, "run collation related-test with new-collation disabled") + flag.StringVar(&pathBR, "path-br", "", "Path of BR binary") + flag.StringVar(&pathDumpling, "path-dumpling", "", "Path of Dumpling binary") 
+ flag.StringVar(&pathCDC, "path-cdc", "", "Path of TiCDC binary") + flag.StringVar(&addressCDC, "address-cdc", "127.0.0.1:8300", "Address of Server") + flag.StringVar(&downstream, "downstream", "", "Connection string of downstream TiDB cluster") } const ( @@ -98,6 +116,11 @@ type ReplaceRegex struct { replace string } +type SourceAndTarget struct { + sourceTable string + targetTable string +} + type tester struct { mdb *sql.DB name string @@ -148,6 +171,18 @@ type tester struct { // replace output result through --replace_regex /\.dll/.so/ replaceRegex []*ReplaceRegex + + // backup and restore context through --backup_and_restore $BACKUP_TABLE as $RESTORE_TABLE' + backupAndRestore *SourceAndTarget + + // dump and import context through --dump_and_import $SOURCE_TABLE as $TARGET_TABLE' + dumpAndImport *SourceAndTarget + + // replication checkpoint database name + replicationCheckpointDB string + + // replication checkpoint ID + replicationCheckpointID int } func newTester(name string) *tester { @@ -162,6 +197,8 @@ func newTester(name string) *tester { t.enableConcurrent = false t.enableInfo = false + t.replicationCheckpointDB = "checkpoint-" + uuid.NewString() + t.replicationCheckpointID = 0 return t } @@ -202,7 +239,7 @@ func isTiDB(db *sql.DB) bool { return true } -func (t *tester) addConnection(connName, hostName, userName, password, db string) { +func (t *tester) addConnection(connName, hostName, port, userName, password, db string) { var ( mdb *sql.DB err error @@ -268,6 +305,64 @@ func (t *tester) disconnect(connName string) { t.currConnName = default_connection } +func parseUserInfo(userInfo string) (string, string, error) { + colonIndex := strings.Index(userInfo, ":") + if colonIndex == -1 { + return "", "", fmt.Errorf("missing password in userinfo") + } + return userInfo[:colonIndex], userInfo[colonIndex+1:], nil +} + +func parseHostPort(hostPort string) (string, string, error) { + colonIndex := strings.Index(hostPort, ":") + if colonIndex == -1 { + return 
"", "", fmt.Errorf("missing port in host:port") + } + return hostPort[:colonIndex], hostPort[colonIndex+1:], nil +} + +func parseDownstream(connStr string) (dbname string, host string, port string, user string, password string) { + // Splitting into userinfo and network/database parts + parts := strings.SplitN(connStr, "@", 2) + if len(parts) != 2 { + fmt.Println("Invalid connection string format") + return + } + + // Parsing userinfo + userInfo := parts[0] + user, password, err := parseUserInfo(userInfo) + if err != nil { + fmt.Println("Error parsing userinfo:", err) + return + } + + // Splitting network type and database part + networkAndDB := parts[1] + networkTypeIndex := strings.Index(networkAndDB, "(") + if networkTypeIndex == -1 { + fmt.Println("Invalid connection string format: missing network type") + return + } + + // Extracting host, port, and database name + hostPortDB := networkAndDB[networkTypeIndex+1:] + hostPortDBParts := strings.SplitN(hostPortDB, ")/", 2) + if len(hostPortDBParts) != 2 { + fmt.Println("Invalid connection string format") + return + } + + host, port, err = parseHostPort(hostPortDBParts[0]) + if err != nil { + fmt.Println("Error parsing host and port:", err) + return + } + + dbname = hostPortDBParts[1] + return +} + func (t *tester) preProcess() { dbName := "test" mdb, err := OpenDBWithRetry("mysql", user+":"+passwd+"@tcp("+host+":"+port+")/"+dbName+"?time_zone=%27Asia%2FShanghai%27&allowAllFiles=true"+params, retryConnCount) @@ -296,6 +391,7 @@ func (t *tester) preProcess() { log.Fatalf("Executing create db %s err[%v]", dbName, err) } t.mdb = mdb + conn, err := initConn(mdb, user, passwd, host, dbName) if err != nil { log.Fatalf("Open db err %v", err) @@ -303,6 +399,17 @@ func (t *tester) preProcess() { t.conn[default_connection] = conn t.curr = conn t.currConnName = default_connection + + if downstream != "" { + // create replication checkpoint database + if _, err := t.mdb.Exec(fmt.Sprintf("create database if not exists `%s`", 
t.replicationCheckpointDB)); err != nil { + log.Fatalf("Executing create db %s err[%v]", t.replicationCheckpointDB, err) + } + + downStreamDB, downStreamHost, downStreamPort, downStreamUser, downStreamPassword = parseDownstream(downstream) + t.addConnection("downstream", downStreamHost, downStreamPort, downStreamUser, downStreamPassword, downStreamDB) + } + t.switchConnection(default_connection) } func (t *tester) postProcess() { @@ -312,6 +419,7 @@ func (t *tester) postProcess() { } t.mdb.Close() }() + t.switchConnection(default_connection) if !reserveSchema { rows, err := t.mdb.Query("show databases") if err != nil { @@ -352,6 +460,167 @@ func (t *tester) addSuccess(testSuite *XUnitTestSuite, startTime *time.Time, cnt }) } +func generateBRStatements(source, target string) (string, string) { + // Generate a random UUID + uuid := uuid.NewString() + + // Create the TMP_DIR path + tmpDir := fmt.Sprintf("/tmp/%s_%s", source, uuid) + + // Generate the SQL statements + backupSQL := fmt.Sprintf("BACKUP TABLE `%s` TO '%s'", source, tmpDir) + restoreSQL := fmt.Sprintf("RESTORE TABLE `%s` FROM '%s'", source, tmpDir) + + return backupSQL, restoreSQL +} + +func (t *tester) handleBackupAndRestore(q query) error { + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("backup_and_restore is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } + t.enableResultLog = false + defer func() { t.enableResultLog = true }() + + var err error + t.backupAndRestore, err = parseSourceAndTarget(q.Query) + if err != nil { + return errors.Annotate(err, fmt.Sprintf("Could not parse backup table and restore table name in --backup_and_restore, line: %d sql:%v", q.Line, q.Query)) + } + backupStmt, restoreStmt := generateBRStatements(t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) + if err := t.executeStmt(backupStmt); err != nil { + return err + } + tempTable := t.backupAndRestore.sourceTable + uuid.NewString() + renameStmt := fmt.Sprintf("RENAME TABLE `%s` TO `%s`", 
t.backupAndRestore.sourceTable, tempTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + if err := t.executeStmt(restoreStmt); err != nil { + return err + } + renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", t.backupAndRestore.sourceTable, t.backupAndRestore.targetTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + renameStmt = fmt.Sprintf("RENAME TABLE `%s` TO `%s`", tempTable, t.backupAndRestore.sourceTable) + if err := t.executeStmt(renameStmt); err != nil { + return err + } + return nil +} + +func (t *tester) dumpTable(source string) (string, error) { + // Check if the file exists + if _, err := os.Stat(pathDumpling); os.IsNotExist(err) { + return "", errors.New(fmt.Sprintf("path-dumpling [%s] does not exist.", pathDumpling)) + } + + log.Warnf("Start dumping table: %s", source) + path := "/tmp/" + source + "_" + uuid.NewString() + cmdArgs := []string{ + fmt.Sprintf("-h%s", host), + fmt.Sprintf("-P%s", port), + fmt.Sprintf("-u%s", user), + fmt.Sprintf("-T%s.%s", t.name, source), + fmt.Sprintf("-o%s", path), + "--output-filename-template", + "tempDump", + "--no-header", + "--filetype", + "csv", + } + + if passwd != "" { + cmdArgs = append(cmdArgs, fmt.Sprintf("-p%s", passwd)) + } + + cmd := exec.Command(pathDumpling, cmdArgs...) 
+ + output, err := cmd.CombinedOutput() + if err != nil { + return "", errors.Annotate(err, fmt.Sprintf("Dumpling failed: %s, output: %s.", cmd.String(), string(output))) + } + log.Warnf("Done executing commands: %s, output: %s", + cmd.String(), string(output)) + return path, nil +} + +func (t *tester) importTableStmt(path, target string) string { + return fmt.Sprintf(` + IMPORT INTO %s + FROM '%s/tempDump.csv' + `, target, path) +} + +func (t *tester) handleDumpAndImport(q query) error { + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("dump_and_import is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } + t.enableResultLog = false + defer func() { t.enableResultLog = true }() + var err error + t.dumpAndImport, err = parseSourceAndTarget(q.Query) + if err != nil { + return err + } + path, err := t.dumpTable(t.dumpAndImport.sourceTable) + if err != nil { + return err + } + dupTableStmt := fmt.Sprintf("CREATE TABLE `%s` LIKE `%s`", t.dumpAndImport.targetTable, t.dumpAndImport.sourceTable) + if err := t.executeStmt(dupTableStmt); err != nil { + return err + } + importStmt := t.importTableStmt(path, t.dumpAndImport.targetTable) + if err = t.executeStmt(importStmt); err != nil { + return err + } + return nil +} + +func (t *tester) waitForReplicationCheckpoint() error { + curr := t.currConnName + defer t.switchConnection(curr) + + if err := t.executeStmt(fmt.Sprintf("use `%s`", t.replicationCheckpointDB)); err != nil { + return err + } + + markerTable := fmt.Sprintf("marker_%d", t.replicationCheckpointID) + t.replicationCheckpointID++ + if err := t.executeStmt(fmt.Sprintf("create table `%s`.`%s` (id int primary key)", t.replicationCheckpointDB, markerTable)); err != nil { + return err + } + + t.switchConnection("downstream") + + checkInterval := 1 * time.Second + queryTimeout := 10 * time.Second + + // Keep querying until the table is found + for { + ctx, cancel := context.WithTimeout(context.Background(), queryTimeout) + defer cancel() + + query := fmt.Sprintf("select * from 
information_schema.tables where table_schema = '%s' and table_name = '%s';", t.replicationCheckpointDB, markerTable) + rows, err := t.curr.QueryContext(ctx, query) + if err != nil { + log.Printf("Error checking for table: %v", err) + return err + } + + found := rows.Next() + rows.Close() + if found { + fmt.Printf("Table '%s' found!\n", markerTable) + break + } else { + fmt.Printf("Table '%s' not found. Retrying in %v...\n", markerTable, checkInterval) + } + + time.Sleep(checkInterval) + } + + return nil +} + func (t *tester) Run() error { t.preProcess() defer t.postProcess() @@ -474,7 +743,7 @@ for i := 0; i < 4; i++ { args = append(args, "") } - t.addConnection(args[0], args[1], args[2], args[3], args[4]) + t.addConnection(args[0], args[1], port, args[2], args[3], args[4]) case Q_CONNECTION: q.Query = strings.TrimSpace(q.Query) if q.Query[len(q.Query)-1] == ';' { @@ -523,6 +792,21 @@ return errors.Annotate(err, fmt.Sprintf("Could not parse regex in --replace_regex: line: %d sql:%v", q.Line, q.Query)) } t.replaceRegex = regex + case Q_BACKUP_AND_RESTORE: + if err := t.handleBackupAndRestore(q); err != nil { + return err + } + case Q_DUMP_AND_IMPORT: + if err := t.handleDumpAndImport(q); err != nil { + return err + } + case Q_REPLICATION_CHECKPOINT: + if !isTiDB(t.mdb) { + return errors.New(fmt.Sprintf("replication_checkpoint is only supported on TiDB, line: %d sql:%v", q.Line, q.Query)) + } + if err := t.waitForReplicationCheckpoint(); err != nil { + return err + } default: log.WithFields(log.Fields{"command": q.firstWord, "arguments": q.Query, "line": q.Line}).Warn("command not implemented") } @@ -539,7 +823,6 @@ if xmlPath != "" { t.addSuccess(&testSuite, &startTime, testCnt) } - return t.flushResult() } diff --git a/src/query.go b/src/query.go index 6a128d8..1ecf40c 100644 --- a/src/query.go +++ b/src/query.go @@ -124,6 +124,9 @@ const ( Q_COMMENT /* Comments, ignored. 
*/ Q_COMMENT_WITH_COMMAND Q_EMPTY_LINE + Q_BACKUP_AND_RESTORE + Q_DUMP_AND_IMPORT + Q_REPLICATION_CHECKPOINT ) // ParseQueries parses an array of string into an array of query object. diff --git a/src/type.go b/src/type.go index 50ea5a6..b346e10 100644 --- a/src/type.go +++ b/src/type.go @@ -114,6 +114,9 @@ var commandMap = map[string]int{ "single_query": Q_SINGLE_QUERY, "begin_concurrent": Q_BEGIN_CONCURRENT, "end_concurrent": Q_END_CONCURRENT, + "backup_and_restore": Q_BACKUP_AND_RESTORE, + "dump_and_import": Q_DUMP_AND_IMPORT, + "replication_checkpoint": Q_REPLICATION_CHECKPOINT, } func findType(cmdName string) int { diff --git a/src/util.go b/src/util.go index b62c64b..a677d9b 100644 --- a/src/util.go +++ b/src/util.go @@ -104,3 +104,18 @@ func ParseReplaceRegex(originalString string) ([]*ReplaceRegex, error) { } return ret, nil } + +func parseSourceAndTarget(s string) (*SourceAndTarget, error) { + s = strings.ToLower(strings.TrimSpace(s)) + + parts := strings.SplitN(s, " as ", 2) + if len(parts) != 2 { + return nil, errors.Errorf("Could not parse source table and target table name: %v", s) + } + + st := &SourceAndTarget{ + sourceTable: strings.TrimSpace(parts[0]), + targetTable: strings.TrimSpace(parts[1]), + } + return st, nil +} diff --git a/t/br_integration.test b/t/br_integration.test new file mode 100644 index 0000000..5645c37 --- /dev/null +++ b/t/br_integration.test @@ -0,0 +1,9 @@ +# Test BR and AutoIncrement + +CREATE TABLE t1 (a INT PRIMARY KEY NONCLUSTERED AUTO_INCREMENT, b INT) AUTO_ID_CACHE = 100; +INSERT INTO t1 (b) VALUES (1), (2), (3); +SHOW TABLE t1 NEXT_ROW_ID; + +--backup_and_restore t1 AS tt1 + +SHOW TABLE tt1 NEXT_ROW_ID; \ No newline at end of file diff --git a/t/cdc_integration.test b/t/cdc_integration.test new file mode 100644 index 0000000..e0cd210 --- /dev/null +++ b/t/cdc_integration.test @@ -0,0 +1,12 @@ +# Test TiCDC replication + +CREATE TABLE t3 (a INT PRIMARY KEY, b INT, UNIQUE KEY (b)); + +INSERT INTO t3 VALUES (1, 23); +--error 
ER_DUP_ENTRY: Duplicate entry '23' for key 'b' +INSERT INTO t3 VALUES (11, 23); + +--replication_checkpoint +--connection downstream +--error ER_DUP_ENTRY: Duplicate entry '23' for key 'b' +INSERT INTO t3 VALUES (11, 23); diff --git a/t/dumpling_import_integration.test b/t/dumpling_import_integration.test new file mode 100644 index 0000000..27c56a7 --- /dev/null +++ b/t/dumpling_import_integration.test @@ -0,0 +1,10 @@ +# Test Lightning and AutoRandom + +CREATE TABLE t2(c BIGINT AUTO_RANDOM PRIMARY KEY, a INT, b INT); +INSERT INTO t2(a, b) VALUES (1, 1), (2, 2), (3, 3); +SELECT * FROM t2; + +--dump_and_import t2 AS tt2 + +INSERT INTO tt2(a, b) VALUES (1, 1), (2, 2), (3, 3); +SELECT * FROM tt2; \ No newline at end of file diff --git a/t/example.test b/t/example.test index 4b6f18d..8a1f389 100644 --- a/t/example.test +++ b/t/example.test @@ -38,6 +38,10 @@ explain analyze select * from t; --enable_info insert into t values (6, 6); +# --backup_and_restore t AS tt + +# --dump_and_import t AS td + DROP TABLE IF EXISTS t1; CREATE TABLE t1 (f1 INT PRIMARY KEY, f2 INT NOT NULL UNIQUE); INSERT t1 VALUES (1, 1); @@ -48,3 +52,10 @@ INSERT t1 VALUES (1, 1), (1, 1) ON DUPLICATE KEY UPDATE f1 = 2, f2 = 2; --echo $a use `test`;; + +sleep 10; + +--replication_checkpoint +connection default; +use example; +select * from t1;