From e8767aeb93f98404552a7b1f34e91eaa20cc855e Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sat, 8 Feb 2025 18:09:36 +0800 Subject: [PATCH 1/7] update --- cdc/owner/changefeed.go | 1 + .../conf/changefeed.toml | 2 + .../overwrite_resume_with_syncpoint/run.sh | 68 +++++++++++++++++++ 3 files changed, 71 insertions(+) create mode 100644 tests/integration_tests/overwrite_resume_with_syncpoint/conf/changefeed.toml create mode 100644 tests/integration_tests/overwrite_resume_with_syncpoint/run.sh diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index f04f83949b0..e28745f7fd1 100755 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -719,6 +719,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) { c.barriers = nil c.initialized = false c.isReleased = true + c.resolvedTs = 0 log.Info("changefeed closed", zap.String("namespace", c.id.Namespace), diff --git a/tests/integration_tests/overwrite_resume_with_syncpoint/conf/changefeed.toml b/tests/integration_tests/overwrite_resume_with_syncpoint/conf/changefeed.toml new file mode 100644 index 00000000000..63153e1dc53 --- /dev/null +++ b/tests/integration_tests/overwrite_resume_with_syncpoint/conf/changefeed.toml @@ -0,0 +1,2 @@ +enable-sync-point = true +sync-point-interval = "30s" \ No newline at end of file diff --git a/tests/integration_tests/overwrite_resume_with_syncpoint/run.sh b/tests/integration_tests/overwrite_resume_with_syncpoint/run.sh new file mode 100644 index 00000000000..b4f08c4b872 --- /dev/null +++ b/tests/integration_tests/overwrite_resume_with_syncpoint/run.sh @@ -0,0 +1,68 @@ +#!/bin/bash +# the script test when we enable syncpoint, and pause the changefeed, +# then resume with a forward checkpoint, to ensure the changefeed can be sync correctly. + +set -eux + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function check_ts_forward() { + changefeedid=$1 + rts1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts') + checkpoint1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso') + sleep 1 + rts2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts') + checkpoint2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso') + if [[ "$rts1" != "null" ]] && [[ "$rts1" != "0" ]]; then + if [[ "$rts1" -ne "$rts2" ]] || [[ "$checkpoint1" -ne "$checkpoint2" ]]; then + echo "changefeed is working normally rts: ${rts1}->${rts2} checkpoint: ${checkpoint1}->${checkpoint2}" + return + fi + fi + exit 1 +} + +function run() { + # No need to test kafka and storage sink. + if [ "$SINK_TYPE" != "mysql" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + SINK_URI="mysql://root@127.0.0.1:3306/" + run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml --changefeed-id="test4" + + check_ts_forward "test4" + + run_cdc_cli changefeed pause --changefeed-id="test4" + + sleep 15 + + checkpoint1=$(cdc cli changefeed query --changefeed-id="test4" 2>&1 | jq '.checkpoint_tso') + # add a large number to avoid the problem of losing precision when jq processing large integers + checkpoint1=$((checkpoint1 + 1000000)) + + # resume a forward checkpointTs + run_cdc_cli changefeed resume --changefeed-id="test4" --no-confirm --overwrite-checkpoint-ts=$checkpoint1 + + check_ts_forward "test4" + + cleanup_process $CDC_BINARY +} + + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From 668bf7edbb41c3033f3d949389ce09a778f4dc63 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sat, 8 Feb 2025 18:52:00 +0800 Subject: [PATCH 2/7] update --- tests/integration_tests/run_group.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index e45d8825118..cd0e6394356 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -10,7 +10,7 @@ group=$2 # Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant # changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence # multi_cdc_cluster capture_suicide_while_balance_table -mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart" +mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart overwrite_resume_with_syncpoint" mysql_only_http="http_api http_api_tls api_v2" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table" From 4690182a33db41a13db469b14973cb6b49bc09c7 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sat, 8 Feb 2025 19:04:17 +0800 Subject: [PATCH 3/7] update --- tests/integration_tests/run_group.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 9021edc3a10..cd0e6394356 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -10,7 +10,6 @@ group=$2 # Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant # changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence # multi_cdc_cluster capture_suicide_while_balance_table - mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart overwrite_resume_with_syncpoint" mysql_only_http="http_api http_api_tls api_v2" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table" From 6de1807b4603b25acc6816ca5697a37af220335d Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Sat, 8 Feb 2025 19:09:37 +0800 Subject: [PATCH 4/7] update --- .../overwrite_resume_with_syncpoint/run.sh | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/tests/integration_tests/overwrite_resume_with_syncpoint/run.sh b/tests/integration_tests/overwrite_resume_with_syncpoint/run.sh index b4f08c4b872..9fa464d0a52 100644 --- a/tests/integration_tests/overwrite_resume_with_syncpoint/run.sh +++ b/tests/integration_tests/overwrite_resume_with_syncpoint/run.sh @@ -1,5 +1,5 @@ #!/bin/bash -# the script test when we enable syncpoint, and pause the changefeed, +# the script test when we enable syncpoint, and pause the changefeed, # then resume with a forward checkpoint, to ensure the changefeed can be sync correctly. set -eux @@ -34,34 +34,33 @@ function run() { rm -rf $WORK_DIR && mkdir -p $WORK_DIR - start_tidb_cluster --workdir $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR cd $WORK_DIR - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - SINK_URI="mysql://root@127.0.0.1:3306/" - run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml --changefeed-id="test4" + SINK_URI="mysql://root@127.0.0.1:3306/" + run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml --changefeed-id="test4" check_ts_forward "test4" - run_cdc_cli changefeed pause --changefeed-id="test4" + run_cdc_cli changefeed pause --changefeed-id="test4" - sleep 15 + sleep 15 - checkpoint1=$(cdc cli changefeed query --changefeed-id="test4" 2>&1 | jq '.checkpoint_tso') - # add a large number to avoid the problem of losing precision when jq processing large integers - checkpoint1=$((checkpoint1 + 1000000)) + checkpoint1=$(cdc cli changefeed query --changefeed-id="test4" 2>&1 | jq '.checkpoint_tso') + # add a large number to avoid the problem of losing precision when jq processing large integers + checkpoint1=$((checkpoint1 + 1000000)) - # resume a forward checkpointTs - run_cdc_cli changefeed resume --changefeed-id="test4" --no-confirm --overwrite-checkpoint-ts=$checkpoint1 + # resume a forward checkpointTs + run_cdc_cli changefeed resume --changefeed-id="test4" --no-confirm --overwrite-checkpoint-ts=$checkpoint1 - check_ts_forward "test4" + check_ts_forward "test4" cleanup_process $CDC_BINARY } - trap stop_tidb_cluster EXIT run $* check_logs $WORK_DIR From f5b271de56b79cb6f5bf68ef7de8bee06ab41fd1 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 11 Feb 2025 17:00:25 +0800 Subject: [PATCH 5/7] update --- cdc/owner/changefeed.go | 4 +++- tests/integration_tests/run_group.sh | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index e28745f7fd1..1f5876982b4 100755 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -466,8 +466,10 @@ LOOP2: } checkpointTs := c.state.Status.CheckpointTs - if c.resolvedTs == 0 { + //Invariant: ResolvedTs must >= checkpointTs! + if c.resolvedTs == 0 || c.resolvedTs < checkpointTs { c.resolvedTs = checkpointTs + log.Info("Initialize changefeed resolvedTs!", zap.Uint64("resolvedTs", c.resolvedTs), zap.Uint64("checkpointTs", c.checkpointTs)) } minTableBarrierTs := c.state.Status.MinTableBarrierTs diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index cd0e6394356..d0b88669bba 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -10,7 +10,7 @@ group=$2 # Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant # changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence # multi_cdc_cluster capture_suicide_while_balance_table -mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart overwrite_resume_with_syncpoint" +mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart" mysql_only_http="http_api http_api_tls api_v2" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table" @@ -41,7 +41,7 @@ groups=( ["G07"]='kv_client_stream_reconnect cdc split_region' ["G08"]='processor_err_chan changefeed_reconstruct multi_capture' ["G09"]='gc_safepoint changefeed_pause_resume cli savepoint synced_status' - ["G10"]='default_value simple cdc_server_tips event_filter' + ["G10"]='default_value simple cdc_server_tips event_filter overwrite_resume_with_syncpoint' ["G11"]='resolve_lock move_table autorandom generate_column' ["G12"]='many_pk_or_uk capture_session_done_during_task ddl_attributes' ["G13"]='tiflash region_merge common_1' From adc67e27325b58d63cb3890a85cb18bb383ddefe Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 11 Feb 2025 17:07:09 +0800 Subject: [PATCH 6/7] update --- cdc/owner/changefeed.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 1f5876982b4..86ae0b259ce 100755 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -469,7 +469,7 @@ LOOP2: //Invariant: ResolvedTs must >= checkpointTs! if c.resolvedTs == 0 || c.resolvedTs < checkpointTs { c.resolvedTs = checkpointTs - log.Info("Initialize changefeed resolvedTs!", zap.Uint64("resolvedTs", c.resolvedTs), zap.Uint64("checkpointTs", c.checkpointTs)) + log.Info("Initialize changefeed resolvedTs!", zap.Uint64("resolvedTs", c.resolvedTs), zap.Uint64("checkpointTs", checkpointTs)) } minTableBarrierTs := c.state.Status.MinTableBarrierTs From fb74becdbb1239df8b1743dae43579f402e7a315 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 11 Feb 2025 17:35:48 +0800 Subject: [PATCH 7/7] update --- cdc/owner/changefeed.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 86ae0b259ce..f72c29a28bb 100755 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -466,7 +466,7 @@ LOOP2: } checkpointTs := c.state.Status.CheckpointTs - //Invariant: ResolvedTs must >= checkpointTs! + // Invariant: ResolvedTs must >= checkpointTs! if c.resolvedTs == 0 || c.resolvedTs < checkpointTs { c.resolvedTs = checkpointTs log.Info("Initialize changefeed resolvedTs!", zap.Uint64("resolvedTs", c.resolvedTs), zap.Uint64("checkpointTs", checkpointTs))