diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 75a90acfc8..3de7d36f5f 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -64,7 +64,7 @@ use std::{ fmt::Debug, fs, io::{Read, Write}, - net::SocketAddr, + net::{IpAddr, SocketAddr}, num::NonZeroUsize, path::PathBuf, }; @@ -721,7 +721,7 @@ impl NetworkBuilder { network_discovery: NetworkDiscovery::new(&peer_id), bootstrap_peers: Default::default(), live_connected_peers: Default::default(), - latest_connected_peers: Default::default(), + latest_established_connection_ids: Default::default(), handling_statistics: Default::default(), handled_times: 0, hard_disk_write_error: 0, @@ -820,10 +820,9 @@ pub struct SwarmDriver { // Peers that having live connection to. Any peer got contacted during kad network query // will have live connection established. And they may not appear in the RT. pub(crate) live_connected_peers: BTreeMap, - /// The peers that we recently connected to. - /// This is a limited list used to prevent log spamming. - /// Use `live_connected_peers` for a full list. - pub(crate) latest_connected_peers: HashMap, + /// The list of recently established connections ids. + /// This is used to prevent log spamming. + pub(crate) latest_established_connection_ids: HashMap, // Record the handling time of the recent 10 for each handling kind. handling_statistics: BTreeMap>, handled_times: usize, @@ -887,15 +886,17 @@ impl SwarmDriver { swarm_event = self.swarm.select_next_some() => { // Refer to the handle_swarm_events::IncomingConnectionError for more info on why we skip - // processing the event for one round. - if let Some(previous_event) = previous_incoming_connection_error_event.take() { - if let Err(err) = self.handle_swarm_events(previous_event) { - warn!("Error while handling swarm event: {err}"); + // processing the IncomingConnectionError event here. + if matches!(swarm_event, SwarmEvent::IncomingConnectionError {..}) || matches!(swarm_event, SwarmEvent::ConnectionEstablished {..}) { + if let Some(previous_event) = previous_incoming_connection_error_event.take() { + if let Err(err) = self.handle_swarm_events(previous_event) { + warn!("Error while handling swarm event: {err}"); + } + } + if matches!(swarm_event, SwarmEvent::IncomingConnectionError {..}) { + previous_incoming_connection_error_event = Some(swarm_event); + continue; } - } - if matches!(swarm_event, SwarmEvent::IncomingConnectionError {..}) { - previous_incoming_connection_error_event = Some(swarm_event); - continue; } // logging for handling events happens inside handle_swarm_events diff --git a/sn_networking/src/event/swarm.rs b/sn_networking/src/event/swarm.rs index dbbb4f85ba..bffdfa425d 100644 --- a/sn_networking/src/event/swarm.rs +++ b/sn_networking/src/event/swarm.rs @@ -7,8 +7,8 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{ - event::NodeEvent, multiaddr_is_global, multiaddr_strip_p2p, relay_manager::is_a_relayed_peer, - target_arch::Instant, NetworkEvent, Result, SwarmDriver, + event::NodeEvent, multiaddr_get_ip, multiaddr_is_global, multiaddr_strip_p2p, + relay_manager::is_a_relayed_peer, target_arch::Instant, NetworkEvent, Result, SwarmDriver, }; #[cfg(feature = "local")] use libp2p::mdns; @@ -19,7 +19,7 @@ use libp2p::{ multiaddr::Protocol, swarm::{ dial_opts::{DialOpts, PeerCondition}, - DialError, SwarmEvent, + ConnectionId, DialError, SwarmEvent, }, Multiaddr, PeerId, TransportError, }; @@ -364,7 +364,10 @@ impl SwarmDriver { connection_id, (peer_id, Instant::now() + Duration::from_secs(60)), ); - self.insert_latest_connected_peers(endpoint.get_remote_address().clone()); + self.insert_latest_established_connection_ids( + connection_id, + endpoint.get_remote_address(), + ); self.record_connection_metrics(); if endpoint.is_dialer() { @@ -381,9 +384,6 @@ impl SwarmDriver { event_string = "ConnectionClosed"; debug!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint)); let _ = self.live_connected_peers.remove(&connection_id); - let _ = self - .latest_connected_peers - .remove(endpoint.get_remote_address()); self.record_connection_metrics(); } SwarmEvent::OutgoingConnectionError { @@ -513,7 +513,9 @@ impl SwarmDriver { error, } => { event_string = "Incoming ConnErr"; - // Only log if this for a non-existing connection. + // Only log as ERROR if the the connection is not adjacent to an already established connection id from + // the same IP address. + // // If a peer contains multiple transports/listen addrs, we might try to open multiple connections, // and if the first one passes, we would get error on the rest. We don't want to log these. // @@ -522,8 +524,10 @@ impl SwarmDriver { // giving time for ConnectionEstablished to be hopefully processed. // And since we don't do anything critical with this event, the order and time of processing is // not critical. - if !self.latest_connected_peers.contains_key(&send_back_addr) { + if self.should_we_log_incoming_connection_error(connection_id, &send_back_addr) { error!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}"); + } else { + debug!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}"); } let _ = self.live_connected_peers.remove(&connection_id); self.record_connection_metrics(); @@ -675,29 +679,61 @@ impl SwarmDriver { } } - /// Insert into the latest connected peers list. This list does not contain all the connected peers. - /// Older entries are removed if the list exceeds 50. - fn insert_latest_connected_peers(&mut self, addr: Multiaddr) { - let old_instant = self - .latest_connected_peers - .entry(addr) - .or_insert_with(Instant::now); - *old_instant = Instant::now(); + /// Insert the latest established connection id into the list. + fn insert_latest_established_connection_ids(&mut self, id: ConnectionId, addr: &Multiaddr) { + let Ok(id) = format!("{id}").parse::() else { + return; + }; + let Some(ip_addr) = multiaddr_get_ip(addr) else { + return; + }; + + let _ = self + .latest_established_connection_ids + .insert(id, (ip_addr, Instant::now())); - while self.latest_connected_peers.len() >= 50 { + while self.latest_established_connection_ids.len() >= 50 { // remove the oldest entry - let Some(oldest) = self - .latest_connected_peers + let Some(oldest_key) = self + .latest_established_connection_ids .iter() - .min_by_key(|(_, time)| *time) - .map(|(addr, _)| addr.clone()) + .min_by_key(|(_, (_, time))| *time) + .map(|(id, _)| *id) else { break; }; - self.latest_connected_peers.remove(&oldest); + self.latest_established_connection_ids.remove(&oldest_key); } } + + // Do not log IncomingConnectionError if the ConnectionId is adjacent to an already established connection. + fn should_we_log_incoming_connection_error(&self, id: ConnectionId, addr: &Multiaddr) -> bool { + let Ok(id) = format!("{id}").parse::() else { + return true; + }; + let Some(ip_addr) = multiaddr_get_ip(addr) else { + return true; + }; + + // This should prevent most of the cases where we get an IncomingConnectionError for a peer with multiple + // transports/listen addrs. + if let Some((established_ip_addr, _)) = + self.latest_established_connection_ids.get(&(id - 1)) + { + if established_ip_addr == &ip_addr { + return false; + } + } else if let Some((established_ip_addr, _)) = + self.latest_established_connection_ids.get(&(id + 1)) + { + if established_ip_addr == &ip_addr { + return false; + } + } + + true + } } /// Helper function to print formatted connection role info.