Fix/extension negotiation race#276
Conversation
|
Would close: #264 |
487856e to
1f26025
Compare
1f26025 to
3231f43
Compare
3231f43 to
a63c174
Compare
a63c174 to
ee3fd94
Compare
ae508a0 to
98f1165
Compare
Wait for RequestExtensionsSuccess/Error response before starting the SV1 server. This ensures the ChannelManager knows which extensions are active BEFORE downstream miners can connect. Previously, RequestExtensions was sent asynchronously and the SV1 server started immediately without waiting for the response. This caused a race condition where extension-dependent behavior (like UserIdentity TLV for worker hashrate tracking) could not be reliably determined. Changes: - setup_connection() now returns Vec<u16> of negotiated extensions - negotiate_extensions() handles the extension negotiation flow synchronously - handle_extension_response() parses RequestExtensionsSuccess/Error - ChannelManager::new() accepts negotiated_extensions parameter - ChannelManager::set_negotiated_extensions() for upstream fallback scenarios - Added ExtensionNegotiationTimeout error variant (30 second timeout) Fixes: stratum-mining#264
Wait for RequestExtensionsSuccess/Error response before starting the downstream server. This ensures the ChannelManager knows which extensions are active BEFORE downstream miners can connect. Previously, RequestExtensions was sent asynchronously and the downstream server started immediately without waiting for the response. This caused a race condition where extension-dependent behavior could not be reliably determined. Changes: - setup_connection() now returns Vec<u16> of negotiated extensions - negotiate_extensions() handles the extension negotiation flow synchronously - handle_extension_response() parses RequestExtensionsSuccess/Error - Upstream::start() now returns the negotiated extensions - ChannelManager::set_negotiated_extensions() stores extensions before downstream server starts - Added ExtensionNegotiationTimeout error variant (30 second timeout) Fixes: stratum-mining#264
When a miner's username exceeds 32 bytes (the protocol limit for the UserIdentity TLV in SV2 extensions), truncate it at a valid UTF-8 character boundary and log a warning. This ensures usernames are passed through to the pool even when they exceed the limit, preserving user recognition on pool dashboards while maintaining protocol compliance.
Replace magic numbers 0x01 and 0x02 with MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS and MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR constants for better readability.
Align Translator with JDC by returning Vec<u16> directly from initialize_upstream() instead of wrapping it in UpstreamInitResult. This simplifies the code and makes both implementations consistent.
98f1165 to
0658e7d
Compare
| JDCError::fallback(e) | ||
| })?; | ||
|
|
||
| self.handle_extension_response(response).await |
There was a problem hiding this comment.
How do we know that what we get from the upstream is the response of the RequestExtensions message?
There was a problem hiding this comment.
The Extensions Negotiation spec (Extension 0x0001), Section 4.2 "Ordering" (https://github.com/stratum-mining/sv2-spec/blob/main/extensions/0x0001-extensions-negotiation.md#4-implementation-notes) states:
- The
RequestExtensionsmessage MUST be sent immediately afterSetupConnection.Successand before any other protocol-specific messages.- The response to
RequestExtensionsMUST be received before proceeding with any further protocol-specific messages.
And Section 4.3 — "Backward Compatibility" (Client Behavior):
Clients MUST NOT send any further extension-specific messages until they receive a RequestExtensions.Success or RequestExtensions.Error response.
Since the client hasn't opened any mining channels or sent any other messages at this point, the server has nothing else it could legitimately send us. The spec mandates the server respond to RequestExtensions before any other protocol-specific messages flow.
At this point, setup_connection() is the only consumer of upstream_receiver. The main message dispatch loop (run_upstream_task) is not spawned until setup_connection() returns successfully — so no other code can race to consume the response.
Even given the above guarantees, handle_extension_response() explicitly validates msg_type against MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS (0x01) and MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR (0x02), with a catch-all _ arm that returns an UnexpectedMessage error. So a misbehaving server sending an unexpected message type would be caught and fail the connection setup.
We could additionally check ext_type == 0x0001 in the header to fully verify it's an Extensions Negotiation message, though in practice this is redundant given the ordering guarantee.
There was a problem hiding this comment.
Why not calling the handle_extensions_message_frame_from_server here, instead of implementing the handler again here in the handle_extension_response you created?
What I would do is the following:
- I receive a frame from the upstream
- I check it's an extension message
- If it is, I call
handle_extensions_message_frame_from_serverwhich internally calls the trait implementations we have in theextensions_message_handler.rs.
There was a problem hiding this comment.
Ah I see the duplication of code now. I'll refactor and ping when ready for review
Co-authored-by: Gabriele Vernetti <62447440+GitGab19@users.noreply.github.com>
…rAsync trait In both JDC and Translator, `handle_extension_response` was manually reimplementing the parsing and validation logic already present in each `ChannelManager`'s `extensions_message_handler.rs` trait impl. Replace that duplicated logic by calling `handle_extensions_message_frame_from_server` directly on the `ChannelManager`, which internally dispatches to the existing `handle_request_extensions_success` / `handle_request_extensions_error` handlers. To support this, thread `channel_manager: &mut ChannelManager` through the call chains: start() → setup_connection() → negotiate_extensions() → handle_extension_response() The `ChannelManager` is now created before `initialize_upstream` / `upstream.start()` so the trait impl has access to it during extension negotiation and can store the result internally. The caller no longer needs to pass negotiated extensions back to the `ChannelManager` after the fact. A retry forwarding loop is added to `negotiate_extensions`: after each `handle_extension_response` call, if the trait impl sent a retry `RequestExtensions` frame via its upstream channel, the frame is detected via `try_recv()` and forwarded to the pool before looping to await the next response.
4a547bd to
11202e5
Compare
| /// validation and stores the negotiated extensions. On a `RequestExtensionsError` | ||
| /// where the server requires extensions we support, it sends a retry `RequestExtensions` | ||
| /// via its upstream channel. | ||
| async fn handle_extension_response( |
There was a problem hiding this comment.
| async fn handle_extension_response( | |
| async fn handle_request_extensions_response( |
| /// Sends RequestExtensions and waits for the response. | ||
| /// | ||
| /// This method handles the extension negotiation flow: | ||
| /// 1. Sends RequestExtensions with required extensions | ||
| /// 2. Waits for RequestExtensionsSuccess or RequestExtensionsError | ||
| /// 3. Delegates response handling to the `ChannelManager` via its | ||
| /// `HandleExtensionsFromServerAsync` trait implementation | ||
| /// 4. If the server requires additional extensions we support, the ChannelManager | ||
| /// sends a retry `RequestExtensions`; this method detects and forwards it | ||
| /// | ||
| /// # Returns | ||
| /// * `Ok(Vec<u16>)` - The list of successfully negotiated extensions | ||
| /// * `Err(TproxyError)` - Extension negotiation failed | ||
| async fn negotiate_extensions( | ||
| &mut self, | ||
| channel_manager: &mut ChannelManager, | ||
| ) -> TproxyResult<Vec<u16>, error::Upstream> { | ||
| let request_extensions = RequestExtensions { | ||
| request_id: 1, | ||
| requested_extensions: Seq064K::new(self.required_extensions.clone()).unwrap(), | ||
| }; | ||
|
|
||
| let sv2_frame: Sv2Frame = AnyMessage::Extensions(request_extensions.into_static().into()) | ||
| .try_into() | ||
| .map_err(TproxyError::shutdown)?; | ||
|
|
||
| info!( | ||
| "Sending RequestExtensions to upstream with required extensions: {:?}", | ||
| self.required_extensions | ||
| ); | ||
|
|
||
| self.upstream_channel_state | ||
| .upstream_sender | ||
| .send(sv2_frame) | ||
| .await | ||
| .map_err(|e| { | ||
| error!("Failed to send RequestExtensions to upstream: {:?}", e); | ||
| TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) | ||
| })?; | ||
|
|
||
| loop { | ||
| // Wait for extension negotiation response with timeout | ||
| let response = tokio::time::timeout( | ||
| Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS), | ||
| self.upstream_channel_state.upstream_receiver.recv(), | ||
| ) | ||
| .await | ||
| .map_err(|_| { | ||
| error!( | ||
| "Extension negotiation timed out after {} seconds", | ||
| EXTENSION_NEGOTIATION_TIMEOUT_SECS | ||
| ); | ||
| TproxyError::fallback(TproxyErrorKind::ExtensionNegotiationTimeout) | ||
| })? | ||
| .map_err(|e| { | ||
| error!("Failed to receive extension negotiation response: {}", e); | ||
| TproxyError::fallback(e) | ||
| })?; | ||
|
|
||
| // Delegate response handling to the ChannelManager's trait implementation. | ||
| // This validates required extensions and may send a retry RequestExtensions | ||
| // if the server requires extensions we support. | ||
| self.handle_extension_response(response, channel_manager) | ||
| .await?; | ||
|
|
||
| // If the ChannelManager sent a retry RequestExtensions (via its upstream_sender), | ||
| // pick it up and forward it directly to the pool, then loop to await the next response. | ||
| if let Ok(retry_frame) = self | ||
| .upstream_channel_state | ||
| .channel_manager_receiver | ||
| .try_recv() | ||
| { | ||
| info!("Forwarding retry RequestExtensions to upstream pool..."); | ||
| self.upstream_channel_state | ||
| .upstream_sender | ||
| .send(retry_frame) | ||
| .await | ||
| .map_err(|e| { | ||
| error!("Failed to forward retry RequestExtensions to pool: {:?}", e); | ||
| TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) | ||
| })?; | ||
| continue; | ||
| } | ||
|
|
||
| // No retry pending — negotiation is complete. | ||
| // Return the extensions stored by the ChannelManager's trait implementation. | ||
| return channel_manager | ||
| .get_negotiated_extensions_with_server(None) | ||
| .map_err(|e| TproxyError::fallback(e.kind)); | ||
| } | ||
| } | ||
|
|
||
| /// Checks that the response is an extension message and delegates handling to the | ||
| /// `ChannelManager` via its `HandleExtensionsFromServerAsync` trait implementation. | ||
| /// | ||
| /// The ChannelManager's implementation in `extensions_message_handler.rs` performs | ||
| /// validation and stores the negotiated extensions. On a `RequestExtensionsError` | ||
| /// where the server requires extensions we support, it sends a retry `RequestExtensions` | ||
| /// via its upstream channel. | ||
| async fn handle_extension_response( | ||
| &mut self, | ||
| mut response: Sv2Frame, | ||
| channel_manager: &mut ChannelManager, | ||
| ) -> TproxyResult<(), error::Upstream> { | ||
| let header = response.get_header().ok_or_else(|| { | ||
| error!("Extension response frame missing header"); | ||
| TproxyError::fallback(TproxyErrorKind::UnexpectedMessage(0, 0)) | ||
| })?; | ||
|
|
||
| channel_manager | ||
| .handle_extensions_message_frame_from_server(None, header, response.payload()) | ||
| .await | ||
| .map_err(|e| TproxyError::fallback(e.kind))?; | ||
|
|
There was a problem hiding this comment.
Given that these two functions are exactly the same for both JDC and tProxy, shouldn't we have them in stratum_apps::extensions_negotiation and call them from both JDC and tProxy?
Fixes the extension negotiation race condition where Translator and JDC would start accepting downstream connections before knowing which extensions were negotiated with the upstream server.