Persist deduplicated message positions so they are always cleared#176
Merged
Conversation
When ProcessMessages drops a message as a duplicate (by idempotency key), stage its position into _deliveredPositions and persist it to the DeliveredPositionsId effect immediately, rather than relying on an immediate per-position store delete. Previously the dedup branch added the position to the in-memory set only when piggybacking on a real delivery's effect upsert. If no delivery followed (all-duplicate batch, no matching subscription, or the flow suspended/completed first) the position was never written to the effect, so AfterFlush never cleared it from the store and a crash could not replay it. Persisting at stage time routes dedup cleanup through the same batched, crash-safe drain path as real deliveries. The write is taken under _lock to match the DeliverMessages access.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
When
QueueManager.ProcessMessagesdrops a message as a duplicate (by idempotency key), it now stages the position into_deliveredPositionsand persists it to theDeliveredPositionsIdeffect immediately, instead of relying on an immediate per-position store delete.Why
The previous edit changed the dedup branch from
await _messageClearer.Clear([position])to_deliveredPositions.Add(position). The problem:_deliveredPositionsis only written into theDeliveredPositionsIdeffect insideDeliverMessages(on a real delivery), and bothAfterFlushandInitializeclear from the effect, not the in-memory set.So if no delivery followed in the same
QueueManagerlifetime — an all-duplicate batch, the non-dup message never matching a subscription, or the flow suspending/completing first — the duplicate's position:DeliveredPositionsIdeffect,AfterFlushnever deleted it from the store, andInitializereads the effect, which never had it).The duplicate could linger in the message store and be repeatedly re-fetched across replica restarts.
Fix
Persist the position into the effect at stage time, routing dedup cleanup through the same batched, crash-safe drain path (
effect → AfterFlush → MessageClearer.Clear) as real deliveries. The write is taken under_lockto match theDeliverMessagesaccess to_deliveredPositions.Testing
dotnet buildof the core project passes (0 warnings, 0 errors).