-
Notifications
You must be signed in to change notification settings - Fork 3
refactor(sacp): Session API improvements and proxy race condition fix #95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
The name run_until better reflects the semantics: run background responders until the provided closure completes. This aligns with tokio::task::LocalSet::run_until and similar APIs. Updated all call sites across sacp, sacp-tokio, sacp-conductor, elizacp, yopo, and associated tests/examples.
The name start_session better reflects that this method blocks until the session handshake completes before returning. The 'spawn' terminology was misleading since the responders are spawned but the method itself awaits. Also renamed spawn_session_proxy to start_session_proxy for consistency.
Add SessionBuilder::on_session_start() which spawns a task that runs the provided closure once the session starts. Unlike start_session(), this returns immediately without blocking the current task. The closure receives an ActiveSession<'static, _> and runs in a background task, making it suitable for fire-and-forget session handling patterns.
SessionBuilder now has a BlockState type parameter (Blocking/NonBlocking). Methods that block the current task (run_until, start_session, start_session_proxy) require calling block_task() first. The on_session_start() method remains available without block_task() since it spawns a task and returns immediately. Also renamed run_session to run_until for consistency with JrConnection::run_until.
…t_session_proxy - start_session_proxy now returns SessionId instead of () - Add on_proxy_session_start() which spawns a task and runs a closure with the SessionId once the proxy is established - The spawned variant doesn't require block_task() since it returns immediately
Separate session handler registration from MCP handler registrations so they can be managed independently during the proxy transition. The new implementation follows a careful sequence to prevent message loss or reordering: 1. Drop session handler registration (stops new messages to channel) 2. Drop update_tx (closes channel for drain detection) 3. Drain and forward any queued messages to client 4. Install proxy handler for future messages This ensures all messages are delivered in order during the transition from active session mode to proxy mode.
The example now correctly handles the SessionId return value and returns Ok(()) from the handler to satisfy IntoHandled bounds.
…arding In proxy mode, forward messages to the conductor's successor using the Agent endpoint instead of manually wrapping in SuccessorMessage and sending to Client. This simplifies the code and prepares for fixing the message ordering bug where responses bypass the conductor's message loop.
…e loop The conductor must maintain message ordering when forwarding between components. Previously, send_proxied_message_to would respond directly on the request context, bypassing the conductor's message loop and causing a race condition where responses could arrive before notifications. Changed send_proxied_message_via to send_proxied_message_to_via with an explicit endpoint parameter, and use it for all message forwarding to ensure responses go through conductor_tx like notifications do.
Since the conductor can act as a proxy, it's important to be explicit about which endpoint (Client or Agent) is being used rather than relying on HasDefaultEndpoint. This makes the message flow direction clear in the code. - Change if_request/if_notification to if_request_from/if_notification_from with Client - Change send_notification to send_notification_to with Client
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Summary
This PR refactors the Session API for clarity and adds a fix for a race condition in
proxy_remaining_messages.Changes
API Renames
with_client→run_untilonJrConnectionBuilder- Better reflects the blocking behavior (precedent:tokio::task::LocalSet::run_until)spawn_session→start_sessiononSessionBuilder- The old name was misleading since it blocks the current taskNew Spawned Session Methods
on_session_start(cx, async |session| {...})- Truly spawned session that returns immediatelyon_proxy_session_start(cx, async |id| {...})- Spawned proxy session variantBuilder Pattern for Blocking
block_task()method with sentinel types (Blocking/NonBlocking) to make blocking behavior explicit at the type levelstart_session()andstart_session_proxy()now require callingblock_task()firstRace Condition Fix
Fixed
proxy_remaining_messagesto prevent message loss/reordering during the transition from active session to proxy mode:update_tx(closes channel for drain detection)This required separating
session_handler_registrationfrommcp_handler_registrationsinActiveSession.Final API
.block_task().start_session()ActiveSession<'static>.block_task().start_session_proxy(cx)SessionId.on_session_start(cx, async |session| {...})().on_proxy_session_start(cx, async |id| {...})()