Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions integration-tests/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions miner-apps/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions miner-apps/jd-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ path = "src/lib/mod.rs"
[dependencies]
stratum-apps = { path = "../../stratum-apps", features = ["jd_client"] }
async-channel = "1.5.1"
dashmap = "6.1.0"
serde = { version = "1.0.89", default-features = false, features = ["derive", "alloc"] }
tokio = { version = "1.44.1", features = ["full"] }
ext-config = { version = "0.14.0", features = ["toml"], package = "config" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,33 +111,35 @@ impl RouteMessageTo<'_> {
) -> Result<(), JDCErrorKind> {
match self {
RouteMessageTo::Downstream((downstream_id, message)) => {
_ = channel_manager_channel.downstream_sender.send((
downstream_id,
message.into_static(),
None,
));
let sender = channel_manager_channel
.downstream_sender
.get(&downstream_id)
.map(|r| r.value().clone());
if let Some(sender) = sender {
sender.send((message.into_static(), None)).await?;
}
}
RouteMessageTo::Upstream(message) => {
if get_jd_mode() != JdMode::SoloMining {
let message_static = message.into_static();
let sv2_frame: Sv2Frame = AnyMessage::Mining(message_static).try_into()?;
_ = channel_manager_channel
channel_manager_channel
.upstream_sender
.send(sv2_frame)
.await;
.await?;
}
}
RouteMessageTo::JobDeclarator(message) => {
_ = channel_manager_channel
channel_manager_channel
.jd_sender
.send(message.into_static())
.await;
.await?;
}
RouteMessageTo::TemplateProvider(message) => {
_ = channel_manager_channel
channel_manager_channel
.tp_sender
.send(message.into_static())
.await;
.await?;
}
}
Ok(())
Expand Down Expand Up @@ -460,9 +462,15 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
})
})?;

for messages in messages {
let _ = messages.forward(&self.channel_manager_channel).await;
for message in messages {
// A send can only fail if the receiver side of the channel is closed.
// Since this is an unbounded channel, it cannot fail due to capacity
// limits (which would only apply to bounded channels).
if let Err(e) = message.forward(&self.channel_manager_channel).await {
tracing::error!("Failed to forward message {e:?}");
}
}

Ok(())
}

Expand Down Expand Up @@ -723,8 +731,13 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
})
})?;

for messages in messages {
let _ = messages.forward(&self.channel_manager_channel).await;
for message in messages {
// A send can only fail if the receiver side of the channel is closed.
// Since this is an unbounded channel, it cannot fail due to capacity
// limits (which would only apply to bounded channels).
if let Err(e) = message.forward(&self.channel_manager_channel).await {
tracing::error!("Failed to forward message {e:?}");
}
}

Ok(())
Expand Down Expand Up @@ -918,8 +931,13 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
messages
});

for messages in messages {
let _ = messages.forward(&self.channel_manager_channel).await;
for message in messages {
// A send can only fail if the receiver side of the channel is closed.
// Since this is an unbounded channel, it cannot fail due to capacity
// limits (which would only apply to bounded channels).
if let Err(e) = message.forward(&self.channel_manager_channel).await {
tracing::error!("Failed to forward message {e:?}");
}
}

Ok(())
Expand Down Expand Up @@ -1153,8 +1171,13 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
})
})?;

for messages in messages {
let _ = messages.forward(&self.channel_manager_channel).await;
for message in messages {
// A send can only fail if the receiver side of the channel is closed.
// Since this is an unbounded channel, it cannot fail due to capacity
// limits (which would only apply to bounded channels).
if let Err(e) = message.forward(&self.channel_manager_channel).await {
tracing::error!("Failed to forward message {e:?}");
}
}

Ok(())
Expand Down Expand Up @@ -1411,8 +1434,13 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager {
})
})?;

for messages in messages {
_ = messages.forward(&self.channel_manager_channel).await;
for message in messages {
// A send can only fail if the receiver side of the channel is closed.
// Since this is an unbounded channel, it cannot fail due to capacity
// limits (which would only apply to bounded channels).
if let Err(e) = message.forward(&self.channel_manager_channel).await {
tracing::error!("Failed to forward message {e:?}");
}
}

Ok(())
Expand Down
36 changes: 21 additions & 15 deletions miner-apps/jd-client/src/lib/channel_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use std::{
},
};

