From d14abcb7d30cc4fc7dcffc951074ec85ce9e9e2b Mon Sep 17 00:00:00 2001 From: Quinn Wilton Date: Mon, 29 Jan 2024 12:03:57 -0800 Subject: [PATCH] feat: redial `node_addresses` at an interval on connection close (#529) This resolves #400. --- homestar-runtime/config/defaults.toml | 1 + homestar-runtime/src/event_handler.rs | 3 + homestar-runtime/src/event_handler/cache.rs | 6 + homestar-runtime/src/event_handler/event.rs | 8 + .../src/event_handler/swarm_event.rs | 70 ++++ .../src/settings/libp2p_config.rs | 4 + .../tests/network/notification.rs | 376 ++++++++++++++++++ 7 files changed, 468 insertions(+) diff --git a/homestar-runtime/config/defaults.toml b/homestar-runtime/config/defaults.toml index 48181388..f2d65c53 100644 --- a/homestar-runtime/config/defaults.toml +++ b/homestar-runtime/config/defaults.toml @@ -26,6 +26,7 @@ announce_addresses = [] transport_connection_timeout = 60 max_connected_peers = 32 max_announce_addresses = 10 +dial_interval = 30 [node.network.libp2p.mdns] enable = true diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs index 14460d51..facc796a 100644 --- a/homestar-runtime/src/event_handler.rs +++ b/homestar-runtime/src/event_handler.rs @@ -148,6 +148,7 @@ struct Rendezvous { // Connected peers configuration and state struct Connections { + dial_interval: Duration, peers: FnvHashMap, max_peers: u32, } @@ -190,6 +191,7 @@ where query_senders: FnvHashMap::default(), request_response_senders: FnvHashMap::default(), connections: Connections { + dial_interval: settings.libp2p.dial_interval, peers: FnvHashMap::default(), max_peers: settings.libp2p.max_connected_peers, }, @@ -231,6 +233,7 @@ where query_senders: FnvHashMap::default(), request_response_senders: FnvHashMap::default(), connections: Connections { + dial_interval: settings.libp2p.dial_interval, peers: FnvHashMap::default(), max_peers: settings.libp2p.max_connected_peers, }, diff --git a/homestar-runtime/src/event_handler/cache.rs 
b/homestar-runtime/src/event_handler/cache.rs index a4fedb8b..10e5b89a 100644 --- a/homestar-runtime/src/event_handler/cache.rs +++ b/homestar-runtime/src/event_handler/cache.rs @@ -51,6 +51,7 @@ pub(crate) enum CacheData { pub(crate) enum DispatchEvent { RegisterPeer, DiscoverPeers, + DialPeer, } /// Setup a cache with an eviction listener. @@ -78,6 +79,11 @@ pub(crate) fn setup_cache( let _ = tx.send(Event::DiscoverPeers(rendezvous_node.to_owned())); }; } + DispatchEvent::DialPeer => { + if let Some(CacheData::Peer(node)) = val.data.get("node") { + let _ = tx.send(Event::DialPeer(node.to_owned())); + }; + } } } }; diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs index a239b05e..ce095e7e 100644 --- a/homestar-runtime/src/event_handler/event.rs +++ b/homestar-runtime/src/event_handler/event.rs @@ -132,6 +132,8 @@ pub(crate) enum Event { DiscoverPeers(PeerId), /// Dynamically get listeners for the swarm. GetNodeInfo(AsyncChannelSender), + /// Dial a peer. 
+ DialPeer(PeerId), } #[allow(unreachable_patterns)] @@ -293,6 +295,12 @@ impl Event { ); } } + Event::DialPeer(peer_id) => { + event_handler + .swarm + .dial(peer_id) + .map_err(anyhow::Error::new)?; + } _ => {} } Ok(()) diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index b8a3d0e1..5fb0162c 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -1157,6 +1157,32 @@ async fn handle_swarm_event( peer_id = peer_id.to_string(), "removed peer from kademlia table" ); + } else { + debug!( + subject = "libp2p.conn.closed", + category = "handle_swarm_event", + peer_id = peer_id.to_string(), + "redialing trusted peer in {interval:?}", + interval = event_handler.connections.dial_interval + ); + + // Dial peers again at dial interval + event_handler + .cache + .insert( + format!("{}-dial", peer_id), + CacheValue::new( + event_handler.connections.dial_interval, + HashMap::from([ + ( + "on_expiration".to_string(), + CacheData::OnExpiration(cache::DispatchEvent::DialPeer), + ), + ("node".to_string(), CacheData::Peer(peer_id)), + ]), + ), + ) + .await; } #[cfg(feature = "websocket-notify")] @@ -1182,6 +1208,50 @@ async fn handle_swarm_event( "outgoing connection error" ); + // Redial peer if in configured peers + if let Some(peer_id) = peer_id { + if event_handler.node_addresses.iter().any(|multiaddr| { + if let Some(id) = multiaddr.peer_id() { + id == peer_id + } else { + // TODO: We may want to check the multiaddress without relying on + // the peer ID. This would give more flexibility when configuring nodes. 
+ warn!( + subject = "libp2p.outgoing.err", + category = "handle_swarm_event", + "Configured peer must include a peer ID: {multiaddr}" + ); + false + } + }) { + debug!( + subject = "libp2p.outgoing.err", + category = "handle_swarm_event", + peer_id = peer_id.to_string(), + "redialing trusted peer in {interval:?}", + interval = event_handler.connections.dial_interval + ); + + // Dial peers again at dial interval + event_handler + .cache + .insert( + format!("{}-dial", peer_id), + CacheValue::new( + event_handler.connections.dial_interval, + HashMap::from([ + ( + "on_expiration".to_string(), + CacheData::OnExpiration(cache::DispatchEvent::DialPeer), + ), + ("node".to_string(), CacheData::Peer(peer_id)), + ]), + ), + ) + .await; + } + } + #[cfg(feature = "websocket-notify")] notification::emit_event( event_handler.ws_evt_sender(), diff --git a/homestar-runtime/src/settings/libp2p_config.rs b/homestar-runtime/src/settings/libp2p_config.rs index 733e5465..bb913873 100644 --- a/homestar-runtime/src/settings/libp2p_config.rs +++ b/homestar-runtime/src/settings/libp2p_config.rs @@ -39,6 +39,9 @@ pub(crate) struct Libp2p { /// Transport connection timeout. #[serde_as(as = "DurationSeconds")] pub(crate) transport_connection_timeout: Duration, + /// Dial interval. + #[serde_as(as = "DurationSeconds")] + pub(crate) dial_interval: Duration, } /// DHT settings. 
@@ -138,6 +141,7 @@ impl Default for Libp2p { pubsub: Pubsub::default(), rendezvous: Rendezvous::default(), transport_connection_timeout: Duration::new(60, 0), + dial_interval: Duration::new(30, 0), } } } diff --git a/homestar-runtime/tests/network/notification.rs b/homestar-runtime/tests/network/notification.rs index 40b2bf03..88f2c132 100644 --- a/homestar-runtime/tests/network/notification.rs +++ b/homestar-runtime/tests/network/notification.rs @@ -184,3 +184,379 @@ fn test_connection_notifications_integration() -> Result<()> { Ok(()) } + +#[test] +fn test_libp2p_redial_on_connection_closed_integration() -> Result<()> { + let proc_info1 = ProcInfo::new().unwrap(); + let proc_info2 = ProcInfo::new().unwrap(); + + let rpc_port1 = proc_info1.rpc_port; + let rpc_port2 = proc_info2.rpc_port; + let metrics_port1 = proc_info1.metrics_port; + let metrics_port2 = proc_info2.metrics_port; + let ws_port1 = proc_info1.ws_port; + let ws_port2 = proc_info2.ws_port; + let listen_addr1 = listen_addr(proc_info1.listen_port); + let listen_addr2 = listen_addr(proc_info2.listen_port); + let node_addrb = multiaddr(proc_info2.listen_port, SECP256K1MULTIHASH); + + let toml1 = format!( + r#" + [node] + [node.network.keypair_config] + existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }} + [node.network.libp2p] + listen_address = "{listen_addr1}" + node_addresses = ["{node_addrb}"] + dial_interval = 3 + [node.network.libp2p.mdns] + enable = false + [node.network.libp2p.rendezvous] + enable_client = false + [node.network.metrics] + port = {metrics_port1} + [node.network.rpc] + port = {rpc_port1} + [node.network.webserver] + port = {ws_port1} + "# + ); + + let toml2 = format!( + r#" + [node] + [node.network.keypair_config] + existing = {{ key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" }} + [node.network.libp2p] + listen_address = "{listen_addr2}" + node_addresses = [] + [node.network.libp2p.mdns] + enable = false + 
[node.network.libp2p.rendezvous] + enable_client = false + [node.network.metrics] + port = {metrics_port2} + [node.network.rpc] + port = {rpc_port2} + [node.network.webserver] + port = {ws_port2} + "# + ); + + let config1 = make_config!(toml1); + let config2 = make_config!(toml2); + + let homestar_proc1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg(config1.filename()) + .arg("--db") + .arg(&proc_info1.db_path) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + let _proc_guard1 = ChildGuard::new(homestar_proc1); + + if wait_for_socket_connection(ws_port1, 1000).is_err() { + panic!("Homestar server/runtime failed to start in time"); + } + + tokio_test::block_on(async { + let ws_url = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port1); + let client = WsClientBuilder::default() + .build(ws_url.clone()) + .await + .unwrap(); + + let mut sub1: Subscription> = client + .subscribe( + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + rpc_params![], + UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + ) + .await + .unwrap(); + + let homestar_proc2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg(config2.filename()) + .arg("--db") + .arg(&proc_info2.db_path) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + let proc_guard2 = ChildGuard::new(homestar_proc2); + + // Poll for connection established message + loop { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + + if json["type"].as_str().unwrap() == "network:connectionEstablished" { + break; + } + } else { + panic!("Node one did not establish a connection with node two in time.") + } + } + + kill_homestar(proc_guard2.take(), None); + + // Poll for connection 
closed message + loop { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + + if json["type"].as_str().unwrap() == "network:connectionClosed" { + break; + } + } else { + panic!("Connection between node one and node two did not close in time.") + } + } + + let homestar_proc2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg(config2.filename()) + .arg("--db") + .arg(&proc_info2.db_path) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + let _proc_guard2 = ChildGuard::new(homestar_proc2); + + // Poll for connection established message + loop { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + + if json["type"].as_str().unwrap() == "network:connectionEstablished" { + break; + } + } else { + panic!("Node one did not redial node two in time.") + } + } + }); + + Ok(()) +} + +#[test] +fn test_libp2p_redial_on_connection_error_integration() -> Result<()> { + let proc_info1 = ProcInfo::new().unwrap(); + let proc_info2 = ProcInfo::new().unwrap(); + + let rpc_port1 = proc_info1.rpc_port; + let rpc_port2 = proc_info2.rpc_port; + let metrics_port1 = proc_info1.metrics_port; + let metrics_port2 = proc_info2.metrics_port; + let ws_port1 = proc_info1.ws_port; + let ws_port2 = proc_info2.ws_port; + let listen_addr1 = listen_addr(proc_info1.listen_port); + let listen_addr2 = listen_addr(proc_info2.listen_port); + let node_addrb = multiaddr(proc_info2.listen_port, SECP256K1MULTIHASH); + + let toml1 = format!( + r#" + [node] + [node.network.keypair_config] + existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }} + [node.network.libp2p] + listen_address = "{listen_addr1}" + node_addresses = 
["{node_addrb}"] + dial_interval = 3 + [node.network.libp2p.mdns] + enable = false + [node.network.libp2p.rendezvous] + enable_client = false + [node.network.metrics] + port = {metrics_port1} + [node.network.rpc] + port = {rpc_port1} + [node.network.webserver] + port = {ws_port1} + "# + ); + + let toml2 = format!( + r#" + [node] + [node.network.keypair_config] + existing = {{ key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" }} + [node.network.libp2p] + listen_address = "{listen_addr2}" + node_addresses = [] + [node.network.libp2p.mdns] + enable = false + [node.network.libp2p.rendezvous] + enable_client = false + [node.network.metrics] + port = {metrics_port2} + [node.network.rpc] + port = {rpc_port2} + [node.network.webserver] + port = {ws_port2} + "# + ); + + let config1 = make_config!(toml1); + let config2 = make_config!(toml2); + + let homestar_proc1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg(config1.filename()) + .arg("--db") + .arg(&proc_info1.db_path) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + let _proc_guard1 = ChildGuard::new(homestar_proc1); + + if wait_for_socket_connection(ws_port1, 1000).is_err() { + panic!("Homestar server/runtime failed to start in time"); + } + + tokio_test::block_on(async { + let ws_url = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port1); + let client = WsClientBuilder::default() + .build(ws_url.clone()) + .await + .unwrap(); + + let mut sub1: Subscription> = client + .subscribe( + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + rpc_params![], + UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + ) + .await + .unwrap(); + + let homestar_proc2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg(config2.filename()) + .arg("--db") + .arg(&proc_info2.db_path) + 
.stdout(Stdio::piped()) + .spawn() + .unwrap(); + let proc_guard2 = ChildGuard::new(homestar_proc2); + + // Poll for connection established message + loop { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + + if json["type"].as_str().unwrap() == "network:connectionEstablished" { + break; + } + } else { + panic!("Node one did not establish a connection with node two in time.") + } + } + + kill_homestar(proc_guard2.take(), None); + + // Poll for connection closed message + loop { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + + if json["type"].as_str().unwrap() == "network:connectionClosed" { + break; + } + } else { + panic!("Connection between node one and node two did not close in time.") + } + } + + // Poll for outgoing connection error message + loop { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + + if json["type"].as_str().unwrap() == "network:outgoingConnectionError" { + break; + } + } else { + panic!("Connection between node one and node two did not close in time.") + } + } + + // Poll for outgoing connection error message + loop { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + + if json["type"].as_str().unwrap() == "network:outgoingConnectionError" { + break; + } + } else { + panic!("Connection between node one and node two did not close in time.") + } + } + + let homestar_proc2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg(config2.filename()) 
+ .arg("--db") + .arg(&proc_info2.db_path) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + let _proc_guard2 = ChildGuard::new(homestar_proc2); + + // Poll for connection established message + loop { + if let Ok(msg) = sub1.next().with_timeout(Duration::from_secs(30)).await { + let json: serde_json::Value = + serde_json::from_slice(&msg.unwrap().unwrap()).unwrap(); + + if json["type"].as_str().unwrap() == "network:connectionEstablished" { + break; + } + } else { + panic!("Node one did not redial node two in time.") + } + } + }); + + Ok(()) +}