Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions magicblock-chainlink/src/submux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1734,4 +1734,75 @@ mod tests {

mux.shutdown().await.unwrap();
}

// -----------------
// Dedup window expiry edge case
// -----------------
#[tokio::test]
async fn test_dedup_same_slot_after_window_expires() {
init_logger();

let (tx1, rx1) = mpsc::channel(10_000);
let (tx2, rx2) = mpsc::channel(10_000);
let client1 = Arc::new(ChainPubsubClientMock::new(tx1, rx1));
let client2 = Arc::new(ChainPubsubClientMock::new(tx2, rx2));

// Use a short dedup window (100ms) so we can test expiry
let mux: SubMuxClient<ChainPubsubClientMock> = new_submux_client(
vec![client1.clone(), client2.clone()],
Some(100),
);
let mut mux_rx = mux.take_updates();

let pk = Pubkey::new_unique();
mux.subscribe(pk, None).await.unwrap();

// First delivery of (pk, slot=42) from client1
client1
.send_account_update(pk, 42, &account_with_lamports(100))
.await;
let first = tokio::time::timeout(
std::time::Duration::from_millis(200),
mux_rx.recv(),
)
.await
.expect("first update expected")
.expect("stream open");
assert_eq!(first.pubkey, pk);
assert_eq!(first.slot, 42);

// Second delivery within the dedup window — should be deduped
client2
.send_account_update(pk, 42, &account_with_lamports(100))
.await;
let recv = tokio::time::timeout(
std::time::Duration::from_millis(200),
mux_rx.recv(),
)
.await;
assert!(
recv.is_err(),
"same-slot update within dedup window should be suppressed"
);

// Wait for the dedup window to expire (100ms + margin)
tokio::time::sleep(std::time::Duration::from_millis(120)).await;

// Third delivery of the same (pk, slot=42) from client2
// after the window expired — should be forwarded again
client2
.send_account_update(pk, 42, &account_with_lamports(100))
.await;
let after_expiry = tokio::time::timeout(
std::time::Duration::from_millis(200),
mux_rx.recv(),
)
.await
.expect("update expected after dedup window expiry")
.expect("stream open");
assert_eq!(after_expiry.pubkey, pk);
assert_eq!(after_expiry.slot, 42);

mux.shutdown().await.unwrap();
}
}
Loading