Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ futures-util = "0.3.30"
testdir = "0.9.1"

[features]
default = ["fs-store", "rpc", "net_protocol"]
default = ["fs-store", "net_protocol"]
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
net_protocol = ["downloader"]
net_protocol = ["downloader", "dep:futures-util"]
fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"]
metrics = ["iroh-metrics/metrics"]
redb = ["dep:redb"]
Expand Down Expand Up @@ -136,6 +136,7 @@ name = "fetch-stream"

[[example]]
name = "transfer"
required-features = ["rpc"]

[[example]]
name = "hello-world-fetch"
Expand Down
31 changes: 16 additions & 15 deletions src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,7 @@
// TODO: reduce API surface and add documentation
#![allow(missing_docs)]

use std::{
collections::{BTreeMap, BTreeSet},
fmt::Debug,
ops::DerefMut,
sync::{Arc, OnceLock},
};
use std::{collections::BTreeSet, fmt::Debug, ops::DerefMut, sync::Arc};

use anyhow::{anyhow, bail, Result};
use futures_lite::future::Boxed as BoxedFuture;
Expand All @@ -31,7 +26,7 @@ use crate::{
progress::{AsyncChannelProgressSender, ProgressSender},
SetTagOption,
},
HashAndFormat, TempTag,
HashAndFormat,
};

/// A callback that blobs can ask about a set of hashes that should not be garbage collected.
Expand All @@ -58,32 +53,33 @@ pub struct Blobs<S> {
pub(crate) store: S,
events: EventSender,
downloader: Downloader,
#[cfg(feature = "rpc")]
batches: tokio::sync::Mutex<BlobBatches>,
endpoint: Endpoint,
gc_state: Arc<std::sync::Mutex<GcState>>,
#[cfg(feature = "rpc")]
pub(crate) rpc_handler: Arc<OnceLock<crate::rpc::RpcHandler>>,
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
}

/// Name used for logging when new node addresses are added from gossip.
const BLOB_DOWNLOAD_SOURCE_NAME: &str = "blob_download";

/// Keeps track of all the currently active batch operations of the blobs api.
#[cfg(feature = "rpc")]
#[derive(Debug, Default)]
pub(crate) struct BlobBatches {
/// Currently active batches
batches: BTreeMap<BatchId, BlobBatch>,
batches: std::collections::BTreeMap<BatchId, BlobBatch>,
/// Used to generate new batch ids.
max: u64,
}

/// A single batch of blob operations
#[cfg(feature = "rpc")]
#[derive(Debug, Default)]
struct BlobBatch {
/// The tags in this batch.
tags: BTreeMap<HashAndFormat, Vec<TempTag>>,
tags: std::collections::BTreeMap<HashAndFormat, Vec<crate::TempTag>>,
}

#[cfg(feature = "rpc")]
impl BlobBatches {
/// Create a new unique batch id.
pub fn create(&mut self) -> BatchId {
Expand All @@ -93,7 +89,7 @@ impl BlobBatches {
}

/// Store a temp tag in a batch identified by a batch id.
pub fn store(&mut self, batch: BatchId, tt: TempTag) {
pub fn store(&mut self, batch: BatchId, tt: crate::TempTag) {
let entry = self.batches.entry(batch).or_default();
entry.tags.entry(tt.hash_and_format()).or_default().push(tt);
}
Expand Down Expand Up @@ -187,10 +183,11 @@ impl<S: crate::store::Store> Blobs<S> {
events,
downloader,
endpoint,
#[cfg(feature = "rpc")]
batches: Default::default(),
gc_state: Default::default(),
#[cfg(feature = "rpc")]
rpc_handler: Arc::new(OnceLock::new()),
rpc_handler: Arc::new(Default::default()),
}
}

Expand Down Expand Up @@ -252,6 +249,7 @@ impl<S: crate::store::Store> Blobs<S> {
Ok(())
}

#[cfg(feature = "rpc")]
pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> {
self.batches.lock().await
}
Expand Down Expand Up @@ -303,6 +301,9 @@ impl<S: crate::store::Store> Blobs<S> {
nodes: Vec<NodeAddr>,
progress: AsyncChannelProgressSender<DownloadProgress>,
) -> Result<Stats> {
/// Name used for logging when new node addresses are added from gossip.
const BLOB_DOWNLOAD_SOURCE_NAME: &str = "blob_download";

let mut node_ids = Vec::with_capacity(nodes.len());
let mut any_added = false;
for node in nodes {
Expand Down
2 changes: 1 addition & 1 deletion tests/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![cfg(feature = "net_protocol")]
#![cfg(all(feature = "net_protocol", feature = "rpc"))]
use std::{
sync::{Arc, Mutex},
time::Duration,
Expand Down
Loading