Skip to content

Commit 224ae8c

Browse files
author
“ramfox”
committed
added tests for prune_paths, added EndpointActorState::prune_paths method, and called it whenever new paths are added
1 parent 50de570 commit 224ae8c

File tree

2 files changed

+242
-51
lines changed

2 files changed

+242
-51
lines changed

iroh/src/magicsock/remote_map.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,6 @@ pub enum Source {
235235
_0: Private,
236236
},
237237
/// We established a connection on this address.
238-
///
239-
/// Currently this means the path was in uses as [`PathId::ZERO`] when the a connection
240-
/// was added to the `RemoteStateActor`.
241-
///
242-
/// [`PathId::ZERO`]: quinn_proto::PathId::ZERO
243238
#[strum(serialize = "Connection")]
244239
Connection {
245240
/// private marker

iroh/src/magicsock/remote_map/remote_state.rs

Lines changed: 242 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::{BTreeSet, VecDeque},
2+
collections::{BTreeSet, HashSet, VecDeque},
33
net::SocketAddr,
44
pin::Pin,
55
sync::Arc,
@@ -442,6 +442,8 @@ impl RemoteStateActor {
442442
self.paths
443443
.insert(path_remote, Source::Connection { _0: Private });
444444
self.select_path();
445+
// TODO(ramfox): do we need to prune paths here?
446+
self.prune_paths();
445447

446448
if path_remote_is_ip {
447449
// We may have raced this with a relay address. Try and add any
@@ -492,6 +494,9 @@ impl RemoteStateActor {
492494
self.send_disco_message(dst, disco::Message::Ping(ping))
493495
.await;
494496
}
497+
// prune any unused/inactive paths, now that we have added potential
498+
// new ones
499+
self.prune_paths()
495500
}
496501

497502
/// Handles [`RemoteStateMessage::PingReceived`].
@@ -525,6 +530,7 @@ impl RemoteStateActor {
525530

526531
trace!("ping received, triggering holepunching");
527532
self.trigger_holepunching().await;
533+
// TODO(ramfox): potentially prune addrs here?
528534
}
529535

530536
/// Handles [`RemoteStateMessage::PongReceived`].
@@ -744,6 +750,7 @@ impl RemoteStateActor {
744750
self.paths.disco_ping_sent(addr.clone(), msg.tx_id);
745751
self.send_disco_message(addr, disco::Message::Ping(msg))
746752
.await;
753+
// TODO(ramfox): potentially prune addrs here?
747754
}
748755

749756
// Send the DISCO CallMeMaybe message over the relay.
@@ -895,6 +902,7 @@ impl RemoteStateActor {
895902
}
896903

897904
self.select_path();
905+
self.prune_paths();
898906
}
899907
PathEvent::Abandoned { id, path_stats } => {
900908
trace!(?path_stats, "path abandoned");
@@ -1046,6 +1054,25 @@ impl RemoteStateActor {
10461054
}
10471055
}
10481056
}
1057+
1058+
fn prune_paths(&mut self) {
1059+
// if the total number of paths, relay or ip, is less
1060+
// than the max inactive ip addrs we allow, bail early
1061+
if self.paths.len() < MAX_INACTIVE_IP_ADDRESSES {
1062+
return;
1063+
}
1064+
let open_paths = self
1065+
.connections
1066+
.values()
1067+
.map(|state| state.open_paths.values())
1068+
.flatten();
1069+
prune_paths(
1070+
&mut self.paths,
1071+
&self.pending_open_paths,
1072+
&self.selected_path.get(),
1073+
open_paths,
1074+
);
1075+
}
10491076
}
10501077

10511078
/// Messages to send to the [`RemoteStateActor`].
@@ -1428,82 +1455,251 @@ fn to_transports_addr(
14281455
})
14291456
}
14301457