use async_channel::{Receiver, Sender};
use async_channel::{unbounded, Receiver, Sender};
use bitcoin_core_sv2::CancellationToken;
use dashmap::DashMap;
use stratum_apps::{
coinbase_output_constraints::coinbase_output_constraints_message,
custom_mutex::Mutex,
Expand Down Expand Up @@ -55,7 +56,7 @@ use stratum_apps::{
},
},
};
use tokio::{net::TcpListener, select, sync::broadcast};
use tokio::{net::TcpListener, select};
use tracing::{debug, error, info, warn};

use crate::{
Expand All @@ -65,8 +66,8 @@ use crate::{
error::{self, JDCError, JDCErrorKind, JDCResult},
status::{handle_error, Status, StatusSender},
utils::{
AtomicUpstreamState, DownstreamChannelJobId, PendingChannelRequest, SharesOrderedByDiff,
UpstreamState,
AtomicUpstreamState, DownstreamChannelJobId, DownstreamMessage, PendingChannelRequest,
SharesOrderedByDiff, UpstreamState,
},
};
pub mod downstream_message_handler;
Expand Down Expand Up @@ -246,7 +247,7 @@ pub struct ChannelManagerChannel {
jd_receiver: Receiver<JobDeclaration<'static>>,
tp_sender: Sender<TemplateDistribution<'static>>,
tp_receiver: Receiver<TemplateDistribution<'static>>,
downstream_sender: broadcast::Sender<(DownstreamId, Mining<'static>, Option<Vec<Tlv>>)>,
downstream_sender: Arc<DashMap<DownstreamId, Sender<DownstreamMessage>>>,
downstream_receiver: Receiver<(DownstreamId, Mining<'static>, Option<Vec<Tlv>>)>,
}

Expand Down Expand Up @@ -281,7 +282,6 @@ impl ChannelManager {
jd_receiver: Receiver<JobDeclaration<'static>>,
tp_sender: Sender<TemplateDistribution<'static>>,
tp_receiver: Receiver<TemplateDistribution<'static>>,
downstream_sender: broadcast::Sender<(DownstreamId, Mining<'static>, Option<Vec<Tlv>>)>,
downstream_receiver: Receiver<(DownstreamId, Mining<'static>, Option<Vec<Tlv>>)>,
coinbase_outputs: Vec<u8>,
supported_extensions: Vec<u16>,
Expand Down Expand Up @@ -337,7 +337,7 @@ impl ChannelManager {
jd_receiver,
tp_sender,
tp_receiver,
downstream_sender,
downstream_sender: Arc::new(DashMap::new()),
downstream_receiver,
};

Expand Down Expand Up @@ -437,11 +437,6 @@ impl ChannelManager {
fallback_coordinator: FallbackCoordinator,
status_sender: Sender<Status>,
channel_manager_sender: Sender<(DownstreamId, Mining<'static>, Option<Vec<Tlv>>)>,
channel_manager_receiver: broadcast::Sender<(
DownstreamId,
Mining<'static>,
Option<Vec<Tlv>>,
)>,
supported_extensions: Vec<u16>,
required_extensions: Vec<u16>,
) -> JDCResult<(), error::ChannelManager> {
Expand Down Expand Up @@ -493,7 +488,6 @@ impl ChannelManager {
let fallback_coordinator_inner = fallback_coordinator.clone();
let status_sender_inner = status_sender.clone();
let channel_manager_sender_inner = channel_manager_sender.clone();
let channel_manager_receiver_inner = channel_manager_receiver.clone();
let task_manager_inner = task_manager_clone.clone();
let supported_extensions_inner = supported_extensions.clone();
let required_extensions_inner = required_extensions.clone();
Expand Down Expand Up @@ -531,12 +525,14 @@ impl ChannelManager {
}
};

let (channel_manager_sender_ds, channel_manager_receiver_ds) = unbounded();

let downstream = Downstream::new(
downstream_id,
channel_id_factory,
group_channel,
channel_manager_sender_inner,
channel_manager_receiver_inner,
channel_manager_receiver_ds,
noise_stream,
cancellation_token_inner.clone(),
fallback_coordinator_inner.clone(),
Expand All @@ -545,6 +541,8 @@ impl ChannelManager {
required_extensions_inner,
);

this.channel_manager_channel.downstream_sender.insert(downstream_id, channel_manager_sender_ds);

this.channel_manager_data.super_safe_lock(|data| {
data.downstream.insert(downstream_id, downstream.clone());
});
Expand Down Expand Up @@ -687,6 +685,9 @@ impl ChannelManager {
.vardiff
.retain(|key, _| key.downstream_id != downstream_id);
});
self.channel_manager_channel
.downstream_sender
.remove(&downstream_id);
Ok(())
}

Expand Down Expand Up @@ -1187,7 +1188,12 @@ impl ChannelManager {
});

for message in messages {
let _ = message.forward(&self.channel_manager_channel).await;
// A send can only fail if the receiver side of the channel is closed.
// Since this is an unbounded channel, it cannot fail due to capacity
// limits (which would only apply to bounded channels).
if let Err(e) = message.forward(&self.channel_manager_channel).await {
tracing::error!("Failed to forward message {e:?}");
}
}

info!("Vardiff update cycle complete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,12 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
}

for message in messages {
let _ = message.forward(&self.channel_manager_channel).await;
// A send can only fail if the receiver side of the channel is closed.
// Since this is an unbounded channel, it cannot fail due to capacity
// limits (which would only apply to bounded channels).
if let Err(e) = message.forward(&self.channel_manager_channel).await {
tracing::error!("Failed to forward message {e:?}");
}
}

Ok(())
Expand Down Expand Up @@ -606,7 +611,12 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
}

for message in messages {
let _ = message.forward(&self.channel_manager_channel).await;
// A send can only fail if the receiver side of the channel is closed.
// Since this is an unbounded channel, it cannot fail due to capacity
// limits (which would only apply to bounded channels).
if let Err(e) = message.forward(&self.channel_manager_channel).await {
tracing::error!("Failed to forward message {e:?}");
}
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,12 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
})?;

for message in messages_results.into_iter().flatten() {
let _ = message.forward(&self.channel_manager_channel).await;
// A send can only fail if the receiver side of the channel is closed.
// Since this is an unbounded channel, it cannot fail due to capacity
// limits (which would only apply to bounded channels).
if let Err(e) = message.forward(&self.channel_manager_channel).await {
tracing::error!("Failed to forward message {e:?}");
}
}
Ok(())
}
Expand Down
27 changes: 23 additions & 4 deletions miner-apps/jd-client/src/lib/downstream/common_message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use stratum_apps::{
},
utils::types::Sv2Frame,
};
use tracing::info;
use tracing::{error, info};

#[cfg_attr(not(test), hotpath::measure_all)]
impl HandleCommonMessagesFromClientAsync for Downstream {
Expand Down Expand Up @@ -69,7 +69,12 @@ impl HandleCommonMessagesFromClientAsync for Downstream {
let frame: Sv2Frame = AnyMessage::Common(response.into_static().into())
.try_into()
.map_err(JDCError::shutdown)?;
_ = self.downstream_channel.downstream_sender.send(frame).await;
if let Err(e) = self.downstream_channel.downstream_sender.send(frame).await {
error!(
"Failed to send SetupConnectionError to downstream {}: {e}",
self.downstream_id
);
}

return Err(JDCError::disconnect(
JDCErrorKind::SetupConnectionError,
Expand All @@ -89,7 +94,12 @@ impl HandleCommonMessagesFromClientAsync for Downstream {
let frame: Sv2Frame = AnyMessage::Common(response.into_static().into())
.try_into()
.map_err(JDCError::shutdown)?;
_ = self.downstream_channel.downstream_sender.send(frame).await;
if let Err(e) = self.downstream_channel.downstream_sender.send(frame).await {
error!(
"Failed to send SetupConnectionError to downstream {}: {e}",
self.downstream_id
);
}

return Err(JDCError::disconnect(
JDCErrorKind::SetupConnectionError,
Expand All @@ -109,7 +119,16 @@ impl HandleCommonMessagesFromClientAsync for Downstream {
.try_into()
.map_err(JDCError::shutdown)?;

_ = self.downstream_channel.downstream_sender.send(frame).await;
if let Err(e) = self.downstream_channel.downstream_sender.send(frame).await {
error!(
"Failed to send SetupConnectionSuccess to downstream {}: {e}",
self.downstream_id
);
return Err(JDCError::disconnect(
JDCErrorKind::ChannelErrorSender,
self.downstream_id,
));
}

Ok(())
}
Expand Down
Loading
Loading