Skip to content
135 changes: 67 additions & 68 deletions miner-apps/Cargo.lock

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions miner-apps/jd-client/src/lib/channel_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,16 @@ impl ChannelManager {
Ok(channel_manager)
}

/// Sets the negotiated extensions.
///
/// This is used after upstream connection setup to store the extensions
/// that were successfully negotiated with the upstream server.
pub fn set_negotiated_extensions(&self, extensions: Vec<u16>) {
self.channel_manager_data.super_safe_lock(|data| {
data.negotiated_extensions = extensions;
});
}

// Bootstraps a group channel with the given parameters.
// Returns a `GroupChannel` if successful, otherwise returns `None`.
//
Expand Down
5 changes: 5 additions & 0 deletions miner-apps/jd-client/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ pub enum JDCErrorKind {
RequiredExtensionsNotSupported(Vec<u16>),
/// Server requires extensions that the translator doesn't support
ServerRequiresUnsupportedExtensions(Vec<u16>),
/// Extension negotiation timed out waiting for response
ExtensionNegotiationTimeout,
/// BitcoinCoreSv2 cancellation token activated
BitcoinCoreSv2CancellationTokenActivated,
/// Failed to create BitcoinCoreSv2 tokio runtime
Expand Down Expand Up @@ -368,6 +370,9 @@ impl fmt::Display for JDCErrorKind {
ServerRequiresUnsupportedExtensions(extensions) => {
write!(f, "Server requires extensions that the translator doesn't support: {extensions:?}")
}
ExtensionNegotiationTimeout => {
write!(f, "Extension negotiation timed out waiting for response")
}
BitcoinCoreSv2CancellationTokenActivated => {
write!(f, "BitcoinCoreSv2 cancellation token activated")
}
Expand Down
10 changes: 9 additions & 1 deletion miner-apps/jd-client/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,17 +289,24 @@ impl JobDeclaratorClient {
.await
{
Ok((upstream, job_declarator)) => {
upstream
// Start upstream and wait for extension negotiation to complete.
let negotiated_extensions = upstream
.start(
self.config.min_supported_version(),
self.config.max_supported_version(),
self.cancellation_token.clone(),
fallback_coordinator.clone(),
status_sender.clone(),
task_manager.clone(),
&mut channel_manager_clone,
)
.await;

info!(
"Upstream extension negotiation complete. Negotiated extensions: {:?}",
negotiated_extensions
);

job_declarator
.start(
self.cancellation_token.clone(),
Expand Down Expand Up @@ -457,6 +464,7 @@ impl JobDeclaratorClient {
fallback_coordinator.clone(),
status_sender.clone(),
task_manager.clone(),
&mut channel_manager,
)
.await;

Expand Down
162 changes: 136 additions & 26 deletions miner-apps/jd-client/src/lib/upstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
//! Responsibilities:
//! - Establish a TCP + Noise encrypted connection to upstream
//! - Perform `SetupConnection` handshake
//! - Negotiate extensions synchronously before returning
//! - Forward SV2 mining messages between upstream and channel manager
//! - Handle common messages from upstream

use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc, time::Duration};

use async_channel::{unbounded, Receiver, Sender};
use bitcoin_core_sv2::CancellationToken;
Expand All @@ -18,8 +19,11 @@ use stratum_apps::{
fallback_coordinator::FallbackCoordinator,
network_helpers::{connect_with_noise, resolve_host},
stratum_core::{
binary_sv2::Seq064K, extensions_sv2::RequestExtensions, framing_sv2,
handlers_sv2::HandleCommonMessagesFromServerAsync, parsers_sv2::AnyMessage,
binary_sv2::Seq064K,
extensions_sv2::RequestExtensions,
framing_sv2,
handlers_sv2::{HandleCommonMessagesFromServerAsync, HandleExtensionsFromServerAsync},
parsers_sv2::AnyMessage,
},
task_manager::TaskManager,
utils::{
Expand All @@ -31,12 +35,16 @@ use tokio::net::TcpStream;
use tracing::{debug, error, info, warn};

use crate::{
channel_manager::ChannelManager,
error::{self, JDCError, JDCErrorKind, JDCResult},
io_task::spawn_io_tasks,
status::{handle_error, Status, StatusSender},
utils::{get_setup_connection_message, UpstreamEntry},
};

/// Timeout for extension negotiation response (10 seconds)
const EXTENSION_NEGOTIATION_TIMEOUT_SECS: u64 = 10;

mod message_handler;

/// Placeholder for future upstream-specific data/state.
Expand Down Expand Up @@ -151,11 +159,18 @@ impl Upstream {
/// Perform `SetupConnection` handshake with upstream.
///
/// Sends [`SetupConnection`] and awaits response.
/// If required extensions are configured, negotiates them synchronously
/// before returning.
///
/// # Returns
/// * `Ok(Vec<u16>)` - The list of negotiated extensions (empty if none were requested)
/// * `Err(JDCError)` - Error during handshake or extension negotiation
pub async fn setup_connection(
&mut self,
min_version: u16,
max_version: u16,
) -> JDCResult<(), error::Upstream> {
channel_manager: &mut ChannelManager,
) -> JDCResult<Vec<u16>, error::Upstream> {
info!("Upstream: initiating SV2 handshake...");
let setup_connection =
get_setup_connection_message(min_version, max_version, &self.address)
Expand Down Expand Up @@ -197,25 +212,32 @@ impl Upstream {
.await?;

// Send RequestExtensions after successful SetupConnection if there are required extensions
// and wait for the response before returning
if !self.required_extensions.is_empty() {
self.send_request_extensions().await?;
let negotiated = self.negotiate_extensions(channel_manager).await?;
return Ok(negotiated);
}

Ok(())
Ok(vec![])
}

/// Send `RequestExtensions` message to upstream.
/// The supported extensions are stored for potential retry if the server requires additional
/// extensions.
async fn send_request_extensions(&mut self) -> JDCResult<(), error::Upstream> {
info!(
"Sending RequestExtensions to upstream with required extensions: {:?}",
self.required_extensions
);
if self.required_extensions.is_empty() {
return Ok(());
}

/// Sends RequestExtensions and waits for the response.
///
/// This method handles the extension negotiation flow:
/// 1. Sends RequestExtensions with required extensions
/// 2. Waits for RequestExtensionsSuccess or RequestExtensionsError
/// 3. Delegates response handling to the `ChannelManager` via its
/// `HandleExtensionsFromServerAsync` trait implementation
/// 4. If the server requires additional extensions we support, the ChannelManager
/// sends a retry `RequestExtensions`; this method detects and forwards it
///
/// # Returns
/// * `Ok(Vec<u16>)` - The list of successfully negotiated extensions
/// * `Err(JDCError)` - Extension negotiation failed
async fn negotiate_extensions(
&mut self,
channel_manager: &mut ChannelManager,
) -> JDCResult<Vec<u16>, error::Upstream> {
let requested_extensions =
Seq064K::new(self.required_extensions.clone()).map_err(JDCError::shutdown)?;

Expand All @@ -242,18 +264,91 @@ impl Upstream {
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
})?;

info!("Sent RequestExtensions to upstream");
loop {
// Wait for extension negotiation response with timeout
let response = tokio::time::timeout(
Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS),
self.upstream_channel.upstream_receiver.recv(),
)
.await
.map_err(|_| {
error!(
"Extension negotiation timed out after {} seconds",
EXTENSION_NEGOTIATION_TIMEOUT_SECS
);
JDCError::fallback(JDCErrorKind::ExtensionNegotiationTimeout)
})?
.map_err(|e| {
error!("Failed to receive extension negotiation response: {}", e);
JDCError::fallback(e)
})?;

// Delegate response handling to the ChannelManager's trait implementation.
// This checks the frame, validates required extensions, and may send a
// retry RequestExtensions if the server requires extensions we support.
self.handle_extension_response(response, channel_manager)
.await?;

// If the ChannelManager sent a retry RequestExtensions (via its upstream_sender),
// pick it up and forward it directly to the pool, then loop to await the next response.
if let Ok(retry_frame) = self.upstream_channel.channel_manager_receiver.try_recv() {
info!("Forwarding retry RequestExtensions to upstream pool...");
self.upstream_channel
.upstream_sender
.send(retry_frame)
.await
.map_err(|e| {
error!(?e, "Failed to forward retry RequestExtensions to pool");
JDCError::fallback(JDCErrorKind::ChannelErrorSender)
})?;
continue;
}

// No retry pending — negotiation is complete.
// Return the extensions stored by the ChannelManager's trait implementation.
return channel_manager
.get_negotiated_extensions_with_server(None)
.map_err(|e| JDCError::fallback(e.kind));
}
}

/// Checks that the response is an extension message and delegates handling to the
/// `ChannelManager` via its `HandleExtensionsFromServerAsync` trait implementation.
///
/// The ChannelManager's implementation in `extensions_message_handler.rs` performs
/// validation and stores the negotiated extensions. On a `RequestExtensionsError`
/// where the server requires extensions we support, it sends a retry `RequestExtensions`
/// via its upstream channel.
async fn handle_extension_response(
&mut self,
mut response: Sv2Frame,
channel_manager: &mut ChannelManager,
) -> JDCResult<(), error::Upstream> {
let header = response.get_header().ok_or_else(|| {
error!("Extension response frame missing header");
JDCError::fallback(JDCErrorKind::UnexpectedMessage(0, 0))
})?;

channel_manager
.handle_extensions_message_frame_from_server(None, header, response.payload())
.await
.map_err(|e| JDCError::fallback(e.kind))?;

Ok(())
}

/// Start unified upstream loop.
///
/// Responsibilities:
/// - Run `setup_connection`
/// - Run `setup_connection` (including extension negotiation)
/// - Handle messages from upstream (pool) and channel manager
/// - React to shutdown signals
///
/// This function spawns an async task and returns immediately.
/// This function spawns an async task and returns the negotiated extensions.
///
/// # Returns
/// * `Vec<u16>` - The list of negotiated extensions (empty if none were requested or setup
/// failed)
#[allow(clippy::too_many_arguments)]
pub async fn start(
mut self,
Expand All @@ -263,13 +358,26 @@ impl Upstream {
fallback_coordinator: FallbackCoordinator,
status_sender: Sender<Status>,
task_manager: Arc<TaskManager>,
) {
channel_manager: &mut ChannelManager,
) -> Vec<u16> {
let status_sender = StatusSender::Upstream(status_sender);

if let Err(e) = self.setup_connection(min_version, max_version).await {
error!(error = ?e, "Upstream: connection setup failed.");
return;
}
let negotiated_extensions = match self
.setup_connection(min_version, max_version, channel_manager)
.await
{
Ok(extensions) => {
info!(
"Upstream: extension negotiation complete. Extensions: {:?}",
extensions
);
extensions
}
Err(e) => {
error!(error = ?e, "Upstream: connection setup failed.");
return vec![];
}
};

task_manager.spawn(async move {
// we just spawned a new task that's relevant to fallback coordination
Expand Down Expand Up @@ -315,6 +423,8 @@ impl Upstream {
// signal fallback coordinator that this task has completed its cleanup
fallback_handler.done();
});

negotiated_extensions
}

// Handle incoming frames from upstream (pool).
Expand Down
5 changes: 5 additions & 0 deletions miner-apps/translator/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ pub enum TproxyErrorKind {
RequiredExtensionsNotSupported(Vec<u16>),
/// Server requires extensions that the translator doesn't support
ServerRequiresUnsupportedExtensions(Vec<u16>),
/// Extension negotiation timed out waiting for response
ExtensionNegotiationTimeout,
/// Represents a generic channel send failure, described by a string.
General(String),
/// Error bubbling up from translator-core library
Expand Down Expand Up @@ -256,6 +258,9 @@ impl fmt::Display for TproxyErrorKind {
extensions
)
}
ExtensionNegotiationTimeout => {
write!(f, "Extension negotiation timed out waiting for response")
}
SV1Error => write!(f, "Sv1 error"),
TranslatorCore(ref e) => write!(f, "Translator core error: {e:?}"),
NetworkHelpersError(ref e) => write!(f, "Network helpers error: {e:?}"),
Expand Down
Loading