Skip to content

Commit 74fa196

Browse files
authored
Merge pull request #228 from Shourya742/2026-02-02-migrate-tproxy-to-dashmap
Migrate tproxy to dashmap
2 parents b6a8f73 + 4766cf7 commit 74fa196

15 files changed

Lines changed: 1017 additions & 1368 deletions

File tree

miner-apps/translator/src/lib/mod.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,12 @@ impl TranslatorSv2 {
105105
self.config.downstream_port,
106106
);
107107

108-
let sv1_server = Sv1Server::new(
108+
let sv1_server = Arc::new(Sv1Server::new(
109109
downstream_addr,
110110
channel_manager_to_sv1_server_receiver,
111111
sv1_server_to_channel_manager_sender,
112112
self.config.clone(),
113-
);
113+
));
114114

115115
info!("Initializing upstream connection...");
116116

@@ -143,14 +143,15 @@ impl TranslatorSv2 {
143143
));
144144

145145
info!("Launching ChannelManager tasks...");
146-
ChannelManager::run_channel_manager_tasks(
147-
channel_manager.clone(),
148-
notify_shutdown.clone(),
149-
shutdown_complete_tx.clone(),
150-
status_sender.clone(),
151-
task_manager.clone(),
152-
)
153-
.await;
146+
channel_manager
147+
.clone()
148+
.run_channel_manager_tasks(
149+
notify_shutdown.clone(),
150+
shutdown_complete_tx.clone(),
151+
status_sender.clone(),
152+
task_manager.clone(),
153+
)
154+
.await;
154155

155156
// Start monitoring server if configured
156157
if let Some(monitoring_addr) = self.config.monitoring_address() {
@@ -166,7 +167,7 @@ impl TranslatorSv2 {
166167
* handled separately) */
167168
)
168169
.expect("Failed to initialize monitoring server")
169-
.with_sv1_monitoring(Arc::new(sv1_server.clone())) // SV1 client connections
170+
.with_sv1_monitoring(sv1_server.clone()) // SV1 client connections
170171
.expect("Failed to add SV1 monitoring");
171172

