diff --git a/network/src/p2p/handler.rs b/network/src/p2p/handler.rs index 462552117b..12f7c6de97 100644 --- a/network/src/p2p/handler.rs +++ b/network/src/p2p/handler.rs @@ -506,6 +506,7 @@ impl IoHandler for Handler { is_inbound: true, } => { let mut inbound_connections = self.inbound_connections.write(); + let outbound_connections = self.outbound_connections.write(); let target = connection.peer_addr(); self.peer_db.insert(*target); if let Some(token) = self.inbound_tokens.lock().gen() { @@ -517,17 +518,26 @@ impl IoHandler for Handler { remote_node_id, token ); - assert_eq!( - None, - self.remote_node_ids_reverse.write().insert(remote_node_id, token), - "{}:{} is already registered", - remote_node_id, - token - ); - - let t = inbound_connections.insert(token, connection); - assert!(t.is_none()); - io.register_stream(token); + if !self.remote_node_ids_reverse.write().contains_key(&remote_node_id) { + assert_eq!( + None, + self.remote_node_ids_reverse.write().insert(remote_node_id, token), + "{}:{} is already registered", + remote_node_id, + token + ); + } + let can_insert = inbound_connections + .values() + .all(|in_connection| in_connection.peer_addr() != connection.peer_addr()); + let is_not_out = outbound_connections + .values() + .all(|out_connection| out_connection.peer_addr() != connection.peer_addr()); + if can_insert && is_not_out { + let t = inbound_connections.insert(token, connection); + assert!(t.is_none()); + io.register_stream(token); + } } else { cwarn!(NETWORK, "Cannot establish an inbound connection"); } @@ -537,6 +547,7 @@ impl IoHandler for Handler { is_inbound: false, } => { let mut outbound_connections = self.outbound_connections.write(); + let inbound_connections = self.inbound_connections.write(); if let Some(token) = self.outbound_tokens.lock().gen() { let peer_addr = *connection.peer_addr(); let remote_node_id = peer_addr.into(); @@ -547,13 +558,15 @@ impl IoHandler for Handler { remote_node_id, token ); - assert_eq!( - None, - self.remote_node_ids_reverse.write().insert(remote_node_id, token), - "{}:{} is already registered", - remote_node_id, - token - ); + if !self.remote_node_ids_reverse.write().contains_key(&remote_node_id) { + assert_eq!( + None, + self.remote_node_ids_reverse.write().insert(remote_node_id, token), + "{}:{} is already registered", + remote_node_id, + token + ); + } let mut network_message_size = 0; for (name, versions) in self.client.extension_versions() { @@ -566,9 +579,18 @@ impl IoHandler for Handler { network_message_size, ); } - let t = outbound_connections.insert(token, connection); - assert!(t.is_none()); - io.register_stream(token); + let can_insert = outbound_connections + .values() + .all(|out_connection| out_connection.peer_addr() == connection.peer_addr()); + let is_not_in = inbound_connections + .values() + .all(|in_connection| in_connection.peer_addr() == connection.peer_addr()); + + if can_insert && is_not_in { + let t = outbound_connections.insert(token, connection); + assert!(t.is_none()); + io.register_stream(token); + } } else { cwarn!(NETWORK, "Cannot establish an outbound connection"); } diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index ce9e216472..0e778af927 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -476,8 +476,10 @@ impl NetworkExtension for Extension { cinfo!(SYNC, "New peer detected #{}", id); self.send_status(id); - let t = self.connected_nodes.insert(*id); - debug_assert!(t, "{} is already added to peer list", id); + if !self.connected_nodes.contains(id) { + let t = self.connected_nodes.insert(*id); + debug_assert!(t, "{} is already added to peer list", id); + } let token = self.token_generator.gen().expect("Token generator is full"); let token_info = TokenInfo { @@ -485,13 +487,16 @@ impl NetworkExtension for Extension { request_id: None, }; - let t = self.requests.insert(*id, Vec::new()); - debug_assert_eq!(None, t); - let t = self.tokens_info.insert(token, token_info); - debug_assert_eq!(None, t); - let t = self.tokens.insert(*id, token); - debug_assert_eq!(None, t); - debug_assert!(t.is_none()); + if !self.requests.contains_key(id) { + let t = self.requests.insert(*id, Vec::new()); + debug_assert_eq!(None, t); + } + self.tokens_info.entry(token).or_insert(token_info); + if !self.tokens.contains_key(id) { + let t = self.tokens.insert(*id, token); + debug_assert_eq!(None, t); + debug_assert!(t.is_none()); + } } fn on_node_removed(&mut self, id: &NodeId) {