diff --git a/src/sync/engine.rs b/src/sync/engine.rs index d9d649e..f8e5ca7 100644 --- a/src/sync/engine.rs +++ b/src/sync/engine.rs @@ -19,7 +19,8 @@ use super::fetcher::RpcClient; use super::sink::SinkSet; use super::writer::{ detect_all_gaps, detect_blocks_missing_receipts, find_fork_point, get_block_hash, has_gaps, - load_sync_state, save_sync_state, update_sync_rate, update_synced_num, update_tip_num, + load_sync_state, rollback_tip_num, save_sync_state, update_sync_rate, update_synced_num, + update_tip_num, }; /// RPC concurrency limits @@ -508,8 +509,10 @@ impl SyncEngine { "Reorg handled: deleted orphaned blocks" ); - // Update tip_num to fork point so realtime sync continues from there - update_tip_num(self.pool(), self.chain_id, fork_block, fork_block).await?; + // Roll back tip_num to fork point so realtime sync continues from there. + // Uses rollback_tip_num (direct SET) instead of update_tip_num (GREATEST) + // because tip_num must decrease during a reorg. + rollback_tip_num(self.pool(), self.chain_id, fork_block).await?; Ok(()) } diff --git a/src/sync/writer.rs b/src/sync/writer.rs index a8d2f78..dc150ac 100644 --- a/src/sync/writer.rs +++ b/src/sync/writer.rs @@ -779,6 +779,27 @@ pub async fn update_tip_num(pool: &Pool, chain_id: u64, tip_num: u64, head_num: Ok(()) } +/// Roll back tip_num during a reorg. Unlike `update_tip_num` which uses +/// GREATEST (monotonic), this unconditionally sets tip_num and head_num +/// to the fork point so the sync engine re-fetches the canonical chain. +pub async fn rollback_tip_num(pool: &Pool, chain_id: u64, fork_block: u64) -> Result<()> { + let conn = pool.get().await?; + + conn.execute( + r#" + UPDATE sync_state + SET tip_num = $1, + head_num = $1, + updated_at = NOW() + WHERE chain_id = $2 + "#, + &[&(fork_block as i64), &(chain_id as i64)], + ) + .await?; + + Ok(()) +} + /// Update only synced_num (for gap-fill sync - avoids clobbering tip_num) pub async fn update_synced_num(pool: &Pool, chain_id: u64, synced_num: u64) -> Result<()> { let conn = pool.get().await?; diff --git a/tests/smoke_test.rs b/tests/smoke_test.rs index 58a7724..14bc9cc 100644 --- a/tests/smoke_test.rs +++ b/tests/smoke_test.rs @@ -7,7 +7,7 @@ use tidx::db::ThrottledPool; use tidx::query::EventSignature; use tidx::sync::engine::SyncEngine; use tidx::sync::sink::SinkSet; -use tidx::sync::writer::{detect_gaps, get_block_hash, load_sync_state, save_sync_state, update_synced_num, update_tip_num}; +use tidx::sync::writer::{detect_gaps, get_block_hash, load_sync_state, rollback_tip_num, save_sync_state, update_synced_num, update_tip_num}; use tidx::types::SyncState; use serial_test::serial; @@ -515,6 +515,78 @@ async fn test_sync_state_only_increases() { assert_eq!(state.head_num, 100, "head_num should not decrease"); } +#[tokio::test] +#[serial(db)] +async fn test_rollback_tip_num_decreases_during_reorg() { + let db = TestDb::empty().await; + db.truncate_all().await; + + let chain_id = 88881u64; + + // Establish initial state at block 200 + update_tip_num(&db.pool, chain_id, 200, 200).await.expect("Failed"); + + let state = load_sync_state(&db.pool, chain_id).await.expect("Failed").unwrap(); + assert_eq!(state.tip_num, 200); + assert_eq!(state.head_num, 200); + + // Simulate reorg: roll back tip to fork point at block 180 + rollback_tip_num(&db.pool, chain_id, 180).await.expect("Failed to rollback"); + + let state = load_sync_state(&db.pool, chain_id).await.expect("Failed").unwrap(); + assert_eq!(state.tip_num, 180, "tip_num must decrease to fork point during reorg"); + assert_eq!(state.head_num, 180, "head_num must decrease to fork point during reorg"); +} + +#[tokio::test] +#[serial(db)] +async fn test_update_tip_num_still_monotonic_forward() { + let db = TestDb::empty().await; + db.truncate_all().await; + + let chain_id = 88882u64; + + // Normal forward sync: tip advances + update_tip_num(&db.pool, chain_id, 100, 100).await.expect("Failed"); + update_tip_num(&db.pool, chain_id, 200, 200).await.expect("Failed"); + + let state = load_sync_state(&db.pool, chain_id).await.expect("Failed").unwrap(); + assert_eq!(state.tip_num, 200, "tip_num should advance to 200"); + + // update_tip_num with lower value should NOT decrease (GREATEST still works) + update_tip_num(&db.pool, chain_id, 150, 150).await.expect("Failed"); + + let state = load_sync_state(&db.pool, chain_id).await.expect("Failed").unwrap(); + assert_eq!(state.tip_num, 200, "tip_num should remain at 200 via GREATEST"); +} + +#[tokio::test] +#[serial(db)] +async fn test_rollback_then_forward_sync_resumes() { + let db = TestDb::empty().await; + db.truncate_all().await; + + let chain_id = 88883u64; + + // Initial state at block 500 + update_tip_num(&db.pool, chain_id, 500, 500).await.expect("Failed"); + update_synced_num(&db.pool, chain_id, 400).await.expect("Failed"); + + // Reorg rolls tip back to fork point 450 + rollback_tip_num(&db.pool, chain_id, 450).await.expect("Failed"); + + let state = load_sync_state(&db.pool, chain_id).await.expect("Failed").unwrap(); + assert_eq!(state.tip_num, 450, "tip_num should be at fork point"); + assert_eq!(state.synced_num, 400, "synced_num should be unchanged"); + + // Forward sync resumes from fork point, advances past old tip + update_tip_num(&db.pool, chain_id, 510, 520).await.expect("Failed"); + + let state = load_sync_state(&db.pool, chain_id).await.expect("Failed").unwrap(); + assert_eq!(state.tip_num, 510, "tip_num should advance past old value"); + assert_eq!(state.head_num, 520, "head_num should reflect new chain head"); +} + #[tokio::test] #[serial(db)] async fn test_sync_state_save_and_load() {