Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .vscode/settings.json

This file was deleted.

22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions p2pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ lru = "0.12.5"
tempfile = "3.14.0"
rkv = { version = "0.19.0", features = ["lmdb"] }
bincode = "1.3.3"
prometheus = "0.13.4"

[package.metadata.cargo-machete]
ignored = ["log4rs"]
Expand Down
1 change: 1 addition & 0 deletions p2pool/src/server/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl HttpServer {
.route("/chain", get(handlers::handle_chain))
.route("/peer", get(handlers::handle_peers))
.route("/connections", get(handlers::handle_connections))
.route("/metrics", get(handlers::handle_metrics))
.with_state(AppState {
stats_client: self.stats_client.clone(),
p2p_service_client: self.p2p_service_client.clone(),
Expand Down
162 changes: 162 additions & 0 deletions p2pool/src/server/http/stats/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use std::collections::HashMap;
use axum::{
extract::{Query, State},
http::StatusCode,
response::Response,
Json,
};
use log::{error, info};
use prometheus::{Encoder, Gauge, GaugeVec, IntGauge, Opts, Registry, TextEncoder};
use serde::Serialize;
use tari_core::proof_of_work::PowAlgorithm;
use tari_utilities::{epoch_time::EpochTime, hex::Hex};
Expand All @@ -21,6 +23,7 @@ use crate::server::{
};

const LOG_TARGET: &str = "tari::p2pool::server::stats::get";
const METRICS_LOG_TARGET: &str = "tari::p2pool::server::stats::metrics";

#[derive(Serialize)]
pub(crate) struct BlockResult {
Expand Down Expand Up @@ -361,3 +364,162 @@ async fn get_chain_stats(state: AppState) -> Result<(GetStatsResponse, GetStatsR
// .await,
// )
// }
pub(crate) async fn handle_metrics(State(state): State<AppState>) -> Response {
let timer = std::time::Instant::now();
info!(target: METRICS_LOG_TARGET, "handle_metrics");

// Create a new registry
let registry = Registry::new();

// Get stats data
let stats_result = get_metrics_data(state, &registry).await;

// Generate the metrics output
let encoder = TextEncoder::new();
let metric_families = registry.gather();
let mut buffer = Vec::new();

if let Err(e) = encoder.encode(&metric_families, &mut buffer) {
error!(target: METRICS_LOG_TARGET, "Failed to encode metrics: {e:?}");
return Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(format!("Failed to encode metrics: {e:?}").into())
.unwrap();
}

if timer.elapsed() > MAX_ACCEPTABLE_HTTP_TIMEOUT {
error!(target: METRICS_LOG_TARGET, "handle_metrics took too long: {}ms", timer.elapsed().as_millis());
}

// Return the metrics in Prometheus format
match stats_result {
Ok(_) => Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/plain; version=0.0.4")
.body(String::from_utf8_lossy(&buffer).to_string().into())
.unwrap(),
Err(e) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(format!("Failed to collect metrics: {e:?}").into())
.unwrap(),
}
}

async fn get_metrics_data(state: AppState, registry: &Registry) -> Result<(), StatusCode> {
// Get full stats which includes chain stats and connection info
let stats = handle_get_stats(State(state.clone())).await?.0;

// Get peer info
let (tx, rx) = oneshot::channel();
state
.p2p_service_client
.send(P2pServiceQuery::GetPeers(tx))
.await
.map_err(|error| {
error!(target: METRICS_LOG_TARGET, "Failed to get peers info: {error:?}");
StatusCode::INTERNAL_SERVER_ERROR
})?;

let peers_info = rx.await.map_err(|error| {
error!(target: METRICS_LOG_TARGET, "Failed to receive peers info: {error:?}");
StatusCode::INTERNAL_SERVER_ERROR
})?;

// Register and set metrics

// We'll use serde_json to convert the stats to a value we can extract fields from
let rx_stats_json = serde_json::to_value(&stats.randomx_stats).unwrap_or_default();
let sha3x_stats_json = serde_json::to_value(&stats.sha3x_stats).unwrap_or_default();

// Chain height metrics
let chain_height = GaugeVec::new(Opts::new("p2pool_chain_height", "Current chain height"), &["algorithm"]).unwrap();
registry.register(Box::new(chain_height.clone())).unwrap();

if let Some(height) = rx_stats_json.get("height").and_then(|v| v.as_u64()) {
chain_height.with_label_values(&["randomx"]).set(height as f64);
}

if let Some(height) = sha3x_stats_json.get("height").and_then(|v| v.as_u64()) {
chain_height.with_label_values(&["sha3x"]).set(height as f64);
}

// Shares metrics
let total_shares = GaugeVec::new(Opts::new("p2pool_total_shares", "Total number of shares"), &[
"algorithm",
])
.unwrap();
registry.register(Box::new(total_shares.clone())).unwrap();

if let Some(shares) = rx_stats_json.get("total_shares").and_then(|v| v.as_u64()) {
total_shares.with_label_values(&["randomx"]).set(shares as f64);
}

if let Some(shares) = sha3x_stats_json.get("total_shares").and_then(|v| v.as_u64()) {
total_shares.with_label_values(&["sha3x"]).set(shares as f64);
}

// My shares metrics
let my_shares = GaugeVec::new(Opts::new("p2pool_my_shares", "Number of my shares"), &["algorithm"]).unwrap();
registry.register(Box::new(my_shares.clone())).unwrap();

if let Some(shares) = rx_stats_json.get("num_my_shares").and_then(|v| v.as_u64()) {
my_shares.with_label_values(&["randomx"]).set(shares as f64);
}

if let Some(shares) = sha3x_stats_json.get("num_my_shares").and_then(|v| v.as_u64()) {
my_shares.with_label_values(&["sha3x"]).set(shares as f64);
}

// Connection metrics
let connected_peers = Gauge::new("p2pool_connected_peers", "Number of connected peers").unwrap();
registry.register(Box::new(connected_peers.clone())).unwrap();
connected_peers.set(stats.connection_info.connected_peers as f64);

// Connection counters
let pending_incoming = Gauge::new("p2pool_pending_incoming", "Number of pending incoming connections").unwrap();
registry.register(Box::new(pending_incoming.clone())).unwrap();
pending_incoming.set(stats.connection_info.network_info.connection_counters.pending_incoming as f64);

let pending_outgoing = Gauge::new("p2pool_pending_outgoing", "Number of pending outgoing connections").unwrap();
registry.register(Box::new(pending_outgoing.clone())).unwrap();
pending_outgoing.set(stats.connection_info.network_info.connection_counters.pending_outgoing as f64);

let established_incoming = Gauge::new(
"p2pool_established_incoming",
"Number of established incoming connections",
)
.unwrap();
registry.register(Box::new(established_incoming.clone())).unwrap();
established_incoming.set(
stats
.connection_info
.network_info
.connection_counters
.established_incoming as f64,
);

let established_outgoing = Gauge::new(
"p2pool_established_outgoing",
"Number of established outgoing connections",
)
.unwrap();
registry.register(Box::new(established_outgoing.clone())).unwrap();
established_outgoing.set(
stats
.connection_info
.network_info
.connection_counters
.established_outgoing as f64,
);

// Peer list metrics
let whitelist_peers = IntGauge::new("p2pool_whitelist_peers", "Number of whitelisted peers").unwrap();
registry.register(Box::new(whitelist_peers.clone())).unwrap();
whitelist_peers.set(peers_info.0.len() as i64);

let greylist_peers = IntGauge::new("p2pool_greylist_peers", "Number of greylisted peers").unwrap();
registry.register(Box::new(greylist_peers.clone())).unwrap();
greylist_peers.set(peers_info.1.len() as i64);

Ok(())
}
Comment on lines +408 to +525
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid duplicate metric registrations on every request call.
Each time handle_metrics is invoked, get_metrics_data re-registers multiple metrics. Typically, we'd define and register shared metrics once at startup, then just update them within each request. Repeated registration can lead to duplicates or memory overhead. A recommended approach is to keep the Gauges in a static or higher scope, updating them instead of re-registering.

-async fn get_metrics_data(state: AppState, registry: &Registry) -> Result<(), StatusCode> {
-    // Register and set metrics
-    let chain_height = GaugeVec::new(Opts::new("p2pool_chain_height", "Current chain height"), &["algorithm"]).unwrap();
-    registry.register(Box::new(chain_height.clone())).unwrap();
-    // ...
+async fn get_metrics_data(state: AppState) -> Result<(), StatusCode> {
+    // Instead of registering metrics here, update previously registered metrics:
+    // chain_height.with_label_values(&["randomx"]).set(..);
+    // ...
}

Committable suggestion skipped: line range outside the PR's diff.

Loading