Skip to content

chore: upgrade to [email protected] and [email protected] #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 185 additions & 120 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ genawaiter = { version = "0.99.1", features = ["futures03"] }
hashlink = { version = "0.9.0", optional = true }
hex = "0.4.3"
indicatif = { version = "0.17.8", optional = true }
iroh-base = { version = "0.31" }
iroh-base = { version = "0.32" }
iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.31", default-features = false }
iroh = "0.31"
iroh = "0.32"
nested_enum_utils = { version = "0.1.0", optional = true }
num_cpus = "1.15.0"
oneshot = "0.1.8"
Expand All @@ -56,7 +56,7 @@ postcard = { version = "1", default-features = false, features = [
] }
quic-rpc = { version = "0.18", optional = true }
quic-rpc-derive = { version = "0.17", optional = true }
quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] }
quinn = { package = "iroh-quinn", version = "0.13", features = ["ring"] }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this dep?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wooo no we don't!

rand = "0.8"
range-collections = "0.4.0"
redb = { version = "2.2.0", optional = true }
Expand All @@ -77,11 +77,11 @@ walkdir = { version = "2.5.0", optional = true }

# Examples
console = { version = "0.15.8", optional = true }
tracing-test = "0.2.5"

[dev-dependencies]
http-body = "1.0"
iroh-test = { version = "0.31" }
iroh = { version = "0.31", features = ["test-utils"] }
iroh = { version = "0.32", features = ["test-utils"] }
futures-buffered = "0.2.4"
proptest = "1.0.0"
serde_json = "1.0.107"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Here is a basic example of how to set up `iroh-blobs` with `iroh`:

