From db9d18a9f8e7d9d037bc8d3f51af3849e9100550 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 18 Nov 2025 12:12:53 +0100 Subject: [PATCH 01/17] fix --- iroh/src/magicsock/remote_map/remote_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 56d507c617..8b6ac7f143 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -283,7 +283,7 @@ impl RemoteStateActor { idle_timeout .as_mut() .set_future(time::sleep(ACTOR_MAX_IDLE_TIMEOUT)); - } else if idle_timeout.is_some() { + } else if idle_timeout.is_some() && (!self.connections.is_empty() || !inbox.is_idle()) { trace!("abort idle timeout"); idle_timeout.as_mut().set_none() } From 08061e53744ae44befa11e1a535eebb18f974a50 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 18 Nov 2025 12:18:47 +0100 Subject: [PATCH 02/17] make nicer --- iroh/src/magicsock/remote_map/remote_state.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 8b6ac7f143..b3bc1db998 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -278,12 +278,13 @@ impl RemoteStateActor { } } - if self.connections.is_empty() && inbox.is_idle() && idle_timeout.is_none() { + let is_idle = self.connections.is_empty() && inbox.is_idle(); + if idle_timeout.is_none() && is_idle { trace!("start idle timeout"); idle_timeout .as_mut() .set_future(time::sleep(ACTOR_MAX_IDLE_TIMEOUT)); - } else if idle_timeout.is_some() && (!self.connections.is_empty() || !inbox.is_idle()) { + } else if idle_timeout.is_some() && !is_idle { trace!("abort idle timeout"); idle_timeout.as_mut().set_none() } From 3ee7c6915e19611d7a78941a7a73528e957c2065 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 18 Nov 2025 12:30:46 +0100 Subject: [PATCH 03/17] fix: check if local_addrs are connected --- iroh/src/magicsock/remote_map/remote_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index b3bc1db998..30cb71007a 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -253,7 +253,7 @@ impl RemoteStateActor { self.selected_path.set(None).ok(); } } - _ = self.local_addrs.updated() => { + _ = self.local_addrs.updated(), if self.local_addrs.is_connected() => { trace!("local addrs updated, triggering holepunching"); self.trigger_holepunching().await; } From 9c196396261e7fd74169510e8ea185dce88e1071 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 18 Nov 2025 13:10:25 +0100 Subject: [PATCH 04/17] chore: add log --- iroh/src/magicsock/remote_map/remote_state.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 30cb71007a..036427bd12 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -333,6 +333,7 @@ impl RemoteStateActor { dst: transports::Addr, owned_transmit: OwnedTransmit, ) -> n0_error::Result<()> { + trace!(?dst, "send datagram"); let transmit = transports::Transmit { ecn: owned_transmit.ecn, contents: owned_transmit.contents.as_ref(), From ab321af5c8c5f3c1b30bf8d0b0271aac53baf0c2 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 18 Nov 2025 13:23:52 +0100 Subject: [PATCH 05/17] chore: trace to debug for netsime --- iroh/src/magicsock/remote_map/remote_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 036427bd12..aa4891088c 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -333,7 +333,7 @@ impl RemoteStateActor { dst: transports::Addr, owned_transmit: OwnedTransmit, ) -> n0_error::Result<()> { - trace!(?dst, "send datagram"); + debug!(?dst, "send datagram"); let transmit = transports::Transmit { ecn: owned_transmit.ecn, contents: owned_transmit.contents.as_ref(), From 717ac881993055be7a4e2d8f306b0c7fb4af4881 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 18 Nov 2025 13:47:02 +0100 Subject: [PATCH 06/17] fix: log failed addr in transports send --- iroh/src/magicsock/transports.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index d6a31f7256..f19004a460 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -539,9 +539,13 @@ impl TransportsSender { } } if any_match { - Err(io::Error::other("all available transports failed")) + Err(io::Error::other(format!( + "all available transports failed for destination {dst:?}" + ))) } else { - Err(io::Error::other("no transport available")) + Err(io::Error::other(format!( + "no transport available for destination {dst:?}" + ))) } } From 7c11665bc75926e5ed9ac38fdb87d23a5219e055 Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 18 Nov 2025 13:47:23 +0100 Subject: [PATCH 07/17] fix: don't break RemoteStateActor if sending datagram fails --- iroh/src/magicsock/remote_map/remote_state.rs | 37 +++++++------------ 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index aa4891088c..820fbf9084 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -183,7 +183,7 @@ impl RemoteStateActor { } } - pub(super) fn start(mut self) -> RemoteStateHandle { + pub(super) fn start(self) -> RemoteStateHandle { let (tx, rx) = guarded_channel(16); let me = self.local_endpoint_id; let endpoint_id = self.endpoint_id; @@ -193,19 +193,12 @@ impl RemoteStateActor { // we don't explicitly set a span we get the spans from whatever call happens to // first create the actor, which is often very confusing as it then keeps those // spans for all logging of the actor. - let task = task::spawn( - async move { - if let Err(err) = self.run(rx).await { - error!("actor failed: {err:#}"); - } - } - .instrument(info_span!( - parent: None, - "RemoteStateActor", - me = %me.fmt_short(), - remote = %endpoint_id.fmt_short(), - )), - ); + let task = task::spawn(self.run(rx).instrument(info_span!( + parent: None, + "RemoteStateActor", + me = %me.fmt_short(), + remote = %endpoint_id.fmt_short(), + ))); RemoteStateHandle { sender: tx, _task: AbortOnDropHandle::new(task), @@ -217,10 +210,7 @@ impl RemoteStateActor { /// Note that the actor uses async handlers for tasks from the main loop. The actor is /// not processing items from the inbox while waiting on any async calls. So some /// discipline is needed to not turn pending for a long time. - async fn run( - &mut self, - mut inbox: GuardedReceiver, - ) -> n0_error::Result<()> { + async fn run(mut self, mut inbox: GuardedReceiver) { trace!("actor started"); let idle_timeout = MaybeFuture::None; tokio::pin!(idle_timeout); @@ -239,7 +229,7 @@ impl RemoteStateActor { biased; msg = inbox.recv() => { match msg { - Some(msg) => self.handle_message(msg).await?, + Some(msg) => self.handle_message(msg).await, None => break, } } @@ -290,18 +280,19 @@ impl RemoteStateActor { } } trace!("actor terminating"); - Ok(()) } /// Handles an actor message. /// /// Error returns are fatal and kill the actor. #[instrument(skip(self))] - async fn handle_message(&mut self, msg: RemoteStateMessage) -> n0_error::Result<()> { + async fn handle_message(&mut self, msg: RemoteStateMessage) { // trace!("handling message"); match msg { RemoteStateMessage::SendDatagram(transmit) => { - self.handle_msg_send_datagram(transmit).await?; + if let Err(err) = self.handle_msg_send_datagram(transmit).await { + warn!("failed to send datagram: {err:#}"); + } } RemoteStateMessage::AddConnection(handle, tx) => { self.handle_msg_add_connection(handle, tx).await; @@ -325,7 +316,6 @@ impl RemoteStateActor { self.handle_msg_latency(tx); } } - Ok(()) } async fn send_datagram( @@ -333,7 +323,6 @@ impl RemoteStateActor { dst: transports::Addr, owned_transmit: OwnedTransmit, ) -> n0_error::Result<()> { - debug!(?dst, "send datagram"); let transmit = transports::Transmit { ecn: owned_transmit.ecn, contents: owned_transmit.contents.as_ref(), From 899df002564ac1be3e05e26aeea156ee28256859 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 20 Nov 2025 09:47:09 +0100 Subject: [PATCH 08/17] Simplify the idle timeout logic & don't short-circuit in `handle_msg_send_datagram` (#3678) Intended for merging into https://github.com/n0-computer/iroh/pull/3676 - Makes `idle_timeout` always be a `time::Sleep` that is `.reset` once there is any activity. I find this much easier to reason about. - Fixes a bug where `handle_msg_send_datagram` short-circuits when sending on one path fails. Instead, it should *try* all paths, even if some of them fail. --- iroh/src/magicsock/remote_map/remote_state.rs | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 820fbf9084..cd21d6c58b 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -212,8 +212,8 @@ impl RemoteStateActor { /// discipline is needed to not turn pending for a long time. async fn run(mut self, mut inbox: GuardedReceiver) { trace!("actor started"); - let idle_timeout = MaybeFuture::None; - tokio::pin!(idle_timeout); + let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT); + n0_future::pin!(idle_timeout); loop { let scheduled_path_open = match self.scheduled_open_path { Some(when) => MaybeFuture::Some(time::sleep_until(when)), @@ -225,6 +225,11 @@ impl RemoteStateActor { None => MaybeFuture::None, }; n0_future::pin!(scheduled_hp); + if !inbox.is_idle() || !self.connections.is_empty() { + idle_timeout + .as_mut() + .reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT); + } tokio::select! { biased; msg = inbox.recv() => { @@ -264,20 +269,12 @@ impl RemoteStateActor { if self.connections.is_empty() && inbox.close_if_idle() { trace!("idle timeout expired and still idle: terminate actor"); break; + } else { + // Seems like we weren't really idle, so we reset + idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT); } } } - - let is_idle = self.connections.is_empty() && inbox.is_idle(); - if idle_timeout.is_none() && is_idle { - trace!("start idle timeout"); - idle_timeout - .as_mut() - .set_future(time::sleep(ACTOR_MAX_IDLE_TIMEOUT)); - } else if idle_timeout.is_some() && !is_idle { - trace!("abort idle timeout"); - idle_timeout.as_mut().set_none() - } } trace!("actor terminating"); } @@ -290,9 +287,7 @@ impl RemoteStateActor { // trace!("handling message"); match msg { RemoteStateMessage::SendDatagram(transmit) => { - if let Err(err) = self.handle_msg_send_datagram(transmit).await { - warn!("failed to send datagram: {err:#}"); - } + self.handle_msg_send_datagram(transmit).await; } RemoteStateMessage::AddConnection(handle, tx) => { self.handle_msg_add_connection(handle, tx).await; @@ -333,25 +328,36 @@ impl RemoteStateActor { } /// Handles [`RemoteStateMessage::SendDatagram`]. - /// - /// Error returns are fatal and kill the actor. - async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) -> n0_error::Result<()> { + async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) { + // Sending datagrams might fail, e.g. because we don't have the right transports set + // up to handle sending this owned transmit to. + // After all, we try every single path that we know (relay URL, IP address), even + // though we might not have a relay transport or ip-capable transport set up. + // So these errors must not be fatal for this actor (or even this operation). + if let Some(addr) = self.selected_path.get() { trace!(?addr, "sending datagram to selected path"); - self.send_datagram(addr, transmit).await?; + + if let Err(err) = self.send_datagram(addr.clone(), transmit).await { + debug!(?addr, "failed to send datagram on selected_path: {err:#}"); + } } else { trace!( paths = ?self.paths.keys().collect::>(), "sending datagram to all known paths", ); + if self.paths.is_empty() { + warn!("Cannot send datagrams: No paths to remote endpoint known"); + } for addr in self.paths.keys() { - self.send_datagram(addr.clone(), transmit.clone()).await?; + if let Err(err) = self.send_datagram(addr.clone(), transmit.clone()).await { + debug!(?addr, "failed to send datagram: {err:#}"); + } } // This message is received *before* a connection is added. So we do // not yet have a connection to holepunch. Instead we trigger // holepunching when AddConnection is received. } - Ok(()) } /// Handles [`RemoteStateMessage::AddConnection`]. From 2eeac0735a9fd8ee5792d8186ca601aa6a73361a Mon Sep 17 00:00:00 2001 From: Frando Date: Thu, 20 Nov 2025 10:26:24 +0100 Subject: [PATCH 09/17] fix: add error context add call site --- iroh/src/magicsock/remote_map/remote_state.rs | 6 +++++- iroh/src/magicsock/transports.rs | 10 ++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 2bfb0cff53..11daf9c131 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -7,6 +7,7 @@ use std::{ }; use iroh_base::{EndpointId, RelayUrl, TransportAddr}; +use n0_error::StackResultExt; use n0_future::{ Either, FuturesUnordered, MergeUnbounded, Stream, StreamExt, boxed::BoxStream, @@ -349,7 +350,10 @@ impl RemoteStateActor { contents: owned_transmit.contents.as_ref(), segment_size: owned_transmit.segment_size, }; - self.sender.send(&dst, None, &transmit).await?; + self.sender + .send(&dst, None, &transmit) + .await + .with_context(|_| format!("failed to send datagram to {dst:?}"))?; Ok(()) } diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index f19004a460..85efb5cf2e 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -539,13 +539,11 @@ impl TransportsSender { } } if any_match { - Err(io::Error::other(format!( - "all available transports failed for destination {dst:?}" - ))) + Err(io::Error::other("all available transports failed")) } else { - Err(io::Error::other(format!( - "no transport available for destination {dst:?}" - ))) + Err(io::Error::other( + "no transport available for this destination", + )) } } From ff20fa3aa55792de89dbfd53c27894ef9b53c28c Mon Sep 17 00:00:00 2001 From: Frando Date: Thu, 20 Nov 2025 10:27:39 +0100 Subject: [PATCH 10/17] chore: clippy --- iroh/src/magicsock/remote_map/remote_state.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 11daf9c131..d1ac91feab 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -386,10 +386,8 @@ impl RemoteStateActor { && self.local_addrs.peek().iter().any(|a| a.addr == *sockaddr) { trace!(%sockaddr, "not sending datagram to our own address"); - } else { - if let Err(err) = self.send_datagram(addr.clone(), transmit.clone()).await { - debug!(?addr, "failed to send datagram: {err:#}"); - } + } else if let Err(err) = self.send_datagram(addr.clone(), transmit.clone()).await { + debug!(?addr, "failed to send datagram: {err:#}"); } } // This message is received *before* a connection is added. So we do From 3f8d72a6996325bbfbf1b9c86d8a633fbb29e0e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 20 Nov 2025 10:31:35 +0100 Subject: [PATCH 11/17] Stop the `RemoteStateActor` when the direct addr watcher disconnects --- iroh/src/magicsock/remote_map/remote_state.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index d1ac91feab..0ade3a4ba3 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -275,7 +275,11 @@ impl RemoteStateActor { self.selected_path.set(None).ok(); } } - _ = self.local_addrs.updated(), if self.local_addrs.is_connected() => { + res = self.local_addrs.updated() => { + if let Err(n0_watcher::Disconnected) = res { + trace!("direct address watcher disconnected, shutting down"); + break; + } trace!("local addrs updated, triggering holepunching"); self.trigger_holepunching().await; } From 52d4382a4d49251240a1267d2d93e5f9d4359f4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 20 Nov 2025 10:35:08 +0100 Subject: [PATCH 12/17] Rename `local_addrs` to `local_direct_addrs` --- iroh/src/magicsock/remote_map.rs | 9 +++++---- iroh/src/magicsock/remote_map/remote_state.rs | 19 ++++++++++++------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/iroh/src/magicsock/remote_map.rs b/iroh/src/magicsock/remote_map.rs index 0861a039b0..76d8fdf0a4 100644 --- a/iroh/src/magicsock/remote_map.rs +++ b/iroh/src/magicsock/remote_map.rs @@ -57,7 +57,8 @@ pub(crate) struct RemoteMap { /// The endpoint ID of the local endpoint. local_endpoint_id: EndpointId, metrics: Arc, - local_addrs: n0_watcher::Direct>, + /// The "direct" addresses known for our local endpoint + local_direct_addrs: n0_watcher::Direct>, disco: DiscoState, sender: TransportsSender, discovery: ConcurrentDiscovery, @@ -69,7 +70,7 @@ impl RemoteMap { local_endpoint_id: EndpointId, metrics: Arc, - local_addrs: n0_watcher::Direct>, + local_direct_addrs: n0_watcher::Direct>, disco: DiscoState, sender: TransportsSender, discovery: ConcurrentDiscovery, @@ -80,7 +81,7 @@ impl RemoteMap { relay_mapped_addrs: Default::default(), local_endpoint_id, metrics, - local_addrs, + local_direct_addrs, disco, sender, discovery, @@ -138,7 +139,7 @@ impl RemoteMap { let handle = RemoteStateActor::new( eid, self.local_endpoint_id, - self.local_addrs.clone(), + self.local_direct_addrs.clone(), self.disco.clone(), self.relay_mapped_addrs.clone(), self.metrics.clone(), diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 0ade3a4ba3..6fda36e3c9 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -126,7 +126,8 @@ pub(super) struct RemoteStateActor { /// Our local addresses. /// /// These are our local addresses and any reflexive transport addresses. - local_addrs: n0_watcher::Direct>, + /// They are called "direct addresses" in the magic socket actor. + local_direct_addrs: n0_watcher::Direct>, /// Shared state to allow to encrypt DISCO messages to peers. disco: DiscoState, /// The mapping between endpoints via a relay and their [`RelayMappedAddr`]s. @@ -181,7 +182,7 @@ impl RemoteStateActor { pub(super) fn new( endpoint_id: EndpointId, local_endpoint_id: EndpointId, - local_addrs: n0_watcher::Direct>, + local_direct_addrs: n0_watcher::Direct>, disco: DiscoState, relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>, metrics: Arc, @@ -192,7 +193,7 @@ impl RemoteStateActor { endpoint_id, local_endpoint_id, metrics, - local_addrs, + local_direct_addrs, relay_mapped_addrs, discovery, disco, @@ -275,7 +276,7 @@ impl RemoteStateActor { self.selected_path.set(None).ok(); } } - res = self.local_addrs.updated() => { + res = self.local_direct_addrs.updated() => { if let Err(n0_watcher::Disconnected) = res { trace!("direct address watcher disconnected, shutting down"); break; @@ -387,7 +388,11 @@ impl RemoteStateActor { // We never want to send to our local addresses. // The local address set is updated in the main loop so we can use `peek` here. if let transports::Addr::Ip(sockaddr) = addr - && self.local_addrs.peek().iter().any(|a| a.addr == *sockaddr) + && self + .local_direct_addrs + .peek() + .iter() + .any(|a| a.addr == *sockaddr) { trace!(%sockaddr, "not sending datagram to our own address"); } else if let Err(err) = self.send_datagram(addr.clone(), transmit.clone()).await { @@ -662,7 +667,7 @@ impl RemoteStateActor { let remote_addrs: BTreeSet = self.remote_hp_addrs(); let local_addrs: BTreeSet = self - .local_addrs + .local_direct_addrs .get() .iter() .map(|daddr| daddr.addr) @@ -754,7 +759,7 @@ impl RemoteStateActor { // Send the DISCO CallMeMaybe message over the relay. let my_numbers: Vec = self - .local_addrs + .local_direct_addrs .get() .iter() .map(|daddr| daddr.addr) From 64e9e2d00f9ebc51fdef974771274383919dc876 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 19 Nov 2025 12:11:24 +0100 Subject: [PATCH 13/17] Replace `RemoteStateHandle` with storing a `JoinSet` in `RemoteMap` --- iroh/src/magicsock/remote_map.rs | 46 +++++++++++++------ iroh/src/magicsock/remote_map/remote_state.rs | 45 +++++++----------- 2 files changed, 49 insertions(+), 42 deletions(-) diff --git a/iroh/src/magicsock/remote_map.rs b/iroh/src/magicsock/remote_map.rs index 76d8fdf0a4..b77b32d448 100644 --- a/iroh/src/magicsock/remote_map.rs +++ b/iroh/src/magicsock/remote_map.rs @@ -2,20 +2,22 @@ use std::{ collections::{BTreeSet, hash_map}, hash::Hash, net::{IpAddr, SocketAddr}, + ops::DerefMut, sync::{Arc, Mutex}, time::Duration, }; -use iroh_base::{EndpointId, RelayUrl}; +use iroh_base::{EndpointAddr, EndpointId, RelayUrl}; +use n0_future::task::JoinSet; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; -use tracing::warn; +use tracing::{debug, error, warn}; pub(crate) use self::remote_state::PathsWatcher; +use self::remote_state::RemoteStateActor; pub(super) use self::remote_state::RemoteStateMessage; pub use self::remote_state::{PathInfo, PathInfoList}; -use self::remote_state::{RemoteStateActor, RemoteStateHandle}; use super::{ DirectAddr, DiscoState, MagicsockMetrics, mapped_addrs::{AddrMap, EndpointIdMappedAddr, RelayMappedAddr}, @@ -45,7 +47,7 @@ pub(crate) struct RemoteMap { // State we keep about remote endpoints. // /// The actors tracking each remote endpoint. - actor_handles: Mutex>, + actor_senders: Mutex>>, /// The mapping between [`EndpointId`]s and [`EndpointIdMappedAddr`]s. pub(super) endpoint_mapped_addrs: AddrMap, /// The mapping between endpoints via a relay and their [`RelayMappedAddr`]s. @@ -62,6 +64,7 @@ pub(crate) struct RemoteMap { disco: DiscoState, sender: TransportsSender, discovery: ConcurrentDiscovery, + actor_tasks: Mutex>>, } impl RemoteMap { @@ -76,7 +79,7 @@ impl RemoteMap { discovery: ConcurrentDiscovery, ) -> Self { Self { - actor_handles: Mutex::new(FxHashMap::default()), + actor_senders: Mutex::new(FxHashMap::default()), endpoint_mapped_addrs: Default::default(), relay_mapped_addrs: Default::default(), local_endpoint_id, @@ -85,6 +88,7 @@ impl RemoteMap { disco, sender, discovery, + actor_tasks: Default::default(), } } @@ -97,8 +101,19 @@ impl RemoteMap { /// This should be called periodically to remove handles to endpoint state actors /// that have shutdown after their idle timeout expired. pub(super) fn remove_closed_remote_state_actors(&self) { - let mut handles = self.actor_handles.lock().expect("poisoned"); - handles.retain(|_eid, handle| !handle.sender.is_closed()) + let mut senders = self.actor_senders.lock().expect("poisoned"); + senders.retain(|_eid, sender| !sender.is_closed()); + while let Some(result) = self.actor_tasks.lock().expect("poisoned").try_join_next() { + match result { + Ok(leftover_msgs) => debug!(?leftover_msgs, "TODO: handle leftover messages"), + Err(err) => { + if let Ok(panic) = err.try_into_panic() { + error!("RemoteStateActor panicked."); + std::panic::resume_unwind(panic); + } + } + } + } } /// Returns the sender for the [`RemoteStateActor`]. @@ -107,10 +122,10 @@ impl RemoteMap { /// /// [`RemoteStateActor`]: remote_state::RemoteStateActor pub(super) fn remote_state_actor(&self, eid: EndpointId) -> mpsc::Sender { - let mut handles = self.actor_handles.lock().expect("poisoned"); + let mut handles = self.actor_senders.lock().expect("poisoned"); match handles.entry(eid) { hash_map::Entry::Occupied(mut entry) => { - if let Some(sender) = entry.get().sender.get() { + if let Some(sender) = entry.get().get() { sender } else { // The actor is dead: Start a new actor. @@ -133,10 +148,13 @@ impl RemoteMap { fn start_remote_state_actor( &self, eid: EndpointId, - ) -> (RemoteStateHandle, mpsc::Sender) { + ) -> ( + GuardedSender, + mpsc::Sender, + ) { // Ensure there is a RemoteMappedAddr for this EndpointId. self.endpoint_mapped_addrs.get(&eid); - let handle = RemoteStateActor::new( + let sender = RemoteStateActor::new( eid, self.local_endpoint_id, self.local_direct_addrs.clone(), @@ -146,9 +164,9 @@ impl RemoteMap { self.sender.clone(), self.discovery.clone(), ) - .start(); - let sender = handle.sender.get().expect("just created"); - (handle, sender) + .start(self.actor_tasks.lock().expect("poisoned").deref_mut()); + let tx = sender.get().expect("just created"); + (sender, tx) } pub(super) fn handle_ping(&self, msg: disco::Ping, sender: EndpointId, src: transports::Addr) { diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 6fda36e3c9..f21b4f1a8b 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -11,7 +11,7 @@ use n0_error::StackResultExt; use n0_future::{ Either, FuturesUnordered, MergeUnbounded, Stream, StreamExt, boxed::BoxStream, - task::{self, AbortOnDropHandle}, + task::JoinSet, time::{self, Duration, Instant}, }; use n0_watcher::{Watchable, Watcher}; @@ -42,7 +42,7 @@ use crate::{ util::MaybeFuture, }; -mod guarded_channel; +pub(crate) mod guarded_channel; mod path_state; // TODO: use this @@ -211,7 +211,10 @@ impl RemoteStateActor { } } - pub(super) fn start(self) -> RemoteStateHandle { + pub(super) fn start( + self, + tasks: &mut JoinSet>, + ) -> GuardedSender { let (tx, rx) = guarded_channel(16); let me = self.local_endpoint_id; let endpoint_id = self.endpoint_id; @@ -221,16 +224,13 @@ impl RemoteStateActor { // we don't explicitly set a span we get the spans from whatever call happens to // first create the actor, which is often very confusing as it then keeps those // spans for all logging of the actor. - let task = task::spawn(self.run(rx).instrument(info_span!( + tasks.spawn(self.run(rx).instrument(info_span!( parent: None, "RemoteStateActor", me = %me.fmt_short(), remote = %endpoint_id.fmt_short(), ))); - RemoteStateHandle { - sender: tx, - _task: AbortOnDropHandle::new(task), - } + tx } /// Runs the main loop of the actor. @@ -238,11 +238,14 @@ impl RemoteStateActor { /// Note that the actor uses async handlers for tasks from the main loop. The actor is /// not processing items from the inbox while waiting on any async calls. So some /// discipline is needed to not turn pending for a long time. - async fn run(mut self, mut inbox: GuardedReceiver) { + async fn run( + mut self, + mut inbox: GuardedReceiver, + ) -> Vec { trace!("actor started"); let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT); n0_future::pin!(idle_timeout); - loop { + let leftover_msgs = loop { let scheduled_path_open = match self.scheduled_open_path { Some(when) => MaybeFuture::Some(time::sleep_until(when)), None => MaybeFuture::None, @@ -263,7 +266,7 @@ impl RemoteStateActor { msg = inbox.recv() => { match msg { Some(msg) => self.handle_message(msg).await, - None => break, + None => break vec![], } } Some((id, evt)) = self.path_events.next() => { @@ -303,15 +306,16 @@ impl RemoteStateActor { _ = &mut idle_timeout => { if self.connections.is_empty() && inbox.close_if_idle() { trace!("idle timeout expired and still idle: terminate actor"); - break; + break vec![]; } else { // Seems like we weren't really idle, so we reset idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT); } } } - } + }; trace!("actor terminating"); + leftover_msgs } /// Handles an actor message. @@ -1107,21 +1111,6 @@ pub(crate) enum RemoteStateMessage { Latency(oneshot::Sender>), } -/// A handle to a [`RemoteStateActor`]. -/// -/// Dropping this will stop the actor. The actor will also stop after an idle timeout -/// if it has no connections, an empty inbox, and no other senders than the one stored -/// in the endpoint map exist. -#[derive(Debug)] -pub(super) struct RemoteStateHandle { - /// Sender for the channel into the [`RemoteStateActor`]. - /// - /// This is a [`GuardedSender`], from which we can get a sender but only if the receiver - /// hasn't been closed. - pub(super) sender: GuardedSender, - _task: AbortOnDropHandle<()>, -} - /// Information about a holepunch attempt. #[derive(Debug)] struct HolepunchAttempt { From 969754ddaee3859bc9d5a8de3b926a6129c1ed20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 19 Nov 2025 12:43:19 +0100 Subject: [PATCH 14/17] Remove `guarded_channel` and use `tokio::mpsc` instead. Return leftover messages --- iroh/src/magicsock/remote_map.rs | 33 ++------ iroh/src/magicsock/remote_map/remote_state.rs | 27 ++++--- .../remote_state/guarded_channel.rs | 78 ------------------- 3 files changed, 23 insertions(+), 115 deletions(-) delete mode 100644 iroh/src/magicsock/remote_map/remote_state/guarded_channel.rs diff --git a/iroh/src/magicsock/remote_map.rs b/iroh/src/magicsock/remote_map.rs index b77b32d448..23e3cedac2 100644 --- a/iroh/src/magicsock/remote_map.rs +++ b/iroh/src/magicsock/remote_map.rs @@ -7,7 +7,7 @@ use std::{ time::Duration, }; -use iroh_base::{EndpointAddr, EndpointId, RelayUrl}; +use iroh_base::{EndpointId, RelayUrl}; use n0_future::task::JoinSet; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; @@ -47,7 +47,7 @@ pub(crate) struct RemoteMap { // State we keep about remote endpoints. // /// The actors tracking each remote endpoint. - actor_senders: Mutex>>, + actor_senders: Mutex>>, /// The mapping between [`EndpointId`]s and [`EndpointIdMappedAddr`]s. pub(super) endpoint_mapped_addrs: AddrMap, /// The mapping between endpoints via a relay and their [`RelayMappedAddr`]s. @@ -124,19 +124,10 @@ impl RemoteMap { pub(super) fn remote_state_actor(&self, eid: EndpointId) -> mpsc::Sender { let mut handles = self.actor_senders.lock().expect("poisoned"); match handles.entry(eid) { - hash_map::Entry::Occupied(mut entry) => { - if let Some(sender) = entry.get().get() { - sender - } else { - // The actor is dead: Start a new actor. - let (handle, sender) = self.start_remote_state_actor(eid); - entry.insert(handle); - sender - } - } + hash_map::Entry::Occupied(entry) => entry.get().clone(), hash_map::Entry::Vacant(entry) => { - let (handle, sender) = self.start_remote_state_actor(eid); - entry.insert(handle); + let sender = self.start_remote_state_actor(eid); + entry.insert(sender.clone()); sender } } @@ -145,16 +136,10 @@ impl RemoteMap { /// Starts a new remote state actor and returns a handle and a sender. /// /// The handle is not inserted into the endpoint map, this must be done by the caller of this function. - fn start_remote_state_actor( - &self, - eid: EndpointId, - ) -> ( - GuardedSender, - mpsc::Sender, - ) { + fn start_remote_state_actor(&self, eid: EndpointId) -> mpsc::Sender { // Ensure there is a RemoteMappedAddr for this EndpointId. self.endpoint_mapped_addrs.get(&eid); - let sender = RemoteStateActor::new( + RemoteStateActor::new( eid, self.local_endpoint_id, self.local_direct_addrs.clone(), @@ -164,9 +149,7 @@ impl RemoteMap { self.sender.clone(), self.discovery.clone(), ) - .start(self.actor_tasks.lock().expect("poisoned").deref_mut()); - let tx = sender.get().expect("just created"); - (sender, tx) + .start(self.actor_tasks.lock().expect("poisoned").deref_mut()) } pub(super) fn handle_ping(&self, msg: disco::Ping, sender: EndpointId, src: transports::Addr) { diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index f21b4f1a8b..7ecbfafbc4 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -20,14 +20,12 @@ use quinn_proto::{PathError, PathEvent, PathId, PathStatus}; use rustc_hash::FxHashMap; use smallvec::SmallVec; use sync_wrapper::SyncStream; -use tokio::sync::oneshot; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; use tracing::{Instrument, Level, debug, error, event, info_span, instrument, trace, warn}; -use self::{ - guarded_channel::{GuardedReceiver, GuardedSender, guarded_channel}, - path_state::RemotePathState, -}; +use self::path_state::RemotePathState; +use tokio::sync::{mpsc, oneshot}; + use super::Source; use crate::{ disco::{self}, @@ -42,7 +40,6 @@ use crate::{ util::MaybeFuture, }; -pub(crate) mod guarded_channel; mod path_state; // TODO: use this @@ -214,8 +211,8 @@ impl RemoteStateActor { pub(super) fn start( self, tasks: &mut JoinSet>, - ) -> GuardedSender { - let (tx, rx) = guarded_channel(16); + ) -> mpsc::Sender { + let (tx, rx) = mpsc::channel(16); let me = self.local_endpoint_id; let endpoint_id = self.endpoint_id; @@ -240,7 +237,7 @@ impl RemoteStateActor { /// discipline is needed to not turn pending for a long time. async fn run( mut self, - mut inbox: GuardedReceiver, + mut inbox: mpsc::Receiver, ) -> Vec { trace!("actor started"); let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT); @@ -256,7 +253,7 @@ impl RemoteStateActor { None => MaybeFuture::None, }; n0_future::pin!(scheduled_hp); - if !inbox.is_idle() || !self.connections.is_empty() { + if !inbox.is_empty() || !self.connections.is_empty() { idle_timeout .as_mut() .reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT); @@ -304,9 +301,14 @@ impl RemoteStateActor { self.handle_discovery_item(item); } _ = &mut idle_timeout => { - if self.connections.is_empty() && inbox.close_if_idle() { + if self.connections.is_empty() && inbox.is_empty() { trace!("idle timeout expired and still idle: terminate actor"); - break vec![]; + inbox.close(); + // There might be a race between checking `inbox.is_empty()` and `inbox.close()`, + // so we pull out all messages that are left over. + let mut leftover_msgs = Vec::with_capacity(inbox.len()); + inbox.recv_many(&mut leftover_msgs, inbox.len()).await; + break leftover_msgs; } else { // Seems like we weren't really idle, so we reset idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT); @@ -314,6 +316,7 @@ impl RemoteStateActor { } } }; + trace!("actor terminating"); leftover_msgs } diff --git a/iroh/src/magicsock/remote_map/remote_state/guarded_channel.rs b/iroh/src/magicsock/remote_map/remote_state/guarded_channel.rs deleted file mode 100644 index 2b3b3b7644..0000000000 --- a/iroh/src/magicsock/remote_map/remote_state/guarded_channel.rs +++ /dev/null @@ -1,78 +0,0 @@ -use std::sync::{Arc, Mutex}; - -use tokio::sync::mpsc; - -/// Creates a new [`mpsc`] channel where the receiver can only close if there are no active senders. -pub(super) fn guarded_channel(cap: usize) -> (GuardedSender, GuardedReceiver) { - let (tx, rx) = mpsc::channel(cap); - let tx = Arc::new(Mutex::new(Some(tx))); - (GuardedSender { tx: tx.clone() }, GuardedReceiver { tx, rx }) -} - -#[derive(Debug)] -pub(crate) struct GuardedSender { - tx: Arc>>>, -} - -impl GuardedSender { - /// Returns a sender to the channel. - /// - /// Returns a new sender if the channel is not closed. It is guaranteed that - /// [`GuardedReceiver::close_if_idle`] will not return `true` until the sender is dropped. - /// Returns `None` if the channel has been closed. - pub(crate) fn get(&self) -> Option> { - self.tx.lock().expect("poisoned").clone() - } - - /// Returns `true` if the channel has been closed. - pub(crate) fn is_closed(&self) -> bool { - self.tx.lock().expect("poisoned").is_none() - } -} - -#[derive(Debug)] -pub(super) struct GuardedReceiver { - rx: mpsc::Receiver, - tx: Arc>>>, -} - -impl GuardedReceiver { - /// Receives the next value for this receiver. - /// - /// See [`mpsc::Receiver::recv`]. - pub(super) async fn recv(&mut self) -> Option { - self.rx.recv().await - } - - /// Returns `true` if the inbox is empty and no senders to the inbox exist. - pub(super) fn is_idle(&self) -> bool { - self.rx.is_empty() && self.rx.sender_strong_count() <= 1 - } - - /// Closes the channel if the channel is idle. - /// - /// Returns `true` if the channel is idle and has now been closed, and `false` if the channel - /// is not idle and therefore has not been not closed. - /// - /// Uses a lock internally to make sure that there cannot be a race condition between - /// calling this and a new sender being created. - pub(super) fn close_if_idle(&mut self) -> bool { - let mut guard = self.tx.lock().expect("poisoned"); - if self.is_idle() { - *guard = None; - self.rx.close(); - true - } else { - false - } - } -} - -impl Drop for GuardedReceiver { - fn drop(&mut self) { - let mut guard = self.tx.lock().expect("poisoned"); - *guard = None; - self.rx.close(); - drop(guard) - } -} From 4c4d11ddeaa170e266f32150e8c90c9cf9c4316c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 20 Nov 2025 09:49:26 +0100 Subject: [PATCH 15/17] Send back leftover messages --- iroh/src/magicsock/remote_map.rs | 32 ++++++++++++++++--- iroh/src/magicsock/remote_map/remote_state.rs | 13 +++++--- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/iroh/src/magicsock/remote_map.rs b/iroh/src/magicsock/remote_map.rs index 23e3cedac2..7db34204ab 100644 --- a/iroh/src/magicsock/remote_map.rs +++ b/iroh/src/magicsock/remote_map.rs @@ -64,7 +64,7 @@ pub(crate) struct RemoteMap { disco: DiscoState, sender: TransportsSender, discovery: ConcurrentDiscovery, - actor_tasks: Mutex>>, + actor_tasks: Mutex)>>, } impl RemoteMap { @@ -105,7 +105,22 @@ impl RemoteMap { senders.retain(|_eid, sender| !sender.is_closed()); while let Some(result) = self.actor_tasks.lock().expect("poisoned").try_join_next() { match result { - Ok(leftover_msgs) => debug!(?leftover_msgs, "TODO: handle leftover messages"), + Ok((eid, leftover_msgs)) => { + let entry = senders.entry(eid); + if leftover_msgs.is_empty() { + match entry { + hash_map::Entry::Occupied(occupied_entry) => occupied_entry.remove(), + hash_map::Entry::Vacant(_) => { + panic!("this should be impossible TODO(matheus23)"); + } + }; + } else { + // The remote actor got messages while it was closing, so we're restarting + debug!(%eid, "restarting terminated remote state actor: messages received during shutdown"); + let sender = self.start_remote_state_actor(eid, leftover_msgs); + entry.insert_entry(sender); + } + } Err(err) => { if let Ok(panic) = err.try_into_panic() { error!("RemoteStateActor panicked."); @@ -126,7 +141,7 @@ impl RemoteMap { match handles.entry(eid) { hash_map::Entry::Occupied(entry) => entry.get().clone(), hash_map::Entry::Vacant(entry) => { - let sender = self.start_remote_state_actor(eid); + let sender = self.start_remote_state_actor(eid, vec![]); entry.insert(sender.clone()); sender } @@ -136,7 +151,11 @@ impl RemoteMap { /// Starts a new remote state actor and returns a handle and a sender. /// /// The handle is not inserted into the endpoint map, this must be done by the caller of this function. - fn start_remote_state_actor(&self, eid: EndpointId) -> mpsc::Sender { + fn start_remote_state_actor( + &self, + eid: EndpointId, + initial_msgs: Vec, + ) -> mpsc::Sender { // Ensure there is a RemoteMappedAddr for this EndpointId. self.endpoint_mapped_addrs.get(&eid); RemoteStateActor::new( @@ -149,7 +168,10 @@ impl RemoteMap { self.sender.clone(), self.discovery.clone(), ) - .start(self.actor_tasks.lock().expect("poisoned").deref_mut()) + .start( + initial_msgs, + self.actor_tasks.lock().expect("poisoned").deref_mut(), + ) } pub(super) fn handle_ping(&self, msg: disco::Ping, sender: EndpointId, src: transports::Addr) { diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 7ecbfafbc4..d8a03fa72a 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -210,7 +210,8 @@ impl RemoteStateActor { pub(super) fn start( self, - tasks: &mut JoinSet>, + initial_msgs: Vec, + tasks: &mut JoinSet<(EndpointId, Vec)>, ) -> mpsc::Sender { let (tx, rx) = mpsc::channel(16); let me = self.local_endpoint_id; @@ -221,7 +222,7 @@ impl RemoteStateActor { // we don't explicitly set a span we get the spans from whatever call happens to // first create the actor, which is often very confusing as it then keeps those // spans for all logging of the actor. - tasks.spawn(self.run(rx).instrument(info_span!( + tasks.spawn(self.run(initial_msgs, rx).instrument(info_span!( parent: None, "RemoteStateActor", me = %me.fmt_short(), @@ -237,9 +238,13 @@ impl RemoteStateActor { /// discipline is needed to not turn pending for a long time. async fn run( mut self, + initial_msgs: Vec, mut inbox: mpsc::Receiver, - ) -> Vec { + ) -> (EndpointId, Vec) { trace!("actor started"); + for msg in initial_msgs { + self.handle_message(msg).await; + } let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT); n0_future::pin!(idle_timeout); let leftover_msgs = loop { @@ -318,7 +323,7 @@ impl RemoteStateActor { }; trace!("actor terminating"); - leftover_msgs + (self.endpoint_id, leftover_msgs) } /// Handles an actor message. From 5cd4dd13219d302e113cfbd033448a20e68a4887 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 20 Nov 2025 11:29:44 +0100 Subject: [PATCH 16/17] `cargo make format` --- iroh/src/magicsock/remote_map/remote_state.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index d8a03fa72a..81d2647a17 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -20,12 +20,11 @@ use quinn_proto::{PathError, PathEvent, PathId, PathStatus}; use rustc_hash::FxHashMap; use smallvec::SmallVec; use sync_wrapper::SyncStream; +use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; use tracing::{Instrument, Level, debug, error, event, info_span, instrument, trace, warn}; use self::path_state::RemotePathState; -use tokio::sync::{mpsc, oneshot}; - use super::Source; use crate::{ disco::{self}, From 6e09bda10bf3fbe4ec6b182d49f5dec1934b44dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 20 Nov 2025 11:37:00 +0100 Subject: [PATCH 17/17] Move leftover msg fetching to the end of `run` --- iroh/src/magicsock/remote_map/remote_state.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index 81d2647a17..e503abe16b 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -246,7 +246,7 @@ impl RemoteStateActor { } let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT); n0_future::pin!(idle_timeout); - let leftover_msgs = loop { + loop { let scheduled_path_open = match self.scheduled_open_path { Some(when) => MaybeFuture::Some(time::sleep_until(when)), None => MaybeFuture::None, @@ -267,7 +267,7 @@ impl RemoteStateActor { msg = inbox.recv() => { match msg { Some(msg) => self.handle_message(msg).await, - None => break vec![], + None => break, } } Some((id, evt)) = self.path_events.next() => { @@ -307,19 +307,20 @@ impl RemoteStateActor { _ = &mut idle_timeout => { if self.connections.is_empty() && inbox.is_empty() { trace!("idle timeout expired and still idle: terminate actor"); - inbox.close(); - // There might be a race between checking `inbox.is_empty()` and `inbox.close()`, - // so we pull out all messages that are left over. - let mut leftover_msgs = Vec::with_capacity(inbox.len()); - inbox.recv_many(&mut leftover_msgs, inbox.len()).await; - break leftover_msgs; + break; } else { // Seems like we weren't really idle, so we reset idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT); } } } - }; + } + + inbox.close(); + // There might be a race between checking `inbox.is_empty()` and `inbox.close()`, + // so we pull out all messages that are left over. + let mut leftover_msgs = Vec::with_capacity(inbox.len()); + inbox.recv_many(&mut leftover_msgs, inbox.len()).await; trace!("actor terminating"); (self.endpoint_id, leftover_msgs)