From 079f629851758328b982557b5e8f8edbe70c8e17 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sat, 27 Jun 2026 07:31:56 +0200 Subject: [PATCH] Persist deduplicated message positions so they are always cleared When ProcessMessages drops a message as a duplicate (by idempotency key), stage its position into _deliveredPositions and persist it to the DeliveredPositionsId effect immediately, rather than relying on an immediate per-position store delete. Previously the dedup branch added the position to the in-memory set only when piggybacking on a real delivery's effect upsert. If no delivery followed (all-duplicate batch, no matching subscription, or the flow suspended/completed first) the position was never written to the effect, so AfterFlush never cleared it from the store and a crash could not replay it. Persisting at stage time routes dedup cleanup through the same batched, crash-safe drain path as real deliveries. The write is taken under _lock to match the DeliverMessages access. --- Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs index 74ef9eaa..eaf8b7bc 100644 --- a/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs +++ b/Core/Cleipnir.ResilientFunctions/Queuing/QueueManager.cs @@ -227,7 +227,11 @@ private async Task ProcessMessages(IReadOnlyList messages) if (idempotencyKey != null && !_idempotencyKeys.Add(idempotencyKey, position)) { - await _messageClearer.Clear([position]); + lock (_lock) + { + _deliveredPositions.Add(position); + _effect.FlushlessUpsert(DeliveredPositionsId, _deliveredPositions.ToList(), alias: null); + } continue; }