```rust
use iroh::{protocol::Router, Endpoint};
use iroh_blobs::net_protocol::Blobs;
use iroh_blobs::{store::Store, net_protocol::Blobs};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions examples/custom-protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use anyhow::Result;
use clap::Parser;
use futures_lite::future::Boxed as BoxedFuture;
use iroh::{
endpoint::{get_remote_node_id, Connecting},
endpoint::Connecting,
protocol::{ProtocolHandler, Router},
Endpoint, NodeId,
};
Expand Down Expand Up @@ -149,7 +149,7 @@ impl ProtocolHandler for BlobSearch {
// Wait for the connection to be fully established.
let connection = connecting.await?;
// We can get the remote's node id from the connection.
let node_id = get_remote_node_id(&connection)?;
let node_id = connection.remote_node_id()?;
println!("accepted connection from {node_id}");

// Our protocol is a simple request-response protocol, so we expect the
Expand Down
76 changes: 37 additions & 39 deletions examples/fetch-fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@
//! Since this example does not use [`iroh-net::Endpoint`], it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//!
//! Run the provide-bytes example first. It will give instructions on how to run this example properly.
use std::net::SocketAddr;
use std::str::FromStr;

use anyhow::{Context, Result};
use iroh_blobs::{
get::fsm::{AtInitial, ConnectedNext, EndBlobNext},
hashseq::HashSeq,
protocol::GetRequest,
Hash,
BlobFormat,
};
use iroh_io::ConcatenateSliceWriter;
use tracing_subscriber::{prelude::*, EnvFilter};

mod connect;
use connect::{load_certs, make_client_endpoint};

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
pub fn setup_logging() {
Expand All @@ -29,50 +28,49 @@ pub fn setup_logging() {

#[tokio::main]
async fn main() -> Result<()> {
println!("\nfetch bytes example!");
println!("\nfetch fsm example!");
setup_logging();
let args: Vec<_> = std::env::args().collect();
if args.len() != 4 {
anyhow::bail!("usage: fetch-bytes [HASH] [SOCKET_ADDR] [FORMAT]");
if args.len() != 2 {
anyhow::bail!("usage: fetch-fsm [TICKET]");
}
let hash: Hash = args[1].parse().context("unable to parse [HASH]")?;
let addr: SocketAddr = args[2].parse().context("unable to parse [SOCKET_ADDR]")?;
let format = {
if args[3] != "blob" && args[3] != "collection" {
anyhow::bail!(
"expected either 'blob' or 'collection' for FORMAT argument, got {}",
args[3]
);
}
args[3].clone()
};
let ticket =
iroh_blobs::ticket::BlobTicket::from_str(&args[1]).context("unable to parse [TICKET]")?;

// load tls certificates
// This will error if you have not run the `provide-bytes` example
let roots = load_certs().await?;
let (node, hash, format) = ticket.into_parts();

// create an endpoint to listen for incoming connections
let endpoint = make_client_endpoint(roots)?;
println!("\nlistening on {}", endpoint.local_addr()?);
println!("fetching hash {hash} from {addr}");
let endpoint = iroh::Endpoint::builder()
.relay_mode(iroh::RelayMode::Disabled)
.alpns(vec![connect::EXAMPLE_ALPN.into()])
.bind()
.await?;
println!(
"\nlistening on {:?}",
endpoint.node_addr().await?.direct_addresses
);
println!("fetching hash {hash} from {:?}", node.node_id);

// connect
let connection = endpoint.connect(addr, "localhost")?.await?;

if format == "collection" {
// create a request for a collection
let request = GetRequest::all(hash);
// create the initial state of the finite state machine
let initial = iroh_blobs::get::fsm::start(connection, request);

write_collection(initial).await
} else {
// create a request for a single blob
let request = GetRequest::single(hash);
// create the initial state of the finite state machine
let initial = iroh_blobs::get::fsm::start(connection, request);

write_blob(initial).await
let connection = endpoint.connect(node, connect::EXAMPLE_ALPN).await?;

match format {
BlobFormat::HashSeq => {
// create a request for a collection
let request = GetRequest::all(hash);
// create the initial state of the finite state machine
let initial = iroh_blobs::get::fsm::start(connection, request);

write_collection(initial).await
}
BlobFormat::Raw => {
// create a request for a single blob
let request = GetRequest::single(hash);
// create the initial state of the finite state machine
let initial = iroh_blobs::get::fsm::start(connection, request);

write_blob(initial).await
}
}
}

Expand Down
72 changes: 35 additions & 37 deletions examples/fetch-stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Since this example does not use [`iroh-net::Endpoint`], it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//!
//! Run the provide-bytes example first. It will give instructions on how to run this example properly.
use std::{io, net::SocketAddr};
use std::{io, str::FromStr};

use anyhow::{Context, Result};
use bao_tree::io::fsm::BaoContentItem;
Expand All @@ -14,13 +14,12 @@ use iroh_blobs::{
get::fsm::{AtInitial, BlobContentNext, ConnectedNext, EndBlobNext},
hashseq::HashSeq,
protocol::GetRequest,
Hash,
BlobFormat,
};
use tokio::io::AsyncWriteExt;
use tracing_subscriber::{prelude::*, EnvFilter};

mod connect;
use connect::{load_certs, make_client_endpoint};

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
pub fn setup_logging() {
Expand All @@ -36,51 +35,50 @@ async fn main() -> Result<()> {
println!("\nfetch stream example!");
setup_logging();
let args: Vec<_> = std::env::args().collect();
if args.len() != 4 {
anyhow::bail!("usage: fetch-bytes [HASH] [SOCKET_ADDR] [FORMAT]");
if args.len() != 2 {
anyhow::bail!("usage: fetch-stream [TICKET]");
}
let hash: Hash = args[1].parse().context("unable to parse [HASH]")?;
let addr: SocketAddr = args[2].parse().context("unable to parse [SOCKET_ADDR]")?;
let format = {
if args[3] != "blob" && args[3] != "collection" {
anyhow::bail!(
"expected either 'blob' or 'collection' for FORMAT argument, got {}",
args[3]
);
}
args[3].clone()
};
let ticket =
iroh_blobs::ticket::BlobTicket::from_str(&args[1]).context("unable to parse [TICKET]")?;

// load tls certificates
// This will error if you have not run the `provide-bytes` example
let roots = load_certs().await?;
let (node, hash, format) = ticket.into_parts();

// create an endpoint to listen for incoming connections
let endpoint = make_client_endpoint(roots)?;
println!("\nlistening on {}", endpoint.local_addr()?);
println!("fetching hash {hash} from {addr}");
let endpoint = iroh::Endpoint::builder()
.relay_mode(iroh::RelayMode::Disabled)
.alpns(vec![connect::EXAMPLE_ALPN.into()])
.bind()
.await?;
println!(
"\nlistening on {:?}",
endpoint.node_addr().await?.direct_addresses
);
println!("fetching hash {hash} from {:?}", node.node_id);

// connect
let connection = endpoint.connect(addr, "localhost")?.await?;
let connection = endpoint.connect(node, connect::EXAMPLE_ALPN).await?;

let mut stream = if format == "collection" {
// create a request for a collection
let request = GetRequest::all(hash);
let mut stream = match format {
BlobFormat::HashSeq => {
// create a request for a collection
let request = GetRequest::all(hash);

// create the initial state of the finite state machine
let initial = iroh_blobs::get::fsm::start(connection, request);
// create the initial state of the finite state machine
let initial = iroh_blobs::get::fsm::start(connection, request);

// create a stream that yields all the data of the blob
stream_children(initial).boxed_local()
} else {
// create a request for a single blob
let request = GetRequest::single(hash);
// create a stream that yields all the data of the blob
stream_children(initial).boxed_local()
}
BlobFormat::Raw => {
// create a request for a single blob
let request = GetRequest::single(hash);

// create the initial state of the finite state machine
let initial = iroh_blobs::get::fsm::start(connection, request);
// create the initial state of the finite state machine
let initial = iroh_blobs::get::fsm::start(connection, request);

// create a stream that yields all the data of the blob
stream_blob(initial).boxed_local()
// create a stream that yields all the data of the blob
stream_blob(initial).boxed_local()
}
};
while let Some(item) = stream.next().await {
let item = item?;
Expand Down
32 changes: 16 additions & 16 deletions examples/provide-bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
//! cargo run --example provide-bytes collection
//! To provide a collection (multiple blobs)
use anyhow::Result;
use iroh_blobs::{format::collection::Collection, util::local_pool::LocalPool, Hash};
use iroh_blobs::{format::collection::Collection, util::local_pool::LocalPool, BlobFormat, Hash};
use tracing::warn;
use tracing_subscriber::{prelude::*, EnvFilter};

mod connect;
use connect::{make_and_write_certs, make_server_endpoint, CERT_PATH};

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
pub fn setup_logging() {
Expand Down Expand Up @@ -45,7 +44,7 @@ async fn main() -> Result<()> {
};
println!("\nprovide bytes {format} example!");

let (db, hash) = if format == "collection" {
let (db, hash, format) = if format == "collection" {
let (mut db, names) = iroh_blobs::store::readonly_mem::Store::new([
("blob1", b"the first blob of bytes".to_vec()),
("blob2", b"the second blob of bytes".to_vec()),
Expand All @@ -56,28 +55,31 @@ async fn main() -> Result<()> {
.collect();
// add it to the db
let hash = db.insert_many(collection.to_blobs()).unwrap();
(db, hash)
(db, hash, BlobFormat::HashSeq)
} else {
// create a new database and add a blob
let (db, names) =
iroh_blobs::store::readonly_mem::Store::new([("hello", b"Hello World!".to_vec())]);

// get the hash of the content
let hash = names.get("hello").unwrap();
(db, Hash::from(hash.as_bytes()))
(db, Hash::from(hash.as_bytes()), BlobFormat::Raw)
};

// create tls certs and save to CERT_PATH
let (key, cert) = make_and_write_certs().await?;

// create an endpoint to listen for incoming connections
let endpoint = make_server_endpoint(key, cert)?;
let addr = endpoint.local_addr()?;
println!("\nlistening on {addr}");
let endpoint = iroh::Endpoint::builder()
.relay_mode(iroh::RelayMode::Disabled)
.alpns(vec![connect::EXAMPLE_ALPN.into()])
.bind()
.await?;
let addr = endpoint.node_addr().await?;
println!("\nlistening on {:?}", addr.direct_addresses);
println!("providing hash {hash}");

println!("\nfetch the content using a finite state machine by running the following example:\n\ncargo run --example fetch-fsm {hash} \"{addr}\" {format}");
println!("\nfetch the content using a stream by running the following example:\n\ncargo run --example fetch-stream {hash} \"{addr}\" {format}\n");
let ticket = iroh_blobs::ticket::BlobTicket::new(addr, hash, format)?;

println!("\nfetch the content using a finite state machine by running the following example:\n\ncargo run --example fetch-fsm {ticket}");
println!("\nfetch the content using a stream by running the following example:\n\ncargo run --example fetch-stream {ticket}\n");

// create a new local pool handle with 1 worker thread
let lp = LocalPool::single();
Expand All @@ -100,11 +102,10 @@ async fn main() -> Result<()> {

// spawn a task to handle the connection
tokio::spawn(async move {
let remote_addr = conn.remote_address();
let conn = match conn.await {
Ok(conn) => conn,
Err(err) => {
warn!(%remote_addr, "Error connecting: {err:#}");
warn!("Error connecting: {err:#}");
return;
}
};
Expand All @@ -115,7 +116,6 @@ async fn main() -> Result<()> {

match tokio::signal::ctrl_c().await {
Ok(()) => {
tokio::fs::remove_dir_all(std::path::PathBuf::from(CERT_PATH)).await?;
accept_task.abort();
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,7 @@ impl DialerT for Dialer {
#[derive(Debug)]
struct Dialer {
endpoint: Endpoint,
pending: JoinSet<(NodeId, anyhow::Result<quinn::Connection>)>,
pending: JoinSet<(NodeId, anyhow::Result<endpoint::Connection>)>,
pending_dials: HashMap<NodeId, CancellationToken>,
}

Expand Down Expand Up @@ -1572,7 +1572,7 @@ impl Dialer {
}

impl Stream for Dialer {
type Item = (NodeId, anyhow::Result<quinn::Connection>);
type Item = (NodeId, anyhow::Result<endpoint::Connection>);

fn poll_next(
mut self: Pin<&mut Self>,
Expand Down
Loading
Loading