diff --git a/Cargo.lock b/Cargo.lock index a61e49a84..88703ab08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,9 +66,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.86" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +checksum = "c042108f3ed77fd83760a5fd79b53be043192bb3b9dba91d8c574c0ada7850c8" [[package]] name = "arrayref" @@ -412,6 +412,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" +[[package]] +name = "cordyceps" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec10f0a762d93c4498d2e97a333805cb6250d60bead623f71d8034f9a4152ba3" +dependencies = [ + "loom", + "tracing", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -952,10 +962,11 @@ dependencies = [ [[package]] name = "futures-buffered" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fa130f3777d0d4b0993653c20bc433026d3290627693c4ed1b18dd237357ab" +checksum = "34acda8ae8b63fbe0b2195c998b180cff89a8212fb2622a78b572a9f1c6f7684" dependencies = [ + "cordyceps", "diatomic-waker", "futures-core", "pin-project-lite", @@ -1115,6 +1126,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "generator" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e" +dependencies = [ + "cc", + "libc", + "log", + "rustversion", + "windows 0.48.0", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -1571,8 +1595,7 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "iroh-base" version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28a777d7e0b3e2fdab4ad1b21b64be87a43ac3ceb2a2ccef905e480ad3317396" +source = "git+https://github.com/n0-computer/iroh?branch=main#6c6827d63ec12d9c9583b73b5530a7641060535c" dependencies = [ "aead", "anyhow", @@ -1589,7 +1612,6 @@ dependencies = [ "rand_core", "redb 2.1.1", "serde", - "serde-error", "ssh-key", "thiserror", "ttl_cache", @@ -1632,6 +1654,7 @@ dependencies = [ "iroh-metrics", "iroh-net", "iroh-quinn", + "iroh-router", "iroh-test", "num_cpus", "oneshot", @@ -1678,8 +1701,7 @@ dependencies = [ [[package]] name = "iroh-metrics" version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c78cf30022e1c7a10fc0ae0a6ba83f131b7c3b92d4876f6c97aba93fe534be6" +source = "git+https://github.com/n0-computer/iroh?branch=main#6c6827d63ec12d9c9583b73b5530a7641060535c" dependencies = [ "anyhow", "erased_set", @@ -1699,8 +1721,7 @@ dependencies = [ [[package]] name = "iroh-net" version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34192d8846fc59d6669fb80a485b430215ecc1bf3c2b9df4f8a92370fe37e13a" +source = "git+https://github.com/n0-computer/iroh?branch=main#6c6827d63ec12d9c9583b73b5530a7641060535c" dependencies = [ "anyhow", "backoff", @@ -1822,6 +1843,21 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "iroh-router" +version = "0.27.0" +source = "git+https://github.com/n0-computer/iroh?branch=main#6c6827d63ec12d9c9583b73b5530a7641060535c" +dependencies = [ + "anyhow", + "futures-buffered", + "futures-lite 2.3.0", + "futures-util", + "iroh-net", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "iroh-test" version = "0.27.0" @@ -1924,6 +1960,19 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "loom" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lru" version = "0.12.3" @@ -2015,13 +2064,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.11" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ + "hermit-abi", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -3349,6 +3399,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -3941,28 +3997,27 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.1" +version = "1.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" +checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -4476,6 +4531,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows" version = "0.51.1" diff --git a/Cargo.toml b/Cargo.toml index 4054d56f7..12a50028e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ iroh-base = { version = "0.27.0", features = ["redb"] } iroh-io = { version = "0.6.0", features = ["stats"] } iroh-metrics = { version = "0.27.0", default-features = false } iroh-net = { version = "0.27.0" } +iroh-router = "0.27.0" num_cpus = "1.15.0" oneshot = "0.1.8" parking_lot = { version = "0.12.1", optional = true } @@ -113,3 +114,9 @@ debug-assertions = false opt-level = 3 panic = 'abort' incremental = false + +[patch.crates-io] +iroh-router = { git = "https://github.com/n0-computer/iroh", branch = "main" } +iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" } +iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" } +iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" } diff --git a/deny.toml b/deny.toml index 7db03d129..267ca7a65 100644 --- a/deny.toml +++ b/deny.toml @@ -34,3 +34,8 @@ license-files = [ ignore = [ "RUSTSEC-2024-0370", # unmaintained, no upgrade available ] + +[sources] +allow-git = [ + "https://github.com/n0-computer/iroh.git", +] diff --git a/src/lib.rs b/src/lib.rs index 2821cdf3e..b6358fabc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,9 @@ pub mod format; pub mod get; pub mod hashseq; pub mod metrics; +#[cfg(feature = "downloader")] +#[cfg_attr(iroh_docsrs, doc(cfg(feature = "downloader")))] +pub mod net_protocol; pub mod protocol; pub mod provider; pub mod store; diff --git a/src/net_protocol.rs b/src/net_protocol.rs new file mode 100644 index 000000000..d1de1da65 --- /dev/null +++ b/src/net_protocol.rs @@ -0,0 +1,309 @@ +//! Adaptation of `iroh-blobs` as an `iroh` protocol. + +// TODO: reduce API surface and add documentation +#![allow(missing_docs)] + +use std::{collections::BTreeMap, sync::Arc}; + +use anyhow::{anyhow, Result}; +use futures_lite::future::Boxed as BoxedFuture; +use iroh_base::hash::{BlobFormat, Hash}; +use iroh_net::{endpoint::Connecting, Endpoint, NodeAddr}; +use iroh_router::ProtocolHandler; +use serde::{Deserialize, Serialize}; +use tracing::{debug, warn}; + +use crate::{ + downloader::{DownloadRequest, Downloader}, + get::{ + db::{DownloadProgress, GetState}, + Stats, + }, + provider::EventSender, + util::{ + local_pool::LocalPoolHandle, + progress::{AsyncChannelProgressSender, ProgressSender}, + SetTagOption, + }, + HashAndFormat, TempTag, +}; + +#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)] +pub struct BatchId(u64); + +/// A request to the node to download and share the data specified by the hash. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BlobDownloadRequest { + /// This mandatory field contains the hash of the data to download and share. + pub hash: Hash, + /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as + /// well. + pub format: BlobFormat, + /// This mandatory field specifies the nodes to download the data from. + /// + /// If set to more than a single node, they will all be tried. If `mode` is set to + /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds. + /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel, + /// if the concurrency limits permit. + pub nodes: Vec, + /// Optional tag to tag the data with. + pub tag: SetTagOption, + /// Whether to directly start the download or add it to the download queue. + pub mode: DownloadMode, +} + +/// Set the mode for whether to directly start the download or add it to the download queue. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum DownloadMode { + /// Start the download right away. + /// + /// No concurrency limits or queuing will be applied. It is up to the user to manage download + /// concurrency. + Direct, + /// Queue the download. + /// + /// The download queue will be processed in-order, while respecting the downloader concurrency limits. + Queued, +} + +#[derive(Debug)] +pub struct Blobs { + rt: LocalPoolHandle, + store: S, + events: EventSender, + downloader: Downloader, + batches: tokio::sync::Mutex, +} + +/// Name used for logging when new node addresses are added from gossip. +const BLOB_DOWNLOAD_SOURCE_NAME: &str = "blob_download"; + +/// Keeps track of all the currently active batch operations of the blobs api. +#[derive(Debug, Default)] +pub struct BlobBatches { + /// Currently active batches + batches: BTreeMap, + /// Used to generate new batch ids. + max: u64, +} + +/// A single batch of blob operations +#[derive(Debug, Default)] +struct BlobBatch { + /// The tags in this batch. + tags: BTreeMap>, +} + +impl BlobBatches { + /// Create a new unique batch id. + pub fn create(&mut self) -> BatchId { + let id = self.max; + self.max += 1; + BatchId(id) + } + + /// Store a temp tag in a batch identified by a batch id. + pub fn store(&mut self, batch: BatchId, tt: TempTag) { + let entry = self.batches.entry(batch).or_default(); + entry.tags.entry(tt.hash_and_format()).or_default().push(tt); + } + + /// Remove a tag from a batch. + pub fn remove_one(&mut self, batch: BatchId, content: &HashAndFormat) -> Result<()> { + if let Some(batch) = self.batches.get_mut(&batch) { + if let Some(tags) = batch.tags.get_mut(content) { + tags.pop(); + if tags.is_empty() { + batch.tags.remove(content); + } + return Ok(()); + } + } + // this can happen if we try to upgrade a tag from an expired batch + anyhow::bail!("tag not found in batch"); + } + + /// Remove an entire batch. + pub fn remove(&mut self, batch: BatchId) { + self.batches.remove(&batch); + } +} + +impl Blobs { + pub fn new_with_events( + store: S, + rt: LocalPoolHandle, + events: EventSender, + downloader: Downloader, + ) -> Self { + Self { + rt, + store, + events, + downloader, + batches: Default::default(), + } + } + + pub fn store(&self) -> &S { + &self.store + } + + pub async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> { + self.batches.lock().await + } + + pub async fn download( + &self, + endpoint: Endpoint, + req: BlobDownloadRequest, + progress: AsyncChannelProgressSender, + ) -> Result<()> { + let BlobDownloadRequest { + hash, + format, + nodes, + tag, + mode, + } = req; + let hash_and_format = HashAndFormat { hash, format }; + let temp_tag = self.store.temp_tag(hash_and_format); + let stats = match mode { + DownloadMode::Queued => { + self.download_queued(endpoint, hash_and_format, nodes, progress.clone()) + .await? + } + DownloadMode::Direct => { + self.download_direct_from_nodes(endpoint, hash_and_format, nodes, progress.clone()) + .await? + } + }; + + progress.send(DownloadProgress::AllDone(stats)).await.ok(); + match tag { + SetTagOption::Named(tag) => { + self.store.set_tag(tag, Some(hash_and_format)).await?; + } + SetTagOption::Auto => { + self.store.create_tag(hash_and_format).await?; + } + } + drop(temp_tag); + + Ok(()) + } + + async fn download_queued( + &self, + endpoint: Endpoint, + hash_and_format: HashAndFormat, + nodes: Vec, + progress: AsyncChannelProgressSender, + ) -> Result { + let mut node_ids = Vec::with_capacity(nodes.len()); + let mut any_added = false; + for node in nodes { + node_ids.push(node.node_id); + if !node.info.is_empty() { + endpoint.add_node_addr_with_source(node, BLOB_DOWNLOAD_SOURCE_NAME)?; + any_added = true; + } + } + let can_download = !node_ids.is_empty() && (any_added || endpoint.discovery().is_some()); + anyhow::ensure!(can_download, "no way to reach a node for download"); + let req = DownloadRequest::new(hash_and_format, node_ids).progress_sender(progress); + let handle = self.downloader.queue(req).await; + let stats = handle.await?; + Ok(stats) + } + + #[tracing::instrument("download_direct", skip_all, fields(hash=%hash_and_format.hash.fmt_short()))] + async fn download_direct_from_nodes( + &self, + endpoint: Endpoint, + hash_and_format: HashAndFormat, + nodes: Vec, + progress: AsyncChannelProgressSender, + ) -> Result { + let mut last_err = None; + let mut remaining_nodes = nodes.len(); + let mut nodes_iter = nodes.into_iter(); + 'outer: loop { + match crate::get::db::get_to_db_in_steps( + self.store.clone(), + hash_and_format, + progress.clone(), + ) + .await? + { + GetState::Complete(stats) => return Ok(stats), + GetState::NeedsConn(needs_conn) => { + let (conn, node_id) = 'inner: loop { + match nodes_iter.next() { + None => break 'outer, + Some(node) => { + remaining_nodes -= 1; + let node_id = node.node_id; + if node_id == endpoint.node_id() { + debug!( + ?remaining_nodes, + "skip node {} (it is the node id of ourselves)", + node_id.fmt_short() + ); + continue 'inner; + } + match endpoint.connect(node, crate::protocol::ALPN).await { + Ok(conn) => break 'inner (conn, node_id), + Err(err) => { + debug!( + ?remaining_nodes, + "failed to connect to {}: {err}", + node_id.fmt_short() + ); + continue 'inner; + } + } + } + } + }; + match needs_conn.proceed(conn).await { + Ok(stats) => return Ok(stats), + Err(err) => { + warn!( + ?remaining_nodes, + "failed to download from {}: {err}", + node_id.fmt_short() + ); + last_err = Some(err); + } + } + } + } + } + match last_err { + Some(err) => Err(err.into()), + None => Err(anyhow!("No nodes to download from provided")), + } + } +} + +impl ProtocolHandler for Blobs { + fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { + Box::pin(async move { + crate::provider::handle_connection( + conn.await?, + self.store.clone(), + self.events.clone(), + self.rt.clone(), + ) + .await; + Ok(()) + }) + } + + fn shutdown(self: Arc) -> BoxedFuture<()> { + Box::pin(async move { + self.store.shutdown().await; + }) + } +}