172173
// Create shutdown signal that waits for ShutdownAll
@@ -281,7 +282,7 @@ impl TranslatorSv2 {
281282
status_sender: Sender<Status>,
282283
shutdown_complete_tx: mpsc::Sender<()>,
283284
task_manager: Arc<TaskManager>,
284-
sv1_server_instance: Sv1Server,
285+
sv1_server_instance: Arc<Sv1Server>,
285286
required_extensions: Vec<u16>,
286287
) -> Result<(), TproxyErrorKind> {
287288
const MAX_RETRIES: usize = 3;

miner-apps/translator/src/lib/monitoring.rs

Lines changed: 56 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -13,71 +13,67 @@ impl ServerMonitoring for ChannelManager {
1313
let mut extended_channels = Vec::new();
1414
let standard_channels = Vec::new(); // tProxy only uses extended channels
1515

16-
self.channel_manager_data
17-
.safe_lock(|d| {
18-
match tproxy_mode() {
19-
TproxyMode::Aggregated => {
16+
match tproxy_mode() {
17+
TproxyMode::Aggregated => {
18+
self.upstream_extended_channel
19+
.super_safe_lock(|upstream_channel| {
2020
// In Aggregated mode: one shared upstream channel to the server
21-
if let Some(upstream_channel) = &d.upstream_extended_channel {
22-
if let Ok(channel_lock) = upstream_channel.read() {
23-
let channel_id = channel_lock.get_channel_id();
24-
let target = channel_lock.get_target();
25-
let extranonce_prefix = channel_lock.get_extranonce_prefix();
26-
let user_identity = channel_lock.get_user_identity();
27-
let share_accounting = channel_lock.get_share_accounting();
21+
if let Some(upstream_channel) = upstream_channel.as_mut() {
22+
let channel_id = upstream_channel.get_channel_id();
23+
let target = upstream_channel.get_target();
24+
let extranonce_prefix = upstream_channel.get_extranonce_prefix();
25+
let user_identity = upstream_channel.get_user_identity();
26+
let share_accounting = upstream_channel.get_share_accounting();
2827

29-
extended_channels.push(ServerExtendedChannelInfo {
30-
channel_id,
31-
user_identity: user_identity.clone(),
32-
nominal_hashrate: channel_lock.get_nominal_hashrate(),
33-
target_hex: hex::encode(target.to_be_bytes()),
34-
extranonce_prefix_hex: hex::encode(extranonce_prefix),
35-
full_extranonce_size: channel_lock.get_full_extranonce_size(),
36-
rollable_extranonce_size: channel_lock
37-
.get_rollable_extranonce_size(),
38-
version_rolling: channel_lock.is_version_rolling(),
39-
shares_accepted: share_accounting.get_shares_accepted(),
40-
share_work_sum: share_accounting.get_share_work_sum(),
41-
last_share_sequence_number: share_accounting
42-
.get_last_share_sequence_number(),
43-
best_diff: share_accounting.get_best_diff(),
44-
});
45-
}
28+
extended_channels.push(ServerExtendedChannelInfo {
29+
channel_id,
30+
user_identity: user_identity.clone(),
31+
nominal_hashrate: upstream_channel.get_nominal_hashrate(),
32+
target_hex: hex::encode(target.to_be_bytes()),
33+
extranonce_prefix_hex: hex::encode(extranonce_prefix),
34+
full_extranonce_size: upstream_channel.get_full_extranonce_size(),
35+
rollable_extranonce_size: upstream_channel
36+
.get_rollable_extranonce_size(),
37+
version_rolling: upstream_channel.is_version_rolling(),
38+
shares_accepted: share_accounting.get_shares_accepted(),
39+
share_work_sum: share_accounting.get_share_work_sum(),
40+
last_share_sequence_number: share_accounting
41+
.get_last_share_sequence_number(),
42+
best_diff: share_accounting.get_best_diff(),
43+
});
4644
}
47-
}
48-
TproxyMode::NonAggregated => {
49-
// In NonAggregated mode: each downstream Sv1 miner has its own upstream Sv2
50-
// channel to the server
51-
for (_channel_id, extended_channel) in d.extended_channels.iter() {
52-
if let Ok(channel_lock) = extended_channel.read() {
53-
let channel_id = channel_lock.get_channel_id();
54-
let target = channel_lock.get_target();
55-
let extranonce_prefix = channel_lock.get_extranonce_prefix();
56-
let user_identity = channel_lock.get_user_identity();
57-
let share_accounting = channel_lock.get_share_accounting();
45+
});
46+
}
47+
TproxyMode::NonAggregated => {
48+
// In NonAggregated mode: each downstream Sv1 miner has its own upstream Sv2
49+
// channel to the server
50+
for channel in self.extended_channels.iter() {
51+
let extended_channel = channel.value();
5852

59-
extended_channels.push(ServerExtendedChannelInfo {
60-
channel_id,
61-
user_identity: user_identity.clone(),
62-
nominal_hashrate: channel_lock.get_nominal_hashrate(),
63-
target_hex: hex::encode(target.to_be_bytes()),
64-
extranonce_prefix_hex: hex::encode(extranonce_prefix),
65-
full_extranonce_size: channel_lock.get_full_extranonce_size(),
66-
rollable_extranonce_size: channel_lock
67-
.get_rollable_extranonce_size(),
68-
version_rolling: channel_lock.is_version_rolling(),
69-
shares_accepted: share_accounting.get_shares_accepted(),
70-
share_work_sum: share_accounting.get_share_work_sum(),
71-
last_share_sequence_number: share_accounting
72-
.get_last_share_sequence_number(),
73-
best_diff: share_accounting.get_best_diff(),
74-
});
75-
}
76-
}
77-
}
53+
let channel_id = extended_channel.get_channel_id();
54+
let target = extended_channel.get_target();
55+
let extranonce_prefix = extended_channel.get_extranonce_prefix();
56+
let user_identity = extended_channel.get_user_identity();
57+
let share_accounting = extended_channel.get_share_accounting();
58+
59+
extended_channels.push(ServerExtendedChannelInfo {
60+
channel_id,
61+
user_identity: user_identity.clone(),
62+
nominal_hashrate: extended_channel.get_nominal_hashrate(),
63+
target_hex: hex::encode(target.to_be_bytes()),
64+
extranonce_prefix_hex: hex::encode(extranonce_prefix),
65+
full_extranonce_size: extended_channel.get_full_extranonce_size(),
66+
rollable_extranonce_size: extended_channel.get_rollable_extranonce_size(),
67+
version_rolling: extended_channel.is_version_rolling(),
68+
shares_accepted: share_accounting.get_shares_accepted(),
69+
share_work_sum: share_accounting.get_share_work_sum(),
70+
last_share_sequence_number: share_accounting
71+
.get_last_share_sequence_number(),
72+
best_diff: share_accounting.get_best_diff(),
73+
});
7874
}
79-
})
80-
.unwrap();
75+
}
76+
}
8177

8278
ServerInfo {
8379
extended_channels,
Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{cell::RefCell, sync::atomic::AtomicBool, time::Instant};
1+
use std::time::Instant;
22
use stratum_apps::{
33
stratum_core::{
44
bitcoin::Target,
@@ -16,78 +16,74 @@ use super::SubmitShareWithChannelId;
1616
#[derive(Debug)]
1717
pub struct DownstreamData {
1818
pub channel_id: Option<ChannelId>,
19-
pub downstream_id: DownstreamId,
2019
pub extranonce1: Extranonce<'static>,
2120
pub extranonce2_len: usize,
21+
pub target: Target,
22+
pub hashrate: Option<Hashrate>,
2223
pub version_rolling_mask: Option<HexU32Be>,
2324
pub version_rolling_min_bit: Option<HexU32Be>,
2425
pub last_job_version_field: Option<u32>,
2526
pub authorized_worker_name: String,
2627
pub user_identity: String,
27-
pub target: Target,
28-
pub hashrate: Option<Hashrate>,
2928
pub cached_set_difficulty: Option<json_rpc::Message>,
3029
pub cached_notify: Option<json_rpc::Message>,
3130
pub pending_target: Option<Target>,
3231
pub pending_hashrate: Option<Hashrate>,
33-
// Flag to track if SV1 handshake is complete (subscribe + authorize)
34-
pub sv1_handshake_complete: AtomicBool,
3532
// Queue of Sv1 handshake messages received while waiting for SV2 channel to open
3633
pub queued_sv1_handshake_messages: Vec<json_rpc::Message>,
37-
// Flag to indicate we're processing queued Sv1 handshake message responses
38-
pub processing_queued_sv1_handshake_responses: AtomicBool,
3934
// Stores pending shares to be sent to the sv1_server
40-
pub pending_share: RefCell<Option<SubmitShareWithChannelId>>,
35+
pub pending_share: Option<SubmitShareWithChannelId>,
4136
// Tracks the upstream target for this downstream, used for vardiff target comparison
4237
pub upstream_target: Option<Target>,
4338
// Timestamp of when the last job was received by this downstream, used for keepalive check
4439
pub last_job_received_time: Option<Instant>,
4540
}
4641

4742
impl DownstreamData {
48-
pub fn new(downstream_id: DownstreamId, target: Target, hashrate: Option<Hashrate>) -> Self {
43+
pub fn new(hashrate: Option<Hashrate>, target: Target) -> Self {
4944
DownstreamData {
5045
channel_id: None,
51-
downstream_id,
5246
extranonce1: vec![0; 8]
5347
.try_into()
5448
.expect("8-byte extranonce is always valid"),
5549
extranonce2_len: 4,
50+
target,
51+
hashrate,
5652
version_rolling_mask: None,
5753
version_rolling_min_bit: None,
5854
last_job_version_field: None,
5955
authorized_worker_name: String::new(),
6056
user_identity: String::new(),
61-
target,
62-
hashrate,
6357
cached_set_difficulty: None,
6458
cached_notify: None,
6559
pending_target: None,
6660
pending_hashrate: None,
67-
sv1_handshake_complete: AtomicBool::new(false),
6861
queued_sv1_handshake_messages: Vec::new(),
69-
processing_queued_sv1_handshake_responses: AtomicBool::new(false),
70-
pending_share: RefCell::new(None),
62+
pending_share: None,
7163
upstream_target: None,
7264
last_job_received_time: None,
7365
}
7466
}
7567

76-
pub fn set_pending_target(&mut self, new_target: Target) {
68+
pub fn set_pending_target(&mut self, new_target: Target, downstream_id: DownstreamId) {
7769
self.pending_target = Some(new_target);
78-
debug!("Downstream {}: Set pending target", self.downstream_id);
70+
debug!("Downstream {downstream_id}: Set pending target");
7971
}
8072

81-
pub fn set_pending_hashrate(&mut self, new_hashrate: Option<Hashrate>) {
73+
pub fn set_pending_hashrate(
74+
&mut self,
75+
new_hashrate: Option<Hashrate>,
76+
downstream_id: DownstreamId,
77+
) {
8278
self.pending_hashrate = new_hashrate;
83-
debug!("Downstream {}: Set pending hashrate", self.downstream_id);
79+
debug!("Downstream {downstream_id}: Set pending hashrate");
8480
}
8581

86-
pub fn set_upstream_target(&mut self, upstream_target: Target) {
82+
pub fn set_upstream_target(&mut self, upstream_target: Target, downstream_id: DownstreamId) {
8783
self.upstream_target = Some(upstream_target);
8884
debug!(
89-
"Downstream {}: Set upstream target to {:?}",
90-
self.downstream_id, upstream_target
85+
"Downstream {downstream_id}: Set upstream target to {:?}",
86+
upstream_target
9187
);
9288
}
9389
}

0 commit comments

Comments
 (0)