Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/sync/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use super::fetcher::RpcClient;
use super::sink::SinkSet;
use super::writer::{
detect_all_gaps, detect_blocks_missing_receipts, find_fork_point, get_block_hash, has_gaps,
load_sync_state, save_sync_state, update_sync_rate, update_synced_num, update_tip_num,
load_sync_state, rollback_tip_num, save_sync_state, update_sync_rate, update_synced_num,
update_tip_num,
};

/// RPC concurrency limits
Expand Down Expand Up @@ -508,8 +509,10 @@ impl SyncEngine {
"Reorg handled: deleted orphaned blocks"
);

// Update tip_num to fork point so realtime sync continues from there
update_tip_num(self.pool(), self.chain_id, fork_block, fork_block).await?;
// Roll back tip_num to fork point so realtime sync continues from there.
// Uses rollback_tip_num (direct SET) instead of update_tip_num (GREATEST)
// because tip_num must decrease during a reorg.
rollback_tip_num(self.pool(), self.chain_id, fork_block).await?;

Ok(())
}
Expand Down
21 changes: 21 additions & 0 deletions src/sync/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,27 @@ pub async fn update_tip_num(pool: &Pool, chain_id: u64, tip_num: u64, head_num:
Ok(())
}

/// Reorg recovery: force `tip_num` and `head_num` down to the fork point.
///
/// This deliberately bypasses the monotonic GREATEST() semantics of
/// `update_tip_num` — during a reorg the tip *must* move backwards so the
/// sync engine re-fetches the canonical chain from the fork block onward.
///
/// NOTE(review): if no sync_state row exists for `chain_id` the UPDATE
/// silently affects zero rows — presumably a row always exists by the time
/// a reorg is detected; verify against callers.
pub async fn rollback_tip_num(pool: &Pool, chain_id: u64, fork_block: u64) -> Result<()> {
    // Widen to i64 up front to match the BIGINT columns.
    let fork = fork_block as i64;
    let chain = chain_id as i64;

    let client = pool.get().await?;

    client
        .execute(
            r#"
        UPDATE sync_state
        SET tip_num = $1,
            head_num = $1,
            updated_at = NOW()
        WHERE chain_id = $2
        "#,
            &[&fork, &chain],
        )
        .await?;

    Ok(())
}

/// Update only synced_num (for gap-fill sync - avoids clobbering tip_num)
pub async fn update_synced_num(pool: &Pool, chain_id: u64, synced_num: u64) -> Result<()> {
let conn = pool.get().await?;
Expand Down
74 changes: 73 additions & 1 deletion tests/smoke_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tidx::db::ThrottledPool;
use tidx::query::EventSignature;
use tidx::sync::engine::SyncEngine;
use tidx::sync::sink::SinkSet;
use tidx::sync::writer::{detect_gaps, get_block_hash, load_sync_state, save_sync_state, update_synced_num, update_tip_num};
use tidx::sync::writer::{detect_gaps, get_block_hash, load_sync_state, rollback_tip_num, save_sync_state, update_synced_num, update_tip_num};
use tidx::types::SyncState;
use serial_test::serial;

Expand Down Expand Up @@ -515,6 +515,78 @@ async fn test_sync_state_only_increases() {
assert_eq!(state.head_num, 100, "head_num should not decrease");
}

#[tokio::test]
#[serial(db)]
async fn test_rollback_tip_num_decreases_during_reorg() {
    // Fresh, empty database so this chain has no prior sync_state.
    let db = TestDb::empty().await;
    db.truncate_all().await;

    let chain_id = 88881u64;

    // Forward sync establishes the tip at block 200.
    update_tip_num(&db.pool, chain_id, 200, 200).await.expect("Failed");

    let before = load_sync_state(&db.pool, chain_id)
        .await
        .expect("Failed")
        .unwrap();
    assert_eq!(before.tip_num, 200);
    assert_eq!(before.head_num, 200);

    // A reorg is detected with the fork point at block 180: roll back.
    rollback_tip_num(&db.pool, chain_id, 180).await.expect("Failed to rollback");

    // Unlike update_tip_num, the rollback must be allowed to DECREASE both fields.
    let after = load_sync_state(&db.pool, chain_id)
        .await
        .expect("Failed")
        .unwrap();
    assert_eq!(after.tip_num, 180, "tip_num must decrease to fork point during reorg");
    assert_eq!(after.head_num, 180, "head_num must decrease to fork point during reorg");
}

#[tokio::test]
#[serial(db)]
async fn test_update_tip_num_still_monotonic_forward() {
    // Fresh, empty database so this chain has no prior sync_state.
    let db = TestDb::empty().await;
    db.truncate_all().await;

    let chain_id = 88882u64;

    // Two forward updates: the tip should track the larger value.
    update_tip_num(&db.pool, chain_id, 100, 100).await.expect("Failed");
    update_tip_num(&db.pool, chain_id, 200, 200).await.expect("Failed");

    let advanced = load_sync_state(&db.pool, chain_id)
        .await
        .expect("Failed")
        .unwrap();
    assert_eq!(advanced.tip_num, 200, "tip_num should advance to 200");

    // A stale/lower update must be a no-op: GREATEST keeps the tip at 200.
    update_tip_num(&db.pool, chain_id, 150, 150).await.expect("Failed");

    let unchanged = load_sync_state(&db.pool, chain_id)
        .await
        .expect("Failed")
        .unwrap();
    assert_eq!(unchanged.tip_num, 200, "tip_num should remain at 200 via GREATEST");
}

#[tokio::test]
#[serial(db)]
async fn test_rollback_then_forward_sync_resumes() {
    // Fresh, empty database so this chain has no prior sync_state.
    let db = TestDb::empty().await;
    db.truncate_all().await;

    let chain_id = 88883u64;

    // Establish a tip at 500 with gap-fill progress at 400.
    update_tip_num(&db.pool, chain_id, 500, 500).await.expect("Failed");
    update_synced_num(&db.pool, chain_id, 400).await.expect("Failed");

    // Reorg: the fork point is 450, so the tip rolls back to it.
    rollback_tip_num(&db.pool, chain_id, 450).await.expect("Failed");

    // The rollback touches only tip_num/head_num; synced_num is untouched.
    let rolled_back = load_sync_state(&db.pool, chain_id)
        .await
        .expect("Failed")
        .unwrap();
    assert_eq!(rolled_back.tip_num, 450, "tip_num should be at fork point");
    assert_eq!(rolled_back.synced_num, 400, "synced_num should be unchanged");

    // Realtime sync resumes on the canonical chain and passes the old tip.
    update_tip_num(&db.pool, chain_id, 510, 520).await.expect("Failed");

    let resumed = load_sync_state(&db.pool, chain_id)
        .await
        .expect("Failed")
        .unwrap();
    assert_eq!(resumed.tip_num, 510, "tip_num should advance past old value");
    assert_eq!(resumed.head_num, 520, "head_num should reflect new chain head");
}

#[tokio::test]
#[serial(db)]
async fn test_sync_state_save_and_load() {
Expand Down
Loading