Skip to content

feat: update to new protocolhandler #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 56 additions & 89 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,5 @@ panic = 'abort'
incremental = false

[patch.crates-io]
# iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" }
# iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
# iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-base = { git = "https://github.com/n0-computer/iroh" }
iroh = { git = "https://github.com/n0-computer/iroh" }
2 changes: 1 addition & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ ignore = [

[sources]
allow-git = [
# "https://github.com/n0-computer/iroh.git",
"https://github.com/n0-computer/iroh.git",
]
5 changes: 3 additions & 2 deletions examples/custom-protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ impl ProtocolHandler for BlobSearch {
///
/// The returned future runs on a newly spawned tokio task, so it can run as long as
/// the connection lasts.
fn accept(self: Arc<Self>, connecting: Connecting) -> BoxedFuture<Result<()>> {
fn accept(&self, connecting: Connecting) -> BoxedFuture<Result<()>> {
let this = self.clone();
// We have to return a boxed future from the handler.
Box::pin(async move {
// Wait for the connection to be fully established.
Expand All @@ -162,7 +163,7 @@ impl ProtocolHandler for BlobSearch {

// Now, we can perform the actual query on our local database.
let query = String::from_utf8(query_bytes)?;
let hashes = self.query_local(&query);
let hashes = this.query_local(&query);

// We want to return a list of hashes. We do the simplest thing possible, and just send
// one hash after the other. Because the hashes have a fixed size of 32 bytes, this is
Expand Down
88 changes: 14 additions & 74 deletions src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ impl Default for GcState {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Blobs<S> {
rt: LocalPoolHandle,
pub(crate) store: S,
events: EventSender,
downloader: Downloader,
#[cfg(feature = "rpc")]
batches: tokio::sync::Mutex<BlobBatches>,
batches: Arc<tokio::sync::Mutex<BlobBatches>>,
endpoint: Endpoint,
gc_state: Arc<std::sync::Mutex<GcState>>,
#[cfg(feature = "rpc")]
Expand Down Expand Up @@ -131,15 +131,15 @@ impl<S: crate::store::Store> Builder<S> {

/// Build the Blobs protocol handler.
/// You need to provide a local pool handle and an endpoint.
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Arc<Blobs<S>> {
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Blobs<S> {
let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone());
Arc::new(Blobs::new(
Blobs::new(
self.store,
rt.clone(),
self.events.unwrap_or_default(),
downloader,
endpoint.clone(),
))
)
}
}

Expand Down Expand Up @@ -391,82 +391,22 @@ impl<S: crate::store::Store> Blobs<S> {
}
}

// trait BlobsInner: Debug + Send + Sync + 'static {
// fn shutdown(self: Arc<Self>) -> BoxedFuture<()>;
// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>>;
// fn client(self: Arc<Self>) -> MemClient;
// fn local_pool_handle(&self) -> &LocalPoolHandle;
// fn downloader(&self) -> &Downloader;
// }

// #[derive(Debug)]
// struct Blobs2 {
// inner: Arc<dyn BlobsInner>,
// }

// impl Blobs2 {
// fn client(&self) -> MemClient {
// self.inner.clone().client()
// }

// fn local_pool_handle(&self) -> &LocalPoolHandle {
// self.inner.local_pool_handle()
// }

// fn downloader(&self) -> &Downloader {
// self.inner.downloader()
// }
// }

// impl<S: crate::store::Store> BlobsInner for Blobs<S> {
// fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
// ProtocolHandler::shutdown(self)
// }

// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
// ProtocolHandler::accept(self, conn)
// }

// fn client(self: Arc<Self>) -> MemClient {
// Blobs::client(self)
// }

// fn local_pool_handle(&self) -> &LocalPoolHandle {
// self.rt()
// }

// fn downloader(&self) -> &Downloader {
// self.downloader()
// }
// }

// impl ProtocolHandler for Blobs2 {
// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
// self.inner.clone().accept(conn)
// }

// fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
// self.inner.clone().shutdown()
// }
// }

impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
let db = self.store.clone();
let events = self.events.clone();
let rt = self.rt.clone();

Box::pin(async move {
crate::provider::handle_connection(
conn.await?,
self.store.clone(),
self.events.clone(),
self.rt.clone(),
)
.await;
crate::provider::handle_connection(conn.await?, db, events, rt).await;
Ok(())
})
}

fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
fn shutdown(&self) -> BoxedFuture<()> {
let store = self.store.clone();
Box::pin(async move {
self.store.shutdown().await;
store.shutdown().await;
})
}
}
Expand Down
32 changes: 6 additions & 26 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use std::{
io,
ops::Deref,
sync::{Arc, Mutex},
};

Expand Down Expand Up @@ -63,48 +62,29 @@ const RPC_BLOB_GET_CHANNEL_CAP: usize = 2;

impl<D: crate::store::Store> Blobs<D> {
/// Get a client for the blobs protocol
pub fn client(self: Arc<Self>) -> blobs::MemClient {
pub fn client(&self) -> blobs::MemClient {
let client = self
.rpc_handler
.get_or_init(|| RpcHandler::new(&self))
.get_or_init(|| RpcHandler::new(self))
.client
.clone();
blobs::Client::new(client)
}

/// Handle an RPC request
pub async fn handle_rpc_request<C>(
self: Arc<Self>,
self,
msg: Request,
chan: RpcChannel<RpcService, C>,
) -> std::result::Result<(), RpcServerError<C>>
where
C: ChannelTypes<RpcService>,
{
use Request::*;
let handler = Handler(self);
match msg {
Blobs(msg) => handler.handle_blobs_request(msg, chan).await,
Tags(msg) => handler.handle_tags_request(msg, chan).await,
Request::Blobs(msg) => self.handle_blobs_request(msg, chan).await,
Request::Tags(msg) => self.handle_tags_request(msg, chan).await,
}
}
}

#[derive(Clone)]
struct Handler<S>(Arc<Blobs<S>>);

impl<S> Deref for Handler<S> {
type Target = Blobs<S>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<D: crate::store::Store> Handler<D> {
fn store(&self) -> &D {
&self.0.store
}

/// Handle a tags request
pub async fn handle_tags_request<C>(
Expand Down Expand Up @@ -903,7 +883,7 @@ pub(crate) struct RpcHandler {
}

impl RpcHandler {
fn new<D: crate::store::Store>(blobs: &Arc<Blobs<D>>) -> Self {
fn new<D: crate::store::Store>(blobs: &Blobs<D>) -> Self {
let blobs = blobs.clone();
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
Expand Down
6 changes: 3 additions & 3 deletions src/rpc/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ mod tests {

mod node {
//! An iroh node that just has the blobs transport
use std::{path::Path, sync::Arc};
use std::path::Path;

use iroh::{protocol::Router, Endpoint, NodeAddr, NodeId};
use tokio_util::task::AbortOnDropHandle;
Expand Down Expand Up @@ -1068,13 +1068,13 @@ mod tests {
// Setup blobs
let downloader =
Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone());
let blobs = Arc::new(Blobs::new(
let blobs = Blobs::new(
store.clone(),
local_pool.handle().clone(),
events,
downloader,
endpoint.clone(),
));
);
router = router.accept(crate::ALPN, blobs.clone());

// Build the router
Expand Down
3 changes: 1 addition & 2 deletions tests/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
io,
io::{Cursor, Write},
path::PathBuf,
sync::Arc,
time::Duration,
};

Expand Down Expand Up @@ -41,7 +40,7 @@ use tokio::io::AsyncReadExt;
#[derive(Debug)]
pub struct Node<S> {
pub router: iroh::protocol::Router,
pub blobs: Arc<Blobs<S>>,
pub blobs: Blobs<S>,
pub store: S,
pub _local_pool: LocalPool,
}
Expand Down
Loading