diff --git a/iroh/src/magicsock/remote_map.rs b/iroh/src/magicsock/remote_map.rs index 0861a039b0..7db34204ab 100644 --- a/iroh/src/magicsock/remote_map.rs +++ b/iroh/src/magicsock/remote_map.rs @@ -2,20 +2,22 @@ use std::{ collections::{BTreeSet, hash_map}, hash::Hash, net::{IpAddr, SocketAddr}, + ops::DerefMut, sync::{Arc, Mutex}, time::Duration, }; use iroh_base::{EndpointId, RelayUrl}; +use n0_future::task::JoinSet; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; -use tracing::warn; +use tracing::{debug, error, warn}; pub(crate) use self::remote_state::PathsWatcher; +use self::remote_state::RemoteStateActor; pub(super) use self::remote_state::RemoteStateMessage; pub use self::remote_state::{PathInfo, PathInfoList}; -use self::remote_state::{RemoteStateActor, RemoteStateHandle}; use super::{ DirectAddr, DiscoState, MagicsockMetrics, mapped_addrs::{AddrMap, EndpointIdMappedAddr, RelayMappedAddr}, @@ -45,7 +47,7 @@ pub(crate) struct RemoteMap { // State we keep about remote endpoints. // /// The actors tracking each remote endpoint. - actor_handles: Mutex>, + actor_senders: Mutex>>, /// The mapping between [`EndpointId`]s and [`EndpointIdMappedAddr`]s. pub(super) endpoint_mapped_addrs: AddrMap, /// The mapping between endpoints via a relay and their [`RelayMappedAddr`]s. @@ -57,10 +59,12 @@ pub(crate) struct RemoteMap { /// The endpoint ID of the local endpoint. local_endpoint_id: EndpointId, metrics: Arc, - local_addrs: n0_watcher::Direct>, + /// The "direct" addresses known for our local endpoint + local_direct_addrs: n0_watcher::Direct>, disco: DiscoState, sender: TransportsSender, discovery: ConcurrentDiscovery, + actor_tasks: Mutex)>>, } impl RemoteMap { @@ -69,21 +73,22 @@ impl RemoteMap { local_endpoint_id: EndpointId, metrics: Arc, - local_addrs: n0_watcher::Direct>, + local_direct_addrs: n0_watcher::Direct>, disco: DiscoState, sender: TransportsSender, discovery: ConcurrentDiscovery, ) -> Self { Self { - actor_handles: Mutex::new(FxHashMap::default()), + actor_senders: Mutex::new(FxHashMap::default()), endpoint_mapped_addrs: Default::default(), relay_mapped_addrs: Default::default(), local_endpoint_id, metrics, - local_addrs, + local_direct_addrs, disco, sender, discovery, + actor_tasks: Default::default(), } } @@ -96,8 +101,34 @@ impl RemoteMap { /// This should be called periodically to remove handles to endpoint state actors /// that have shutdown after their idle timeout expired. pub(super) fn remove_closed_remote_state_actors(&self) { - let mut handles = self.actor_handles.lock().expect("poisoned"); - handles.retain(|_eid, handle| !handle.sender.is_closed()) + let mut senders = self.actor_senders.lock().expect("poisoned"); + senders.retain(|_eid, sender| !sender.is_closed()); + while let Some(result) = self.actor_tasks.lock().expect("poisoned").try_join_next() { + match result { + Ok((eid, leftover_msgs)) => { + let entry = senders.entry(eid); + if leftover_msgs.is_empty() { + match entry { + hash_map::Entry::Occupied(occupied_entry) => occupied_entry.remove(), + hash_map::Entry::Vacant(_) => { + panic!("this should be impossible TODO(matheus23)"); + } + }; + } else { + // The remote actor got messages while it was closing, so we're restarting + debug!(%eid, "restarting terminated remote state actor: messages received during shutdown"); + let sender = self.start_remote_state_actor(eid, leftover_msgs); + entry.insert_entry(sender); + } + } + Err(err) => { + if let Ok(panic) = err.try_into_panic() { + error!("RemoteStateActor panicked."); + std::panic::resume_unwind(panic); + } + } + } + } } /// Returns the sender for the [`RemoteStateActor`]. @@ -106,21 +137,12 @@ impl RemoteMap { /// /// [`RemoteStateActor`]: remote_state::RemoteStateActor pub(super) fn remote_state_actor(&self, eid: EndpointId) -> mpsc::Sender { - let mut handles = self.actor_handles.lock().expect("poisoned"); + let mut handles = self.actor_senders.lock().expect("poisoned"); match handles.entry(eid) { - hash_map::Entry::Occupied(mut entry) => { - if let Some(sender) = entry.get().sender.get() { - sender - } else { - // The actor is dead: Start a new actor. - let (handle, sender) = self.start_remote_state_actor(eid); - entry.insert(handle); - sender - } - } + hash_map::Entry::Occupied(entry) => entry.get().clone(), hash_map::Entry::Vacant(entry) => { - let (handle, sender) = self.start_remote_state_actor(eid); - entry.insert(handle); + let sender = self.start_remote_state_actor(eid, vec![]); + entry.insert(sender.clone()); sender } } @@ -132,22 +154,24 @@ impl RemoteMap { fn start_remote_state_actor( &self, eid: EndpointId, - ) -> (RemoteStateHandle, mpsc::Sender) { + initial_msgs: Vec, + ) -> mpsc::Sender { // Ensure there is a RemoteMappedAddr for this EndpointId. self.endpoint_mapped_addrs.get(&eid); - let handle = RemoteStateActor::new( + RemoteStateActor::new( eid, self.local_endpoint_id, - self.local_addrs.clone(), + self.local_direct_addrs.clone(), self.disco.clone(), self.relay_mapped_addrs.clone(), self.metrics.clone(), self.sender.clone(), self.discovery.clone(), ) - .start(); - let sender = handle.sender.get().expect("just created"); - (handle, sender) + .start( + initial_msgs, + self.actor_tasks.lock().expect("poisoned").deref_mut(), + ) } pub(super) fn handle_ping(&self, msg: disco::Ping, sender: EndpointId, src: transports::Addr) { diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index f7f13f9557..e503abe16b 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -7,10 +7,11 @@ use std::{ }; use iroh_base::{EndpointId, RelayUrl, TransportAddr}; +use n0_error::StackResultExt; use n0_future::{ Either, FuturesUnordered, MergeUnbounded, Stream, StreamExt, boxed::BoxStream, - task::{self, AbortOnDropHandle}, + task::JoinSet, time::{self, Duration, Instant}, }; use n0_watcher::{Watchable, Watcher}; @@ -19,14 +20,11 @@ use quinn_proto::{PathError, PathEvent, PathId, PathStatus}; use rustc_hash::FxHashMap; use smallvec::SmallVec; use sync_wrapper::SyncStream; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; use tracing::{Instrument, Level, debug, error, event, info_span, instrument, trace, warn}; -use self::{ - guarded_channel::{GuardedReceiver, GuardedSender, guarded_channel}, - path_state::RemotePathState, -}; +use self::path_state::RemotePathState; use super::Source; use crate::{ disco::{self}, @@ -41,7 +39,6 @@ use crate::{ util::MaybeFuture, }; -mod guarded_channel; mod path_state; // TODO: use this @@ -125,7 +122,8 @@ pub(super) struct RemoteStateActor { /// Our local addresses. /// /// These are our local addresses and any reflexive transport addresses. - local_addrs: n0_watcher::Direct>, + /// They are called "direct addresses" in the magic socket actor. + local_direct_addrs: n0_watcher::Direct>, /// Shared state to allow to encrypt DISCO messages to peers. disco: DiscoState, /// The mapping between endpoints via a relay and their [`RelayMappedAddr`]s. @@ -180,7 +178,7 @@ impl RemoteStateActor { pub(super) fn new( endpoint_id: EndpointId, local_endpoint_id: EndpointId, - local_addrs: n0_watcher::Direct>, + local_direct_addrs: n0_watcher::Direct>, disco: DiscoState, relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>, metrics: Arc, @@ -191,7 +189,7 @@ impl RemoteStateActor { endpoint_id, local_endpoint_id, metrics, - local_addrs, + local_direct_addrs, relay_mapped_addrs, discovery, disco, @@ -209,8 +207,12 @@ impl RemoteStateActor { } } - pub(super) fn start(self) -> RemoteStateHandle { - let (tx, rx) = guarded_channel(16); + pub(super) fn start( + self, + initial_msgs: Vec, + tasks: &mut JoinSet<(EndpointId, Vec)>, + ) -> mpsc::Sender { + let (tx, rx) = mpsc::channel(16); let me = self.local_endpoint_id; let endpoint_id = self.endpoint_id; @@ -219,23 +221,13 @@ impl RemoteStateActor { // we don't explicitly set a span we get the spans from whatever call happens to // first create the actor, which is often very confusing as it then keeps those // spans for all logging of the actor. - let task = task::spawn( - async move { - if let Err(err) = self.run(rx).await { - error!("actor failed: {err:#}"); - } - } - .instrument(info_span!( - parent: None, - "RemoteStateActor", - me = %me.fmt_short(), - remote = %endpoint_id.fmt_short(), - )), - ); - RemoteStateHandle { - sender: tx, - _task: AbortOnDropHandle::new(task), - } + tasks.spawn(self.run(initial_msgs, rx).instrument(info_span!( + parent: None, + "RemoteStateActor", + me = %me.fmt_short(), + remote = %endpoint_id.fmt_short(), + ))); + tx } /// Runs the main loop of the actor. @@ -243,10 +235,17 @@ impl RemoteStateActor { /// Note that the actor uses async handlers for tasks from the main loop. The actor is /// not processing items from the inbox while waiting on any async calls. So some /// discipline is needed to not turn pending for a long time. - async fn run(mut self, mut inbox: GuardedReceiver) -> n0_error::Result<()> { + async fn run( + mut self, + initial_msgs: Vec, + mut inbox: mpsc::Receiver, + ) -> (EndpointId, Vec) { trace!("actor started"); - let idle_timeout = MaybeFuture::None; - tokio::pin!(idle_timeout); + for msg in initial_msgs { + self.handle_message(msg).await; + } + let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT); + n0_future::pin!(idle_timeout); loop { let scheduled_path_open = match self.scheduled_open_path { Some(when) => MaybeFuture::Some(time::sleep_until(when)), @@ -258,11 +257,16 @@ impl RemoteStateActor { None => MaybeFuture::None, }; n0_future::pin!(scheduled_hp); + if !inbox.is_empty() || !self.connections.is_empty() { + idle_timeout + .as_mut() + .reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT); + } tokio::select! { biased; msg = inbox.recv() => { match msg { - Some(msg) => self.handle_message(msg).await?, + Some(msg) => self.handle_message(msg).await, None => break, } } @@ -276,7 +280,11 @@ impl RemoteStateActor { self.selected_path.set(None).ok(); } } - _ = self.local_addrs.updated() => { + res = self.local_direct_addrs.updated() => { + if let Err(n0_watcher::Disconnected) = res { + trace!("direct address watcher disconnected, shutting down"); + break; + } trace!("local addrs updated, triggering holepunching"); self.trigger_holepunching().await; } @@ -297,36 +305,36 @@ impl RemoteStateActor { self.handle_discovery_item(item); } _ = &mut idle_timeout => { - if self.connections.is_empty() && inbox.close_if_idle() { + if self.connections.is_empty() && inbox.is_empty() { trace!("idle timeout expired and still idle: terminate actor"); break; + } else { + // Seems like we weren't really idle, so we reset + idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT); } } } - - if self.connections.is_empty() && inbox.is_idle() && idle_timeout.is_none() { - trace!("start idle timeout"); - idle_timeout - .as_mut() - .set_future(time::sleep(ACTOR_MAX_IDLE_TIMEOUT)); - } else if idle_timeout.is_some() { - trace!("abort idle timeout"); - idle_timeout.as_mut().set_none() - } } + + inbox.close(); + // There might be a race between checking `inbox.is_empty()` and `inbox.close()`, + // so we pull out all messages that are left over. + let mut leftover_msgs = Vec::with_capacity(inbox.len()); + inbox.recv_many(&mut leftover_msgs, inbox.len()).await; + trace!("actor terminating"); - Ok(()) + (self.endpoint_id, leftover_msgs) } /// Handles an actor message. /// /// Error returns are fatal and kill the actor. #[instrument(skip(self))] - async fn handle_message(&mut self, msg: RemoteStateMessage) -> n0_error::Result<()> { + async fn handle_message(&mut self, msg: RemoteStateMessage) { // trace!("handling message"); match msg { RemoteStateMessage::SendDatagram(transmit) => { - self.handle_msg_send_datagram(transmit).await?; + self.handle_msg_send_datagram(transmit).await; } RemoteStateMessage::AddConnection(handle, tx) => { self.handle_msg_add_connection(handle, tx).await; @@ -347,7 +355,6 @@ impl RemoteStateActor { self.handle_msg_latency(tx); } } - Ok(()) } async fn send_datagram( @@ -360,38 +367,54 @@ impl RemoteStateActor { contents: owned_transmit.contents.as_ref(), segment_size: owned_transmit.segment_size, }; - self.sender.send(&dst, None, &transmit).await?; + self.sender + .send(&dst, None, &transmit) + .await + .with_context(|_| format!("failed to send datagram to {dst:?}"))?; Ok(()) } /// Handles [`RemoteStateMessage::SendDatagram`]. - /// - /// Error returns are fatal and kill the actor. - async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) -> n0_error::Result<()> { + async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) { + // Sending datagrams might fail, e.g. because we don't have the right transports set + // up to handle sending this owned transmit to. + // After all, we try every single path that we know (relay URL, IP address), even + // though we might not have a relay transport or ip-capable transport set up. + // So these errors must not be fatal for this actor (or even this operation). + if let Some(addr) = self.selected_path.get() { trace!(?addr, "sending datagram to selected path"); - self.send_datagram(addr, transmit).await?; + + if let Err(err) = self.send_datagram(addr.clone(), transmit).await { + debug!(?addr, "failed to send datagram on selected_path: {err:#}"); + } } else { trace!( paths = ?self.paths.addrs().collect::>(), "sending datagram to all known paths", ); + if self.paths.is_empty() { + warn!("Cannot send datagrams: No paths to remote endpoint known"); + } for addr in self.paths.addrs() { // We never want to send to our local addresses. // The local address set is updated in the main loop so we can use `peek` here. if let transports::Addr::Ip(sockaddr) = addr - && self.local_addrs.peek().iter().any(|a| a.addr == *sockaddr) + && self + .local_direct_addrs + .peek() + .iter() + .any(|a| a.addr == *sockaddr) { trace!(%sockaddr, "not sending datagram to our own address"); - } else { - self.send_datagram(addr.clone(), transmit.clone()).await?; + } else if let Err(err) = self.send_datagram(addr.clone(), transmit.clone()).await { + debug!(?addr, "failed to send datagram: {err:#}"); } } // This message is received *before* a connection is added. So we do // not yet have a connection to holepunch. Instead we trigger // holepunching when AddConnection is received. } - Ok(()) } /// Handles [`RemoteStateMessage::AddConnection`]. @@ -656,7 +679,7 @@ impl RemoteStateActor { let remote_addrs: BTreeSet = self.remote_hp_addrs(); let local_addrs: BTreeSet = self - .local_addrs + .local_direct_addrs .get() .iter() .map(|daddr| daddr.addr) @@ -748,7 +771,7 @@ impl RemoteStateActor { // Send the DISCO CallMeMaybe message over the relay. let my_numbers: Vec = self - .local_addrs + .local_direct_addrs .get() .iter() .map(|daddr| daddr.addr) @@ -1096,21 +1119,6 @@ pub(crate) enum RemoteStateMessage { Latency(oneshot::Sender>), } -/// A handle to a [`RemoteStateActor`]. -/// -/// Dropping this will stop the actor. The actor will also stop after an idle timeout -/// if it has no connections, an empty inbox, and no other senders than the one stored -/// in the endpoint map exist. -#[derive(Debug)] -pub(super) struct RemoteStateHandle { - /// Sender for the channel into the [`RemoteStateActor`]. - /// - /// This is a [`GuardedSender`], from which we can get a sender but only if the receiver - /// hasn't been closed. - pub(super) sender: GuardedSender, - _task: AbortOnDropHandle<()>, -} - /// Information about a holepunch attempt. #[derive(Debug)] struct HolepunchAttempt { diff --git a/iroh/src/magicsock/remote_map/remote_state/guarded_channel.rs b/iroh/src/magicsock/remote_map/remote_state/guarded_channel.rs deleted file mode 100644 index 2b3b3b7644..0000000000 --- a/iroh/src/magicsock/remote_map/remote_state/guarded_channel.rs +++ /dev/null @@ -1,78 +0,0 @@ -use std::sync::{Arc, Mutex}; - -use tokio::sync::mpsc; - -/// Creates a new [`mpsc`] channel where the receiver can only close if there are no active senders. -pub(super) fn guarded_channel(cap: usize) -> (GuardedSender, GuardedReceiver) { - let (tx, rx) = mpsc::channel(cap); - let tx = Arc::new(Mutex::new(Some(tx))); - (GuardedSender { tx: tx.clone() }, GuardedReceiver { tx, rx }) -} - -#[derive(Debug)] -pub(crate) struct GuardedSender { - tx: Arc>>>, -} - -impl GuardedSender { - /// Returns a sender to the channel. - /// - /// Returns a new sender if the channel is not closed. It is guaranteed that - /// [`GuardedReceiver::close_if_idle`] will not return `true` until the sender is dropped. - /// Returns `None` if the channel has been closed. - pub(crate) fn get(&self) -> Option> { - self.tx.lock().expect("poisoned").clone() - } - - /// Returns `true` if the channel has been closed. - pub(crate) fn is_closed(&self) -> bool { - self.tx.lock().expect("poisoned").is_none() - } -} - -#[derive(Debug)] -pub(super) struct GuardedReceiver { - rx: mpsc::Receiver, - tx: Arc>>>, -} - -impl GuardedReceiver { - /// Receives the next value for this receiver. - /// - /// See [`mpsc::Receiver::recv`]. - pub(super) async fn recv(&mut self) -> Option { - self.rx.recv().await - } - - /// Returns `true` if the inbox is empty and no senders to the inbox exist. - pub(super) fn is_idle(&self) -> bool { - self.rx.is_empty() && self.rx.sender_strong_count() <= 1 - } - - /// Closes the channel if the channel is idle. - /// - /// Returns `true` if the channel is idle and has now been closed, and `false` if the channel - /// is not idle and therefore has not been not closed. - /// - /// Uses a lock internally to make sure that there cannot be a race condition between - /// calling this and a new sender being created. - pub(super) fn close_if_idle(&mut self) -> bool { - let mut guard = self.tx.lock().expect("poisoned"); - if self.is_idle() { - *guard = None; - self.rx.close(); - true - } else { - false - } - } -} - -impl Drop for GuardedReceiver { - fn drop(&mut self) { - let mut guard = self.tx.lock().expect("poisoned"); - *guard = None; - self.rx.close(); - drop(guard) - } -} diff --git a/iroh/src/magicsock/remote_map/remote_state/path_state.rs b/iroh/src/magicsock/remote_map/remote_state/path_state.rs index aeea1b2a5f..a35c947650 100644 --- a/iroh/src/magicsock/remote_map/remote_state/path_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state/path_state.rs @@ -117,6 +117,11 @@ impl RemotePathState { self.paths.keys() } + /// Returns whether this stores any addresses. + pub(super) fn is_empty(&self) -> bool { + self.paths.is_empty() + } + /// Replies to all pending resolve requests. /// /// This is a no-op if no requests are queued. Replies `Ok` if we have any known paths, diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index d6a31f7256..85efb5cf2e 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -541,7 +541,9 @@ impl TransportsSender { if any_match { Err(io::Error::other("all available transports failed")) } else { - Err(io::Error::other("no transport available")) + Err(io::Error::other( + "no transport available for this destination", + )) } }