Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix changefeed stuck problem when overwrite resume with a forward checkpointTs #12056

Merged
merged 8 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
c.barriers = nil
c.initialized = false
c.isReleased = true
c.resolvedTs = 0

log.Info("changefeed closed",
zap.String("namespace", c.id.Namespace),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enable-sync-point = true
sync-point-interval = "30s"
67 changes: 67 additions & 0 deletions tests/integration_tests/overwrite_resume_with_syncpoint/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/bin/bash
# the script test when we enable syncpoint, and pause the changefeed,
# then resume with a forward checkpoint, to ensure the changefeed can be sync correctly.

set -eux

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function check_ts_forward() {
changefeedid=$1
rts1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts')
checkpoint1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso')
sleep 1
rts2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts')
checkpoint2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso')
if [[ "$rts1" != "null" ]] && [[ "$rts1" != "0" ]]; then
if [[ "$rts1" -ne "$rts2" ]] || [[ "$checkpoint1" -ne "$checkpoint2" ]]; then
echo "changefeed is working normally rts: ${rts1}->${rts2} checkpoint: ${checkpoint1}->${checkpoint2}"
return
fi
fi
exit 1
}

function run() {
# No need to test kafka and storage sink.
if [ "$SINK_TYPE" != "mysql" ]; then
return
fi

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

SINK_URI="mysql://[email protected]:3306/"
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml --changefeed-id="test4"

check_ts_forward "test4"

run_cdc_cli changefeed pause --changefeed-id="test4"

sleep 15

checkpoint1=$(cdc cli changefeed query --changefeed-id="test4" 2>&1 | jq '.checkpoint_tso')
# add a large number to avoid the problem of losing precision when jq processing large integers
checkpoint1=$((checkpoint1 + 1000000))

# resume a forward checkpointTs
run_cdc_cli changefeed resume --changefeed-id="test4" --no-confirm --overwrite-checkpoint-ts=$checkpoint1

check_ts_forward "test4"

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
2 changes: 1 addition & 1 deletion tests/integration_tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ group=$2
# Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant
# changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence
# multi_cdc_cluster capture_suicide_while_balance_table
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart safe_mode"
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint server_config_compatibility changefeed_dup_error_restart overwrite_resume_with_syncpoint"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this test to mysql_only_http group to make CI faster.

mysql_only_http="http_api http_api_tls api_v2"
mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"

Expand Down
Loading