1431-
fn prune_paths(
1458+
fn prune_paths<'a>(
14321459
paths: &mut FxHashMap<transports::Addr, PathState>,
14331460
pending: &VecDeque<transports::Addr>,
14341461
selected_path: &Option<transports::Addr>,
1435-
open_paths: &Vec<transports::Addr>,
1462+
open_paths: impl Iterator<Item = &'a transports::Addr>,
14361463
) {
1437-
let ip_paths: BTreeSet<_> = paths
1438-
.keys()
1439-
.filter(|p| {
1440-
if p.is_ip() {
1441-
return true;
1442-
}
1443-
return false;
1444-
})
1445-
.cloned()
1446-
.collect();
1464+
let ip_count = paths.keys().filter(|p| p.is_ip()).count();
14471465
// if the total number of ip paths is less than the allowed number of inactive
14481466
// paths, just return early;
1449-
if ip_paths.len() < MAX_INACTIVE_IP_ADDRESSES {
1467+
if ip_count < MAX_INACTIVE_IP_ADDRESSES {
14501468
return;
14511469
}
14521470

1453-
let mut protected_paths = std::collections::BTreeSet::new();
1471+
let ip_paths: HashSet<_> = paths.keys().filter(|p| p.is_ip()).collect();
1472+
1473+
let mut protected_paths = HashSet::new();
14541474
for addr in pending {
1455-
protected_paths.insert(addr.clone());
1475+
protected_paths.insert(addr);
14561476
}
14571477
if let Some(path) = selected_path {
1458-
protected_paths.insert(path.clone());
1478+
protected_paths.insert(path);
14591479
}
14601480
for path in open_paths {
1461-
protected_paths.insert(path.clone());
1481+
protected_paths.insert(path);
14621482
}
14631483

1464-
let inactive_paths: Vec<_> = ip_paths.difference(&protected_paths).collect();
1484+
let inactive_paths: HashSet<_> = ip_paths
1485+
.difference(&protected_paths)
1486+
// cloned here so we can use `paths.retain` later
1487+
.map(|&addr| addr.clone())
1488+
.collect();
14651489

14661490
if inactive_paths.len() < MAX_INACTIVE_IP_ADDRESSES {
14671491
return;
14681492
}
14691493

1470-
let mut keep_paths = Vec::new();
14711494
let now = Instant::now();
1472-
// if the last instance in the source was CONST time ago, it can be pruned
1473-
for (addr, state) in paths {
1474-
if inactive_paths.contains(&&addr) {
1475-
let mut is_expired = true;
1476-
for (_source, instant) in &state.sources {
1477-
// it's been less than LAST_SOURCE_PRUNE_DURATION since we
1478-
// last learned about this source
1479-
if *instant + LAST_SOURCE_PRUNE_DURATION < now {
1480-
is_expired = false;
1481-
break;
1482-
}
1483-
}
1484-
if !is_expired {
1485-
keep_paths.push(addr);
1486-
}
1487-
continue;
1495+
1496+
paths.retain(|addr, state| {
1497+
if inactive_paths.contains(addr) {
1498+
keep_path(state, &now)
14881499
} else {
1489-
keep_paths.push(addr);
1500+
// keep all active paths
1501+
true
14901502
}
1491-
}
1503+
});
1504+
}
14921505

1493-
*paths = paths
1494-
.iter()
1495-
.to_owned()
1496-
.filter(|(addr, _)| keep_paths.contains(addr))
1497-
.map(|(addr, state)| (addr.clone(), state.clone()))
1498-
.collect();
1506+
/// Based on the [`PathState`], returns true if we should keep this path.
1507+
///
1508+
/// Currently we have two criteria:
1509+
/// 1) This path has sent a Ping
1510+
/// 2) The last time we learned about this address was greater than LAST_SOURCE_PRUNE_DURATION
1511+
fn keep_path(state: &PathState, now: &Instant) -> bool {
1512+
// if we have never sent a ping, don't remove it
1513+
state.ping_sent.is_none()
1514+
|| state
1515+
.sources
1516+
.values()
1517+
// only keep it if this path contains recent sources
1518+
.any(|instant| *instant + LAST_SOURCE_PRUNE_DURATION > *now)
14991519
}
15001520

15011521
#[cfg(test)]
15021522
mod tests {
1523+
use super::super::Source;
1524+
use super::{PathState, prune_paths};
1525+
use crate::disco::TransactionId;
1526+
use crate::magicsock::{endpoint_map::Private, transports};
15031527
use n0_error::Result;
1528+
use n0_future::time::{Duration, Instant};
1529+
use rustc_hash::FxHashMap;
1530+
use std::collections::VecDeque;
1531+
use std::net::{Ipv4Addr, SocketAddr};
1532+
1533+
/// Create a test IP address with specific port
1534+
fn test_ip_addr(port: u16) -> transports::Addr {
1535+
transports::Addr::Ip(SocketAddr::new(
1536+
std::net::IpAddr::V4(Ipv4Addr::LOCALHOST),
1537+
port,
1538+
))
1539+
}
1540+
1541+
/// Create a PathState with sources at a specific time offset
1542+
fn test_path_state(time_offset: Duration, sent_ping: bool) -> PathState {
1543+
let mut state = PathState::default();
1544+
if sent_ping {
1545+
state.ping_sent = Some(TransactionId::default());
1546+
}
1547+
state.sources.insert(
1548+
Source::Connection { _0: Private },
1549+
Instant::now() - time_offset,
1550+
);
1551+
state
1552+
}
1553+
1554+
#[test]
1555+
fn test_prune_paths_too_few_total_paths() -> Result {
1556+
// create fewer than MAX_INACTIVE_IP_ADDRESSES paths
1557+
let mut paths = FxHashMap::default();
1558+
for i in 0..15 {
1559+
paths.insert(
1560+
test_ip_addr(i),
1561+
test_path_state(Duration::from_secs(0), false),
1562+
);
1563+
}
1564+
1565+
let pending = VecDeque::new();
1566+
let selected_path = None;
1567+
let open_paths = Vec::new();
1568+
1569+
let initial_len = paths.len();
1570+
// should not prune because we have fewer than MAX_INACTIVE_IP_ADDRESSES paths
1571+
prune_paths(&mut paths, &pending, &selected_path, open_paths.iter());
1572+
assert_eq!(
1573+
paths.len(),
1574+
initial_len,
1575+
"Expected no paths to be pruned when total IP paths < MAX_INACTIVE_IP_ADDRESSES"
1576+
);
1577+
1578+
Ok(())
1579+
}
1580+
1581+
#[test]
1582+
fn test_prune_paths_too_few_inactive_paths() -> Result {
1583+
// create MAX_INACTIVE_IP_ADDRESSES + 5 paths
1584+
let mut paths = FxHashMap::default();
1585+
for i in 0..25 {
1586+
paths.insert(
1587+
test_ip_addr(i),
1588+
test_path_state(Duration::from_secs(0), false),
1589+
);
1590+
}
1591+
1592+
// mark 10 of them as "active" by adding them to open_paths
1593+
let open_paths: Vec<transports::Addr> = (0..10).map(|i| test_ip_addr(i)).collect();
1594+
1595+
let pending = VecDeque::new();
1596+
let selected_path = None;
1597+
1598+
let initial_len = paths.len();
1599+
// now we have 25 total paths, but only 15 inactive paths (25 - 10 = 15)
1600+
// which is less than MAX_INACTIVE_IP_ADDRESSES (20)
1601+
prune_paths(&mut paths, &pending, &selected_path, open_paths.iter());
1602+
assert_eq!(
1603+
paths.len(),
1604+
initial_len,
1605+
"Expected no paths to be pruned when inactive paths < MAX_INACTIVE_IP_ADDRESSES"
1606+
);
1607+
1608+
Ok(())
1609+
}
15041610

15051611
#[test]
1506-
fn test_prune_paths() -> Result {
1507-
todo!();
1612+
fn test_prune_paths_prunes_old_inactive_paths() -> Result {
1613+
// create MAX_INACTIVE_IP_ADDRESSES + 10 paths
1614+
let mut paths = FxHashMap::default();
1615+
1616+
// add 20 paths with recent sources (within 2 minutes)
1617+
for i in 0..20 {
1618+
paths.insert(
1619+
test_ip_addr(i),
1620+
test_path_state(Duration::from_secs(60), true), // 1 minute ago
1621+
);
1622+
}
1623+
1624+
// add 10 paths with old sources (more than 2 minutes ago)
1625+
for i in 20..30 {
1626+
paths.insert(
1627+
test_ip_addr(i),
1628+
test_path_state(Duration::from_secs(180), true), // 3 minutes ago
1629+
);
1630+
}
1631+
1632+
let pending = VecDeque::new();
1633+
let selected_path = None;
1634+
let open_paths = Vec::new();
1635+
1636+
// we have 30 total paths, all inactive
1637+
// paths with sources older than LAST_SOURCE_PRUNE_DURATION should be pruned
1638+
prune_paths(&mut paths, &pending, &selected_path, open_paths.iter());
1639+
1640+
// we should have kept the 20 recent paths
1641+
assert_eq!(
1642+
paths.len(),
1643+
20,
1644+
"Expected to keep 20 paths with recent sources"
1645+
);
1646+
1647+
// verify that the kept paths are the ones with recent sources
1648+
for i in 0..20 {
1649+
let addr = test_ip_addr(i);
1650+
assert!(
1651+
paths.contains_key(&addr),
1652+
"Expected to keep path with recent source: {:?}",
1653+
addr
1654+
);
1655+
}
1656+
1657+
// verify that the old paths were removed
1658+
for i in 20..30 {
1659+
let addr = test_ip_addr(i);
1660+
assert!(
1661+
!paths.contains_key(&addr),
1662+
"Expected to prune path with old source: {:?}",
1663+
addr
1664+
);
1665+
}
1666+
1667+
Ok(())
1668+
}
1669+
1670+
#[test]
1671+
fn test_prune_paths_protects_selected_and_open_paths() -> Result {
1672+
// create MAX_INACTIVE_IP_ADDRESSES + 10 paths, all with old sources
1673+
let mut paths = FxHashMap::default();
1674+
for i in 0..30 {
1675+
paths.insert(
1676+
test_ip_addr(i),
1677+
test_path_state(Duration::from_secs(180), true), // 3 minutes ago
1678+
);
1679+
}
1680+
1681+
let pending = VecDeque::new();
1682+
// mark one path as selected
1683+
let selected_path = Some(test_ip_addr(0));
1684+
// mark a few paths as open
1685+
let open_paths = vec![test_ip_addr(1), test_ip_addr(2)];
1686+
1687+
prune_paths(&mut paths, &pending, &selected_path, open_paths.iter());
1688+
1689+
// protected paths should still be in the result even though they have old sources
1690+
assert!(
1691+
paths.contains_key(&test_ip_addr(0)),
1692+
"Expected to keep selected path even with old source"
1693+
);
1694+
assert!(
1695+
paths.contains_key(&test_ip_addr(1)),
1696+
"Expected to keep open path even with old source"
1697+
);
1698+
assert!(
1699+
paths.contains_key(&test_ip_addr(2)),
1700+
"Expected to keep open path even with old source"
1701+
);
1702+
1703+
Ok(())
15081704
}
15091705
}

0 commit comments

Comments
 (0)