Route QueueManager message fetch through MessageWatchdog#175
Closed
stidsborg wants to merge 1 commit into
Closed
Conversation
QueueManager no longer holds an IMessageStore. Its FetchAndNotify now pulls through a MessageFetcher delegate wired to MessageWatchdog.FetchMessages, so all IMessageStore.GetMessages access is owned by the watchdog (on-demand single-flow fetch plus the existing replica poll loop). The on-demand fetch passes only the QueueManager's per-instance fetched positions as the skip set and does not consult the clearer's global pushed-set, keeping subscribe-time and restart-from-replay behaviour identical to the previous in-QueueManager fetch.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Moves all
IMessageStore.GetMessagesaccess out ofQueueManagerand intoMessageWatchdog, so the watchdog owns every store read (on-demand single-flow fetch + the existing replica poll loop).MessageFetcherdelegate(StoredId, IReadOnlyList<long> skip) -> Task<IReadOnlyList<StoredMessage>>(signature-identical toIMessageStore.GetMessages).QueueManagerdrops itsIMessageStoredependency for aMessageFetcher;FetchAndNotifycalls the delegate instead of the store.MessageWatchdog.FetchMessagesis the production target; wired viaFunctionsRegistry->InvocationHelper->CreateQueueManager.new QueueManager(...)tests passfunctionStore.MessageStore.GetMessages(method-group -> delegate).Design note
The on-demand fetch passes only the QueueManager's per-instance fetched positions as the skip set and deliberately does not consult the clearer's global pushed-set. Routing it through the global set would suppress re-delivery to a restarted/suspended flow on the same replica (the watchdog marks every fetched position pushed before routing, and
FlowsManager.Pushdrops non-live flows) and hang it. Per-instance skip keeps subscribe-time and restart-from-replay behaviour identical to the previous in-QueueManager fetch.Testing
🤖 Generated with Claude Code