diff --git a/magicblock-chainlink/src/submux/mod.rs b/magicblock-chainlink/src/submux/mod.rs index 0c9f75013..1e3990b62 100644 --- a/magicblock-chainlink/src/submux/mod.rs +++ b/magicblock-chainlink/src/submux/mod.rs @@ -1734,4 +1734,75 @@ mod tests { mux.shutdown().await.unwrap(); } + + // ----------------- + // Dedup window expiry edge case + // ----------------- + #[tokio::test] + async fn test_dedup_same_slot_after_window_expires() { + init_logger(); + + let (tx1, rx1) = mpsc::channel(10_000); + let (tx2, rx2) = mpsc::channel(10_000); + let client1 = Arc::new(ChainPubsubClientMock::new(tx1, rx1)); + let client2 = Arc::new(ChainPubsubClientMock::new(tx2, rx2)); + + // Use a short dedup window (100ms) so we can test expiry + let mux: SubMuxClient = new_submux_client( + vec![client1.clone(), client2.clone()], + Some(100), + ); + let mut mux_rx = mux.take_updates(); + + let pk = Pubkey::new_unique(); + mux.subscribe(pk, None).await.unwrap(); + + // First delivery of (pk, slot=42) from client1 + client1 + .send_account_update(pk, 42, &account_with_lamports(100)) + .await; + let first = tokio::time::timeout( + std::time::Duration::from_millis(200), + mux_rx.recv(), + ) + .await + .expect("first update expected") + .expect("stream open"); + assert_eq!(first.pubkey, pk); + assert_eq!(first.slot, 42); + + // Second delivery within the dedup window — should be deduped + client2 + .send_account_update(pk, 42, &account_with_lamports(100)) + .await; + let recv = tokio::time::timeout( + std::time::Duration::from_millis(200), + mux_rx.recv(), + ) + .await; + assert!( + recv.is_err(), + "same-slot update within dedup window should be suppressed" + ); + + // Wait for the dedup window to expire (100ms + margin) + tokio::time::sleep(std::time::Duration::from_millis(120)).await; + + // Third delivery of the same (pk, slot=42) from client2 + // after the window expired — should be forwarded again + client2 + .send_account_update(pk, 42, &account_with_lamports(100)) + .await; + let after_expiry = tokio::time::timeout( + std::time::Duration::from_millis(200), + mux_rx.recv(), + ) + .await + .expect("update expected after dedup window expiry") + .expect("stream open"); + assert_eq!(after_expiry.pubkey, pk); + assert_eq!(after_expiry.slot, 42); + + mux.shutdown().await.unwrap(); + } }