refactor!: update to latest iroh-metrics version, use non-global metrics collection #85

Merged
merged 9 commits on May 8, 2025
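
In short, this PR moves iroh-blobs off the global metrics registry: the `iroh_metrics::inc!`/`inc_by!` macro calls are gone, the downloader service owns an `Arc<Metrics>` and bumps counters on it directly, and the collection is exposed through the new `Downloader::metrics()` and `Blobs::metrics()` accessors. A minimal sketch of the call-site change, using only identifiers that appear in the diff below (the `iroh_blobs::metrics::Metrics` import path is an assumption about where the struct is exported):

```rust
use std::sync::Arc;

use iroh_blobs::metrics::Metrics;

fn main() {
    // Each component now owns (a clone of) its metrics group instead of
    // writing to a process-global registry.
    let metrics = Arc::new(Metrics::default());

    // Old pattern (removed): iroh_metrics::inc!(Metrics, downloads_success);
    // New pattern: call the counter on the owned instance.
    metrics.downloads_success.inc();
    metrics.download_bytes_total.inc_by(4096);
}
```
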
643 changes: 282 additions & 361 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
@@ -40,10 +40,10 @@ genawaiter = { version = "0.99.1", features = ["futures03"] }
hashlink = { version = "0.9.0", optional = true }
hex = "0.4.3"
indicatif = { version = "0.17.8", optional = true }
iroh-base = { version = "0.34" }
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.32", default-features = false }
iroh = "0.34"
iroh-metrics = { version = "0.34", default-features = false }
iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" }
nested_enum_utils = { version = "0.1.0", optional = true }
num_cpus = "1.15.0"
oneshot = "0.1.8"
@@ -80,7 +80,7 @@ tracing-test = "0.2.5"

[dev-dependencies]
http-body = "1.0"
iroh = { version = "0.34", features = ["test-utils"] }
iroh = { git = "https://github.com/n0-computer/iroh", branch = "main", features = ["test-utils"] }
quinn = { package = "iroh-quinn", version = "0.13", features = ["ring"] }
futures-buffered = "0.2.4"
proptest = "1.0.0"
7 changes: 5 additions & 2 deletions deny.toml
@@ -19,7 +19,8 @@ allow = [
"MIT",
"Zlib",
"MPL-2.0", # https://fossa.com/blog/open-source-software-licenses-101-mozilla-public-license-2-0/
"Unicode-3.0"
"Unicode-3.0",
"Unlicense", # https://unlicense.org/
]

[[licenses.clarify]]
@@ -38,4 +39,6 @@ ignore = [
]

[sources]
allow-git = []
allow-git = [
"https://github.com/n0-computer/iroh.git",
]
41 changes: 30 additions & 11 deletions src/downloader.rs
@@ -46,7 +46,6 @@ use anyhow::anyhow;
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
use hashlink::LinkedHashSet;
use iroh::{endpoint, Endpoint, NodeAddr, NodeId};
use iroh_metrics::inc;
use tokio::{
sync::{mpsc, oneshot},
task::JoinSet,
@@ -55,7 +54,7 @@ use tokio_util::{either::Either, sync::CancellationToken, time::delay_queue};
use tracing::{debug, error, error_span, trace, warn, Instrument};

use crate::{
get::{db::DownloadProgress, Stats},
get::{db::DownloadProgress, error::GetError, Stats},
metrics::Metrics,
store::Store,
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
@@ -98,7 +97,7 @@ pub enum FailureAction {
/// The request was cancelled by us.
AllIntentsDropped,
/// An error occurred that prevents the request from being retried at all.
AbortRequest(anyhow::Error),
AbortRequest(GetError),
/// An error occurred that suggests the node should not be used in general.
DropPeer(anyhow::Error),
/// An error occurred in which neither the node nor the request are at fault.
@@ -332,6 +331,7 @@ pub struct Downloader {
next_id: Arc<AtomicU64>,
/// Channel to communicate with the service.
msg_tx: mpsc::Sender<Message>,
metrics: Arc<Metrics>,
}

impl Downloader {
@@ -354,23 +354,33 @@
where
S: Store,
{
let metrics = Arc::new(Metrics::default());
let me = endpoint.node_id().fmt_short();
let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
let dialer = Dialer::new(endpoint);

let metrics_clone = metrics.clone();
let create_future = move || {
let getter = get::IoGetter {
store: store.clone(),
};

let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx);
let service = Service::new(
getter,
dialer,
concurrency_limits,
retry_config,
msg_rx,
metrics_clone,
);

service.run().instrument(error_span!("downloader", %me))
};
rt.spawn_detached(create_future);
Self {
next_id: Arc::new(AtomicU64::new(0)),
msg_tx,
metrics,
}
}

@@ -424,6 +434,11 @@ impl Downloader {
debug!(?msg, "nodes have not been sent")
}
}

/// Returns the metrics collected for this downloader.
pub fn metrics(&self) -> &Arc<Metrics> {
&self.metrics
}
}

/// Messages the service can receive.
@@ -565,6 +580,7 @@ struct Service<G: Getter, D: DialerT> {
in_progress_downloads: JoinSet<(DownloadKind, InternalDownloadResult)>,
/// Progress tracker
progress_tracker: ProgressTracker,
metrics: Arc<Metrics>,
}
impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
fn new(
@@ -573,6 +589,7 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
concurrency_limits: ConcurrencyLimits,
retry_config: RetryConfig,
msg_rx: mpsc::Receiver<Message>,
metrics: Arc<Metrics>,
) -> Self {
Service {
getter,
@@ -590,23 +607,24 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
in_progress_downloads: Default::default(),
progress_tracker: ProgressTracker::new(),
queue: Default::default(),
metrics,
}
}

/// Main loop for the service.
async fn run(mut self) {
loop {
trace!("wait for tick");
inc!(Metrics, downloader_tick_main);
self.metrics.downloader_tick_main.inc();
tokio::select! {
Some((node, conn_result)) = self.dialer.next() => {
trace!(node=%node.fmt_short(), "tick: connection ready");
inc!(Metrics, downloader_tick_connection_ready);
self.metrics.downloader_tick_connection_ready.inc();
self.on_connection_ready(node, conn_result);
}
maybe_msg = self.msg_rx.recv() => {
trace!(msg=?maybe_msg, "tick: message received");
inc!(Metrics, downloader_tick_message_received);
self.metrics.downloader_tick_message_received.inc();
match maybe_msg {
Some(msg) => self.handle_message(msg).await,
None => return self.shutdown().await,
@@ -616,25 +634,26 @@
match res {
Ok((kind, result)) => {
trace!(%kind, "tick: transfer completed");
inc!(Metrics, downloader_tick_transfer_completed);
self::get::track_metrics(&result, &self.metrics);
self.metrics.downloader_tick_transfer_completed.inc();
self.on_download_completed(kind, result);
}
Err(err) => {
warn!(?err, "transfer task panicked");
inc!(Metrics, downloader_tick_transfer_failed);
self.metrics.downloader_tick_transfer_failed.inc();
}
}
}
Some(expired) = self.retry_nodes_queue.next() => {
let node = expired.into_inner();
trace!(node=%node.fmt_short(), "tick: retry node");
inc!(Metrics, downloader_tick_retry_node);
self.metrics.downloader_tick_retry_node.inc();
self.on_retry_wait_elapsed(node);
}
Some(expired) = self.goodbye_nodes_queue.next() => {
let node = expired.into_inner();
trace!(node=%node.fmt_short(), "tick: goodbye node");
inc!(Metrics, downloader_tick_goodbye_node);
self.metrics.downloader_tick_goodbye_node.inc();
self.disconnect_idle_node(node, "idle expired");
}
}
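
The wiring in `Downloader::new` above follows a simple ownership pattern: the handle creates the `Arc<Metrics>`, moves a clone into the spawned `Service`, and keeps the original so `metrics()` can hand out shared read access. A stripped-down sketch of just that pattern (the `Handle` type and the plain `AtomicU64` stand-in are illustrative, not types from this crate):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

#[derive(Default)]
struct Metrics {
    ticks: AtomicU64, // stand-in for an iroh_metrics::Counter
}

struct Handle {
    metrics: Arc<Metrics>,
}

impl Handle {
    fn spawn() -> Self {
        let metrics = Arc::new(Metrics::default());
        let metrics_clone = metrics.clone();
        // The service side owns its clone and increments it, mirroring how
        // Service::new receives an Arc<Metrics> in the diff above.
        std::thread::spawn(move || {
            metrics_clone.ticks.fetch_add(1, Ordering::Relaxed);
        });
        Handle { metrics }
    }

    /// Mirrors Downloader::metrics(): shared read access for callers.
    fn metrics(&self) -> &Arc<Metrics> {
        &self.metrics
    }
}

fn main() {
    let handle = Handle::spawn();
    let _ticks = handle.metrics().ticks.load(Ordering::Relaxed);
}
```
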
33 changes: 18 additions & 15 deletions src/downloader/get.rs
@@ -14,13 +14,13 @@ use crate::{
impl From<GetError> for FailureAction {
fn from(e: GetError) -> Self {
match e {
e @ GetError::NotFound(_) => FailureAction::AbortRequest(e.into()),
e @ GetError::NotFound(_) => FailureAction::AbortRequest(e),
e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e.into()),
e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e),
// TODO: what do we want to do on local failures?
e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e),
}
}
}
@@ -61,8 +61,6 @@ impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsCon
fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
async move {
let res = self.proceed(conn).await;
#[cfg(feature = "metrics")]
track_metrics(&res);
match res {
Ok(stats) => Ok(stats),
Err(err) => Err(err.into()),
@@ -72,11 +70,10 @@
}
}

#[cfg(feature = "metrics")]
fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
use iroh_metrics::{inc, inc_by};

use crate::metrics::Metrics;
pub(super) fn track_metrics(
res: &Result<crate::get::Stats, FailureAction>,
metrics: &crate::metrics::Metrics,
) {
match res {
Ok(stats) => {
let crate::get::Stats {
@@ -85,13 +82,19 @@ fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
elapsed,
} = stats;

inc!(Metrics, downloads_success);
inc_by!(Metrics, download_bytes_total, *bytes_written);
inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64);
metrics.downloads_success.inc();
metrics.download_bytes_total.inc_by(*bytes_written);
metrics
.download_time_total
.inc_by(elapsed.as_millis() as u64);
}
Err(e) => match &e {
GetError::NotFound(_) => inc!(Metrics, downloads_notfound),
_ => inc!(Metrics, downloads_error),
FailureAction::AbortRequest(GetError::NotFound(_)) => {
metrics.downloads_notfound.inc();
}
_ => {
metrics.downloads_error.inc();
}
},
}
}
12 changes: 11 additions & 1 deletion src/downloader/test.rs
@@ -45,18 +45,28 @@ impl Downloader {
retry_config: RetryConfig,
) -> (Self, LocalPool) {
let (msg_tx, msg_rx) = mpsc::channel(super::SERVICE_CHANNEL_CAPACITY);
let metrics = Arc::new(Metrics::default());

let lp = LocalPool::default();
let metrics_clone = metrics.clone();
lp.spawn_detached(move || async move {
// we want to see the logs of the service
let service = Service::new(getter, dialer, concurrency_limits, retry_config, msg_rx);
let service = Service::new(
getter,
dialer,
concurrency_limits,
retry_config,
msg_rx,
metrics_clone,
);
service.run().await
});

(
Downloader {
next_id: Arc::new(AtomicU64::new(0)),
msg_tx,
metrics,
},
lp,
)
67 changes: 21 additions & 46 deletions src/metrics.rs
@@ -1,65 +1,40 @@
//! Metrics for iroh-blobs

use iroh_metrics::{
core::{Counter, Metric},
struct_iterable::Iterable,
};
use iroh_metrics::{Counter, MetricsGroup};

/// Enum of metrics for the module
#[allow(missing_docs)]
#[derive(Debug, Clone, Iterable)]
#[derive(Debug, MetricsGroup, Default)]
#[metrics(name = "iroh-blobs")]
pub struct Metrics {
/// Total number of content bytes downloaded
pub download_bytes_total: Counter,
/// Total time in ms spent downloading content bytes
pub download_time_total: Counter,
/// Total number of successful downloads
pub downloads_success: Counter,
/// Total number of downloads failed with error
pub downloads_error: Counter,
/// Total number of downloads failed with not found
pub downloads_notfound: Counter,

/// Number of times the main downloader actor loop ticked
pub downloader_tick_main: Counter,

/// Number of times the downloader actor ticked for a connection ready
pub downloader_tick_connection_ready: Counter,

/// Number of times the downloader actor ticked for a message received
pub downloader_tick_message_received: Counter,

/// Number of times the downloader actor ticked for a transfer completed
pub downloader_tick_transfer_completed: Counter,

/// Number of times the downloader actor ticked for a transfer failed
pub downloader_tick_transfer_failed: Counter,

/// Number of times the downloader actor ticked for a retry node
pub downloader_tick_retry_node: Counter,

/// Number of times the downloader actor ticked for a goodbye node
pub downloader_tick_goodbye_node: Counter,
}

impl Default for Metrics {
fn default() -> Self {
Self {
download_bytes_total: Counter::new("Total number of content bytes downloaded"),
download_time_total: Counter::new("Total time in ms spent downloading content bytes"),
downloads_success: Counter::new("Total number of successful downloads"),
downloads_error: Counter::new("Total number of downloads failed with error"),
downloads_notfound: Counter::new("Total number of downloads failed with not found"),

downloader_tick_main: Counter::new(
"Number of times the main downloader actor loop ticked",
),
downloader_tick_connection_ready: Counter::new(
"Number of times the downloader actor ticked for a connection ready",
),
downloader_tick_message_received: Counter::new(
"Number of times the downloader actor ticked for a message received",
),
downloader_tick_transfer_completed: Counter::new(
"Number of times the downloader actor ticked for a transfer completed",
),
downloader_tick_transfer_failed: Counter::new(
"Number of times the downloader actor ticked for a transfer failed",
),
downloader_tick_retry_node: Counter::new(
"Number of times the downloader actor ticked for a retry node",
),
downloader_tick_goodbye_node: Counter::new(
"Number of times the downloader actor ticked for a goodbye node",
),
}
}
}

impl Metric for Metrics {
fn name() -> &'static str {
"iroh-blobs"
}
}
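
With the derive, the hand-written `Default` and `Metric` impls above are dropped: `#[derive(Default)]` now yields zeroed counters, and the `#[metrics(name = "iroh-blobs")]` attribute plus the per-field doc comments take over the group name and counter descriptions that `Counter::new(...)` used to carry. A small sketch of the same declaration pattern, assuming iroh-metrics 0.34 with the `MetricsGroup` derive as pinned in Cargo.toml:

```rust
use iroh_metrics::{Counter, MetricsGroup};

/// An example metrics group declared the same way as `Metrics` above.
#[derive(Debug, Default, MetricsGroup)]
#[metrics(name = "example")]
pub struct ExampleMetrics {
    /// Requests handled (the doc comment doubles as the metric description).
    pub requests: Counter,
}

fn main() {
    let metrics = ExampleMetrics::default();
    metrics.requests.inc();
    metrics.requests.inc_by(2);
}
```
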
5 changes: 5 additions & 0 deletions src/net_protocol.rs
@@ -19,6 +19,7 @@ use tracing::debug;

use crate::{
downloader::{ConcurrencyLimits, Downloader, RetryConfig},
metrics::Metrics,
provider::EventSender,
store::GcConfig,
util::{
@@ -258,6 +259,10 @@ impl<S: crate::store::Store> Blobs<S> {
&self.inner.store
}

pub fn metrics(&self) -> &Arc<Metrics> {
self.downloader().metrics()
}

pub fn events(&self) -> &EventSender {
&self.inner.events
}
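
For downstream users the practical upshot is that counters are read off a `Blobs` (or `Downloader`) instance rather than a global registry. A hypothetical read-out sketch, assuming `Counter` exposes a `get()` accessor (not shown in this diff) and the public `iroh_blobs::net_protocol::Blobs` path:

```rust
use iroh_blobs::{net_protocol::Blobs, store::Store};

// Hypothetical helper, not part of the PR: print a few of the per-instance
// counters reachable through the new Blobs::metrics() accessor.
fn report<S: Store>(blobs: &Blobs<S>) {
    let m = blobs.metrics();
    println!(
        "downloads: ok={} error={} not_found={}",
        m.downloads_success.get(),
        m.downloads_error.get(),
        m.downloads_notfound.get(),
    );
}
```
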