refactor!: update to iroh 0.35 and non-global metrics tracking #41

Merged · 7 commits · May 12, 2025

699 changes: 328 additions & 371 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Cargo.toml
@@ -35,11 +35,11 @@ futures-buffered = "0.2.4"
futures-lite = "2.3.0"
futures-util = { version = "0.3.25" }
hex = "0.4"
iroh-base = { version = "0.34", features = ["ticket"] }
iroh-blobs = { version = "0.34" }
iroh-gossip = { version = "0.34", optional = true, features = ["net"] }
iroh-metrics = { version = "0.32", default-features = false }
iroh = { version = "0.34", optional = true }
iroh-base = { version = "0.35", features = ["ticket"] }
iroh-blobs = { version = "0.35" }
iroh-gossip = { version = "0.35", optional = true, features = ["net"] }
iroh-metrics = { version = "0.34", default-features = false }
iroh = { version = "0.35", optional = true }
num_enum = "0.7"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
rand = "0.8.5"
@@ -58,8 +58,8 @@ tracing = "0.1"

# rpc
nested_enum_utils = { version = "0.1.0", optional = true }
quic-rpc = { version = "0.19", optional = true }
quic-rpc-derive = { version = "0.19", optional = true }
quic-rpc = { version = "0.20", optional = true }
quic-rpc-derive = { version = "0.20", optional = true }
serde-error = { version = "0.1.3", optional = true }
portable-atomic = { version = "1.9.0", optional = true }

3 changes: 1 addition & 2 deletions README.md
@@ -72,8 +72,7 @@ async fn main() -> anyhow::Result<()> {
.accept(BLOBS_ALPN, blobs)
.accept(GOSSIP_ALPN, gossip)
.accept(DOCS_ALPN, docs)
.spawn()
.await?;
.spawn();

// do fun stuff with docs!
Ok(())
3 changes: 2 additions & 1 deletion deny.toml
@@ -19,7 +19,8 @@ allow = [
"MIT",
"Zlib",
"MPL-2.0", # https://fossa.com/blog/open-source-software-licenses-101-mozilla-public-license-2-0/
"Unicode-3.0"
"Unicode-3.0",
"Unlicense" # https://unlicense.org/
]

[[licenses.clarify]]
18 changes: 16 additions & 2 deletions src/actor.rs
@@ -12,7 +12,6 @@ use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use futures_util::FutureExt;
use iroh_blobs::Hash;
use iroh_metrics::inc;
use serde::{Deserialize, Serialize};
use tokio::{sync::oneshot, task::JoinSet};
use tracing::{debug, error, error_span, trace, warn};
@@ -224,6 +223,7 @@ struct OpenReplica {
pub struct SyncHandle {
tx: async_channel::Sender<Action>,
join_handle: Arc<Option<JoinHandle<()>>>,
metrics: Arc<Metrics>,
}

/// Options when opening a replica.
@@ -255,13 +255,15 @@ impl SyncHandle {
content_status_callback: Option<ContentStatusCallback>,
me: String,
) -> SyncHandle {
let metrics = Arc::new(Metrics::default());
let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
let actor = Actor {
store,
states: Default::default(),
action_rx,
content_status_callback,
tasks: Default::default(),
metrics: metrics.clone(),
};
let join_handle = std::thread::Builder::new()
.name("sync-actor".to_string())
@@ -278,9 +280,15 @@
SyncHandle {
tx: action_tx,
join_handle,
metrics,
}
}

/// Returns the metrics collected in this sync actor.
pub fn metrics(&self) -> &Arc<Metrics> {
&self.metrics
}

pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
let (reply, rx) = oneshot::channel();
let action = ReplicaAction::Open { reply, opts };
@@ -599,6 +607,7 @@ struct Actor {
action_rx: async_channel::Receiver<Action>,
content_status_callback: Option<ContentStatusCallback>,
tasks: JoinSet<()>,
metrics: Arc<Metrics>,
}

impl Actor {
@@ -634,7 +643,7 @@ impl Actor {
}
};
trace!(%action, "tick");
inc!(Metrics, actor_tick_main);
self.metrics.actor_tick_main.inc();
match action {
Action::Shutdown { reply } => {
break reply;
@@ -750,6 +759,8 @@ impl Actor {
let author = get_author(&mut this.store, &author)?;
let mut replica = this.states.replica(namespace, &mut this.store)?;
replica.insert(&key, &author, hash, len)?;
this.metrics.new_entries_local.inc();
[Member Author comment] These metrics were previously tracked in sync::Replica::insert_entry.

this.metrics.new_entries_local_size.inc_by(len);
Ok(())
}),
ReplicaAction::DeletePrefix { author, key, reply } => {
@@ -769,7 +780,10 @@ impl Actor {
let mut replica = this
.states
.replica_if_syncing(&namespace, &mut this.store)?;
let len = entry.content_len();
replica.insert_remote_entry(entry, from, content_status)?;
this.metrics.new_entries_remote.inc();
[Member Author comment] These metrics were previously tracked in sync::Replica::insert_entry.

this.metrics.new_entries_remote_size.inc_by(len);
Ok(())
}),

9 changes: 8 additions & 1 deletion src/engine.rs
@@ -28,7 +28,8 @@ pub use self::{
state::{Origin, SyncReason},
};
use crate::{
actor::SyncHandle, Author, AuthorId, ContentStatus, ContentStatusCallback, Entry, NamespaceId,
actor::SyncHandle, metrics::Metrics, Author, AuthorId, ContentStatus, ContentStatusCallback,
Entry, NamespaceId,
};

mod gossip;
@@ -90,6 +91,7 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
downloader,
to_live_actor_recv,
live_actor_tx.clone(),
sync.metrics().clone(),
);
let actor_handle = tokio::task::spawn(
async move {
@@ -155,6 +157,11 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
&self.blob_store
}

/// Returns the metrics tracked for this engine.
pub fn metrics(&self) -> &Arc<Metrics> {
self.sync.metrics()
}

/// Start to sync a document.
///
/// If `peers` is non-empty, it will both do an initial set-reconciliation sync with each peer,
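Not part of this PR, just a hedged illustration of the new accessor: with `Engine::metrics()` added above, a consumer could snapshot a few counters roughly like this. The helper name and module paths are assumptions, and `Counter::get()` is assumed to exist on `iroh_metrics::Counter`.

```rust
use std::sync::Arc;

use iroh_docs::{engine::Engine, metrics::Metrics};

/// Hypothetical helper (not in this PR): log a snapshot of a docs engine's counters.
/// Assumes `Counter::get()` returns the current value.
fn log_docs_metrics<D: iroh_blobs::store::Store>(engine: &Engine<D>) {
    let metrics: &Arc<Metrics> = engine.metrics();
    tracing::info!(
        new_entries_local = metrics.new_entries_local.get(),
        new_entries_remote = metrics.new_entries_remote.get(),
        actor_tick_main = metrics.actor_tick_main.get(),
        "iroh-docs metrics snapshot"
    );
}
```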
30 changes: 21 additions & 9 deletions src/engine/live.rs
@@ -2,6 +2,7 @@

use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::SystemTime,
};

@@ -15,7 +16,6 @@ use iroh_blobs::{
Hash, HashAndFormat,
};
use iroh_gossip::net::Gossip;
use iroh_metrics::inc;
use serde::{Deserialize, Serialize};
use tokio::{
sync::{self, mpsc, oneshot},
@@ -180,6 +180,7 @@ pub struct LiveActor<B: iroh_blobs::store::Store> {

/// Sync state per replica and peer
state: NamespaceStates,
metrics: Arc<Metrics>,
}
impl<B: iroh_blobs::store::Store> LiveActor<B> {
/// Create the live actor.
@@ -192,6 +193,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
downloader: Downloader,
inbox: mpsc::Receiver<ToLiveActor>,
sync_actor_tx: mpsc::Sender<ToLiveActor>,
metrics: Arc<Metrics>,
) -> Self {
let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
Expand All @@ -212,6 +214,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
state: Default::default(),
missing_hashes: Default::default(),
queued_hashes: Default::default(),
metrics,
}
}

@@ -236,13 +239,13 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
loop {
i += 1;
trace!(?i, "tick wait");
inc!(Metrics, doc_live_tick_main);
self.metrics.doc_live_tick_main.inc();
tokio::select! {
biased;
msg = self.inbox.recv() => {
let msg = msg.context("to_actor closed")?;
trace!(?i, %msg, "tick: to_actor");
inc!(Metrics, doc_live_tick_actor);
self.metrics.doc_live_tick_actor.inc();
match msg {
ToLiveActor::Shutdown { reply } => {
break Ok(reply);
@@ -254,28 +257,28 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}
event = self.replica_events_rx.recv() => {
trace!(?i, "tick: replica_event");
inc!(Metrics, doc_live_tick_replica_event);
self.metrics.doc_live_tick_replica_event.inc();
let event = event.context("replica_events closed")?;
if let Err(err) = self.on_replica_event(event).await {
error!(?err, "Failed to process replica event");
}
}
Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
trace!(?i, "tick: running_sync_connect");
inc!(Metrics, doc_live_tick_running_sync_connect);
self.metrics.doc_live_tick_running_sync_connect.inc();
let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
self.on_sync_via_connect_finished(namespace, peer, reason, res).await;

}
Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
trace!(?i, "tick: running_sync_accept");
inc!(Metrics, doc_live_tick_running_sync_accept);
self.metrics.doc_live_tick_running_sync_accept.inc();
let res = res.context("running_sync_accept closed")?;
self.on_sync_via_accept_finished(res).await;
}
Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
trace!(?i, "tick: pending_downloads");
inc!(Metrics, doc_live_tick_pending_downloads);
self.metrics.doc_live_tick_pending_downloads.inc();
let (namespace, hash, res) = res.context("pending_downloads closed")?;
self.on_download_ready(namespace, hash, res).await;
}
@@ -362,8 +365,16 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}
let endpoint = self.endpoint.clone();
let sync = self.sync.clone();
let metrics = self.metrics.clone();
let fut = async move {
let res = connect_and_sync(&endpoint, &sync, namespace, NodeAddr::new(peer)).await;
let res = connect_and_sync(
&endpoint,
&sync,
namespace,
NodeAddr::new(peer),
Some(&metrics),
)
.await;
(namespace, peer, reason, res)
}
.instrument(Span::current());
@@ -787,8 +798,9 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
};
debug!("incoming connection");
let sync = self.sync.clone();
let metrics = self.metrics.clone();
self.running_sync_accept.spawn(
async move { handle_connection(sync, conn, accept_request_cb).await }
async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
.instrument(Span::current()),
);
}
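Not code from this PR: the `Some(&metrics)` arguments above suggest that `connect_and_sync` and `handle_connection` now take an optional metrics reference. A minimal, hypothetical sketch of that pattern, with a stand-in `do_sync` function and the real counter names from src/metrics.rs, might look like:

```rust
use iroh_docs::metrics::Metrics;

/// Hypothetical sketch of the Option<&Metrics> plumbing: callers that own a
/// metrics handle pass Some(&metrics), others pass None and no counters move.
async fn sync_with_peer(metrics: Option<&Metrics>) -> anyhow::Result<()> {
    let res = do_sync().await;
    if let Some(m) = metrics {
        match &res {
            Ok(_) => {
                m.sync_via_connect_success.inc();
            }
            Err(_) => {
                m.sync_via_connect_failure.inc();
            }
        }
    }
    res
}

/// Stand-in for the real connect-and-sync work (assumption, for illustration only).
async fn do_sync() -> anyhow::Result<()> {
    Ok(())
}
```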
82 changes: 23 additions & 59 deletions src/metrics.rs
@@ -1,85 +1,49 @@
//! Metrics for iroh-docs

use iroh_metrics::{
core::{Counter, Metric},
struct_iterable::Iterable,
};
use iroh_metrics::{Counter, MetricsGroup};

/// Metrics for iroh-docs
#[allow(missing_docs)]
#[derive(Debug, Clone, Iterable)]
#[derive(Debug, Default, MetricsGroup)]
pub struct Metrics {
/// Number of document entries added locally
pub new_entries_local: Counter,
/// Number of document entries added by peers
pub new_entries_remote: Counter,
/// Total size of entry contents added locally
pub new_entries_local_size: Counter,
/// Total size of entry contents added by peers
pub new_entries_remote_size: Counter,
pub sync_via_connect_success: Counter,
pub sync_via_connect_failure: Counter,
/// Number of successful syncs (via accept)
pub sync_via_accept_success: Counter,
/// Number of failed syncs (via accept)
pub sync_via_accept_failure: Counter,
/// Number of successful syncs (via connect)
pub sync_via_connect_success: Counter,
/// Number of failed syncs (via connect)
pub sync_via_connect_failure: Counter,

/// Number of times the main actor loop ticked
pub actor_tick_main: Counter,

/// Number of times the gossip actor loop ticked
pub doc_gossip_tick_main: Counter,
/// Number of times the gossip actor processed an event
pub doc_gossip_tick_event: Counter,
/// Number of times the gossip actor processed an actor event
pub doc_gossip_tick_actor: Counter,
/// Number of times the gossip actor processed a pending join
pub doc_gossip_tick_pending_join: Counter,

/// Number of times the live actor loop ticked
pub doc_live_tick_main: Counter,
/// Number of times the live actor processed an actor event
pub doc_live_tick_actor: Counter,
/// Number of times the live actor processed a replica event
pub doc_live_tick_replica_event: Counter,
/// Number of times the live actor processed a running sync connect
pub doc_live_tick_running_sync_connect: Counter,
/// Number of times the live actor processed a running sync accept
pub doc_live_tick_running_sync_accept: Counter,
/// Number of times the live actor processed a pending download
pub doc_live_tick_pending_downloads: Counter,
}

impl Default for Metrics {
fn default() -> Self {
Self {
new_entries_local: Counter::new("Number of document entries added locally"),
new_entries_remote: Counter::new("Number of document entries added by peers"),
new_entries_local_size: Counter::new("Total size of entry contents added locally"),
new_entries_remote_size: Counter::new("Total size of entry contents added by peers"),
sync_via_accept_success: Counter::new("Number of successful syncs (via accept)"),
sync_via_accept_failure: Counter::new("Number of failed syncs (via accept)"),
sync_via_connect_success: Counter::new("Number of successful syncs (via connect)"),
sync_via_connect_failure: Counter::new("Number of failed syncs (via connect)"),

actor_tick_main: Counter::new("Number of times the main actor loop ticked"),

doc_gossip_tick_main: Counter::new("Number of times the gossip actor loop ticked"),
doc_gossip_tick_event: Counter::new(
"Number of times the gossip actor processed an event",
),
doc_gossip_tick_actor: Counter::new(
"Number of times the gossip actor processed an actor event",
),
doc_gossip_tick_pending_join: Counter::new(
"Number of times the gossip actor processed a pending join",
),

doc_live_tick_main: Counter::new("Number of times the live actor loop ticked"),
doc_live_tick_actor: Counter::new(
"Number of times the live actor processed an actor event",
),
doc_live_tick_replica_event: Counter::new(
"Number of times the live actor processed a replica event",
),
doc_live_tick_running_sync_connect: Counter::new(
"Number of times the live actor processed a running sync connect",
),
doc_live_tick_running_sync_accept: Counter::new(
"Number of times the live actor processed a running sync accept",
),
doc_live_tick_pending_downloads: Counter::new(
"Number of times the live actor processed a pending download",
),
}
}
}

impl Metric for Metrics {
fn name() -> &'static str {
"iroh_docs"
}
}
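As an aside, not from this diff: the rewrite above leans on the `MetricsGroup` derive from iroh-metrics 0.34, where the field doc comments are assumed to replace the old `Counter::new(description)` calls and `Default` yields zeroed counters. A small self-contained sketch under those assumptions (`ExampleMetrics` is hypothetical, and `Counter::get()` is assumed to exist):

```rust
use std::sync::Arc;

use iroh_metrics::{Counter, MetricsGroup};

/// Example metrics group, not part of iroh-docs; for illustration only.
#[derive(Debug, Default, MetricsGroup)]
pub struct ExampleMetrics {
    /// Number of widgets processed (the doc comment is assumed to become the help text)
    pub widgets_processed: Counter,
}

fn main() {
    // Each component owns its own Arc; nothing goes through a global registry.
    let metrics = Arc::new(ExampleMetrics::default());
    metrics.widgets_processed.inc();
    metrics.widgets_processed.inc_by(2);
    // Assumption: Counter::get() returns the current value.
    assert_eq!(metrics.widgets_processed.get(), 3);
}
```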