Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ async fn main() -> anyhow::Result<()> {

// create a protocol handler using an in-memory blob store.
let store = MemStore::new();
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
let blobs = BlobsProtocol::new(&store, None);

// build the router
let router = Router::builder(endpoint)
.accept(iroh_blobs::ALPN, blobs.clone())
.spawn();

let tag = blobs.add_slice(b"Hello world").await?;
println!("We are now serving {}", blobs.ticket(tag).await?);
let _ = endpoint.online().await;
let addr = endpoint.node_addr().initialized().await;
let ticket = BlobTicket::new(addr, tag.hash, tag.format);
println!("We are now serving {}", ticket);

// wait for control-c
tokio::signal::ctrl_c().await;
Expand Down
2 changes: 1 addition & 1 deletion examples/custom-protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async fn listen(text: Vec<String>) -> Result<()> {
proto.insert_and_index(text).await?;
}
// Build the iroh-blobs protocol handler, which is used to download blobs.
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
let blobs = BlobsProtocol::new(&store, None);

// create a router that handles both our custom protocol and the iroh-blobs protocol.
let node = Router::builder(endpoint)
Expand Down
4 changes: 2 additions & 2 deletions examples/expiring-tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ async fn main() -> anyhow::Result<()> {
let expires_at = SystemTime::now()
.checked_add(Duration::from_secs(10))
.unwrap();
create_expiring_tag(&store, &[*a.hash(), *b.hash()], "expiring", expires_at).await?;
create_expiring_tag(&store, &[a.hash(), b.hash()], "expiring", expires_at).await?;

// add a single blob and tag it with an expiry date 60 seconds in the future
let c = batch.add_bytes("blob 3".as_bytes()).await?;
let expires_at = SystemTime::now()
.checked_add(Duration::from_secs(60))
.unwrap();
create_expiring_tag(&store, &[*c.hash()], "expiring", expires_at).await?;
create_expiring_tag(&store, &[c.hash()], "expiring", expires_at).await?;
// batch goes out of scope, so data is only protected by the tags we created
}

Expand Down
2 changes: 1 addition & 1 deletion examples/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ async fn setup(store: MemStore, events: EventSender) -> Result<(Router, NodeAddr
.await?;
let _ = endpoint.home_relay().initialized().await;
let addr = endpoint.node_addr().initialized().await;
let blobs = BlobsProtocol::new(&store, endpoint.clone(), Some(events));
let blobs = BlobsProtocol::new(&store, Some(events));
let router = Router::builder(endpoint)
.accept(iroh_blobs::ALPN, blobs)
.spawn();
Expand Down
2 changes: 1 addition & 1 deletion examples/mdns-discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn accept(path: &Path) -> Result<()> {
.await?;
let builder = Router::builder(endpoint.clone());
let store = MemStore::new();
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
let blobs = BlobsProtocol::new(&store, None);
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
let node = builder.spawn();

Expand Down
2 changes: 1 addition & 1 deletion examples/random_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ async fn provide(args: ProvideArgs) -> anyhow::Result<()> {
.bind()
.await?;
let (dump_task, events_tx) = dump_provider_events(args.allow_push);
let blobs = iroh_blobs::BlobsProtocol::new(&store, endpoint.clone(), Some(events_tx));
let blobs = iroh_blobs::BlobsProtocol::new(&store, Some(events_tx));
let router = iroh::protocol::Router::builder(endpoint.clone())
.accept(iroh_blobs::ALPN, blobs)
.spawn();
Expand Down
8 changes: 4 additions & 4 deletions examples/transfer-collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Node {

// this BlobsProtocol accepts connections from other nodes and serves blobs from the store
// we pass None to skip subscribing to request events
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
let blobs = BlobsProtocol::new(&store, None);
// Routers group one or more protocols together to accept connections from other nodes,
// here we're only using one, but could add more in a real world use case as needed
let router = Router::builder(endpoint)
Expand Down Expand Up @@ -80,14 +80,14 @@ impl Node {

let collection_items = collection_items
.iter()
.map(|(name, tag)| (name.to_string(), *tag.hash()))
.map(|(name, tag)| (name.to_string(), tag.hash()))
.collect::<Vec<_>>();

let collection = Collection::from_iter(collection_items);

let tt = collection.store(&self.store).await?;
self.store.tags().create(*tt.hash_and_format()).await?;
Ok(*tt.hash())
self.store.tags().create(tt.hash_and_format()).await?;
Ok(tt.hash())
}

/// retrieve an entire collection from a given hash and provider
Expand Down
2 changes: 1 addition & 1 deletion examples/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async fn main() -> anyhow::Result<()> {
// We initialize an in-memory backing store for iroh-blobs
let store = MemStore::new();
// Then we initialize a struct that can accept blobs requests over iroh connections
let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
let blobs = BlobsProtocol::new(&store, None);

// Grab all passed in arguments, the first one is the binary itself, so we skip it.
let args: Vec<String> = std::env::args().skip(1).collect();
Expand Down
8 changes: 4 additions & 4 deletions src/api/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,20 +656,20 @@ impl<'a> AddProgress<'a> {
pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
let blobs = self.blobs.clone();
let tt = self.temp_tag().await?;
let haf = *tt.hash_and_format();
let haf = tt.hash_and_format();
let tags = Tags::ref_from_sender(&blobs.client);
tags.set(name, *tt.hash_and_format()).await?;
tags.set(name, haf).await?;
drop(tt);
Ok(haf)
}

pub async fn with_tag(self) -> RequestResult<TagInfo> {
let blobs = self.blobs.clone();
let tt = self.temp_tag().await?;
let hash = *tt.hash();
let hash = tt.hash();
let format = tt.format();
let tags = Tags::ref_from_sender(&blobs.client);
let name = tags.create(*tt.hash_and_format()).await?;
let name = tags.create(tt.hash_and_format()).await?;
drop(tt);
Ok(TagInfo { name, hash, format })
}
Expand Down
2 changes: 1 addition & 1 deletion src/api/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ mod tests {
let store = FsStore::load(td.path().join("blobs.db")).await?;
let blobs = store.blobs();
let tt = blobs.add_slice(b"test").temp_tag().await?;
let hash = *tt.hash();
let hash = tt.hash();
let info = store.remote().local(hash).await?;
assert_eq!(info.bitfield.ranges, ChunkRanges::all());
assert_eq!(info.local_bytes(), 4);
Expand Down
2 changes: 1 addition & 1 deletion src/format/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl Collection {
let (links, meta) = self.into_parts();
let meta_bytes = postcard::to_stdvec(&meta)?;
let meta_tag = db.add_bytes(meta_bytes).temp_tag().await?;
let links_bytes = std::iter::once(*meta_tag.hash())
let links_bytes = std::iter::once(meta_tag.hash())
.chain(links)
.collect::<HashSeq>();
let links_tag = db
Expand Down
8 changes: 1 addition & 7 deletions src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,8 +633,7 @@ pub mod fsm {
///
/// This is similar to [`bao_tree::io::DecodeError`], but takes into account
/// that we are reading from a [`RecvStream`], so read errors will be
/// propagated as [`DecodeError::Read`], containing a [`ReadError`].
/// This carries more concrete information about the error than an [`io::Error`].
/// propagated as [`DecodeError::Read`], containing an [`io::Error`].
///
/// When the provider finds that it does not have a chunk that we requested,
/// or that the chunk is invalid, it will stop sending data without producing
Expand All @@ -646,11 +645,6 @@ pub mod fsm {
/// variants indicate that the provider has sent us invalid data. A well-behaved
/// provider should never do this, so this is an indication that the provider is
/// not behaving correctly.
///
/// The [`DecodeError::DecodeIo`] variant is just a fallback for any other io error that
/// is not actually a [`DecodeError::Read`].
///
/// [`ReadError`]: endpoint::ReadError
#[common_fields({
backtrace: Option<Backtrace>,
#[snafu(implicit)]
Expand Down
28 changes: 6 additions & 22 deletions src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
//! let endpoint = Endpoint::builder().discovery_n0().bind().await?;
//!
//! // create a blobs protocol handler
//! let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
//! let blobs = BlobsProtocol::new(&store, None);
//!
//! // create a router and add the blobs protocol handler
//! let router = Router::builder(endpoint)
//! .accept(iroh_blobs::ALPN, blobs.clone())
//! .spawn();
//!
//! endpoint.online().await;
//! let addr = endpoint.node_addr().initialized().await;
//! // this data is now globally available using the ticket
//! let ticket = blobs.ticket(t).await?;
//! let ticket = BlobTicket::new(addr, t.hash, t.format);
//! println!("ticket: {}", ticket);
//!
//! // wait for control-c to exit
Expand All @@ -41,16 +43,14 @@ use std::{fmt::Debug, ops::Deref, sync::Arc};
use iroh::{
endpoint::Connection,
protocol::{AcceptError, ProtocolHandler},
Endpoint, Watcher,
};
use tracing::error;

use crate::{api::Store, provider::events::EventSender, ticket::BlobTicket, HashAndFormat};
use crate::{api::Store, provider::events::EventSender};

#[derive(Debug)]
pub(crate) struct BlobsInner {
pub(crate) store: Store,
pub(crate) endpoint: Endpoint,
pub(crate) events: EventSender,
}

Expand All @@ -69,11 +69,10 @@ impl Deref for BlobsProtocol {
}

impl BlobsProtocol {
pub fn new(store: &Store, endpoint: Endpoint, events: Option<EventSender>) -> Self {
pub fn new(store: &Store, events: Option<EventSender>) -> Self {
Self {
inner: Arc::new(BlobsInner {
store: store.clone(),
endpoint,
events: events.unwrap_or(EventSender::DEFAULT),
}),
}
Expand All @@ -82,21 +81,6 @@ impl BlobsProtocol {
pub fn store(&self) -> &Store {
&self.inner.store
}

pub fn endpoint(&self) -> &Endpoint {
&self.inner.endpoint
}

/// Create a ticket for content on this node.
///
/// Note that this does not check whether the content is partially or fully available. It is
/// just a convenience method to create a ticket from content and the address of this node.
pub async fn ticket(&self, content: impl Into<HashAndFormat>) -> anyhow::Result<BlobTicket> {
let content = content.into();
let addr = self.inner.endpoint.node_addr().initialized().await;
let ticket = BlobTicket::new(addr, content.hash, content.format);
Ok(ticket)
}
}

impl ProtocolHandler for BlobsProtocol {
Expand Down
22 changes: 11 additions & 11 deletions src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1602,7 +1602,7 @@ pub mod tests {
let stream = bytes_to_stream(expected.clone(), 1023);
let obs = store.observe(expected_hash);
let tt = store.add_stream(stream).await.temp_tag().await?;
assert_eq!(expected_hash, *tt.hash());
assert_eq!(expected_hash, tt.hash());
// we must at some point see completion, otherwise the test will hang
obs.await_completion().await?;
let actual = store.get_bytes(expected_hash).await?;
Expand Down Expand Up @@ -2043,8 +2043,8 @@ pub mod tests {
.await?
.collect::<HashSet<_>>()
.await;
assert!(tts.contains(tt1.hash_and_format()));
assert!(tts.contains(tt2.hash_and_format()));
assert!(tts.contains(&tt1.hash_and_format()));
assert!(tts.contains(&tt2.hash_and_format()));
drop(batch);
store.sync_db().await?;
store.wait_idle().await?;
Expand All @@ -2055,8 +2055,8 @@ pub mod tests {
.collect::<HashSet<_>>()
.await;
// temp tag went out of scope, so it does not work anymore
assert!(!tts.contains(tt1.hash_and_format()));
assert!(!tts.contains(tt2.hash_and_format()));
assert!(!tts.contains(&tt1.hash_and_format()));
assert!(!tts.contains(&tt2.hash_and_format()));
drop(tt1);
drop(tt2);
Ok(())
Expand Down Expand Up @@ -2089,29 +2089,29 @@ pub mod tests {
let data = vec![0u8; size];
let data = Bytes::from(data);
let tt = store.add_bytes(data.clone()).temp_tag().await?;
data_by_hash.insert(*tt.hash(), data);
data_by_hash.insert(tt.hash(), data);
hashes.push(tt);
}
store.sync_db().await?;
for tt in &hashes {
let hash = *tt.hash();
let hash = tt.hash();
let path = testdir.path().join(format!("{hash}.txt"));
store.export(hash, path).await?;
}
for tt in &hashes {
let hash = tt.hash();
let data = store
.export_bao(*hash, ChunkRanges::all())
.export_bao(hash, ChunkRanges::all())
.data_to_vec()
.await
.unwrap();
assert_eq!(data, data_by_hash[hash].to_vec());
assert_eq!(data, data_by_hash[&hash].to_vec());
let bao = store
.export_bao(*hash, ChunkRanges::all())
.export_bao(hash, ChunkRanges::all())
.bao_to_vec()
.await
.unwrap();
bao_by_hash.insert(*hash, bao);
bao_by_hash.insert(hash, bao);
}
store.dump().await?;

Expand Down
26 changes: 13 additions & 13 deletions src/store/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,15 @@ mod tests {
let ft = blobs.add_slice("f").temp_tag().await?;
let gt = blobs.add_slice("g").temp_tag().await?;
let ht = blobs.add_slice("h").with_named_tag("h").await?;
let a = *at.hash();
let b = *bt.hash();
let c = *ct.hash();
let d = *dt.hash();
let e = *et.hash();
let f = *ft.hash();
let g = *gt.hash();
let a = at.hash();
let b = bt.hash();
let c = ct.hash();
let d = dt.hash();
let e = et.hash();
let f = ft.hash();
let g = gt.hash();
let h = ht.hash;
store.tags().set("c", *ct.hash_and_format()).await?;
store.tags().set("c", ct.hash_and_format()).await?;
let dehs = [d, e].into_iter().collect::<HashSeq>();
let hehs = blobs
.add_bytes_with_opts(AddBytesOptions {
Expand All @@ -287,7 +287,7 @@ mod tests {
})
.temp_tag()
.await?;
store.tags().set("fg", *fghs.hash_and_format()).await?;
store.tags().set("fg", fghs.hash_and_format()).await?;
drop(fghs);
drop(bt);
store.tags().delete("h").await?;
Expand Down Expand Up @@ -335,11 +335,11 @@ mod tests {
.temp_tag()
.await?;
let ah = a.hash();
let data_path = options.data_path(ah);
let outboard_path = options.outboard_path(ah);
let data_path = options.data_path(&ah);
let outboard_path = options.outboard_path(&ah);
assert!(data_path.exists());
assert!(outboard_path.exists());
assert!(store.has(*ah).await?);
assert!(store.has(ah).await?);
drop(a);
gc_run_once(store, &mut live).await?;
assert!(!data_path.exists());
Expand Down Expand Up @@ -410,7 +410,7 @@ mod tests {

async fn gc_check_deletion(store: &Store) -> TestResult {
let temp_tag = store.add_bytes(b"foo".to_vec()).temp_tag().await?;
let hash = *temp_tag.hash();
let hash = temp_tag.hash();
assert_eq!(store.get_bytes(hash).await?.as_ref(), b"foo");
drop(temp_tag);
let mut live = HashSet::new();
Expand Down
2 changes: 1 addition & 1 deletion src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ mod tests {
async fn smoke() -> TestResult<()> {
let store = MemStore::new();
let tt = store.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
let hash = *tt.hash();
let hash = tt.hash();
println!("hash: {hash:?}");
let mut stream = store.export_bao(hash, ChunkRanges::all()).stream();
while let Some(item) = stream.next().await {
Expand Down
Loading
Loading