diff --git a/scylla/src/transport/cluster.rs b/scylla/src/transport/cluster.rs index e8cce1500c..bb64ef7475 100644 --- a/scylla/src/transport/cluster.rs +++ b/scylla/src/transport/cluster.rs @@ -320,6 +320,9 @@ impl ClusterData { } }; + // Reset the up marker to what the source of truth says. + node.change_up_marker(peer.up); + new_known_peers.insert(peer.address, node.clone()); if let Some(dc) = &node.datacenter { @@ -472,8 +475,8 @@ impl ClusterWorker { // later as planned. match status { - StatusChangeEvent::Down(addr) => self.change_node_down_marker(addr, true), - StatusChangeEvent::Up(addr) => self.change_node_down_marker(addr, false), + StatusChangeEvent::Down(addr) => self.change_node_up_marker(addr, false), + StatusChangeEvent::Up(addr) => self.change_node_up_marker(addr, true), } continue; }, @@ -514,7 +517,7 @@ impl ClusterWorker { } } - fn change_node_down_marker(&mut self, addr: SocketAddr, is_down: bool) { + fn change_node_up_marker(&mut self, addr: SocketAddr, is_up: bool) { let cluster_data = self.cluster_data.load_full(); let node = match cluster_data.known_peers.get(&addr) { @@ -525,7 +528,7 @@ impl ClusterWorker { } }; - node.change_down_marker(is_down); + node.change_up_marker(is_up); } async fn handle_use_keyspace_request( diff --git a/scylla/src/transport/load_balancing/mod.rs b/scylla/src/transport/load_balancing/mod.rs index aef40d0821..e656a060ef 100644 --- a/scylla/src/transport/load_balancing/mod.rs +++ b/scylla/src/transport/load_balancing/mod.rs @@ -182,6 +182,7 @@ mod tests { address: tests::id_to_invalid_addr(*id), tokens: Vec::new(), untranslated_address: Some(tests::id_to_invalid_addr(*id)), + up: true, }) .collect::>(); @@ -238,6 +239,7 @@ mod tests { Token { value: 500 }, ], untranslated_address: None, + up: true, }, Peer { datacenter: Some("eu".into()), @@ -249,6 +251,7 @@ mod tests { Token { value: 300 }, ], untranslated_address: None, + up: true, }, Peer { datacenter: Some("us".into()), @@ -256,6 +259,7 @@ mod tests { address: tests::id_to_invalid_addr(3), tokens: vec![Token { value: 200 }, Token { value: 400 }], untranslated_address: None, + up: true, }, ]; diff --git a/scylla/src/transport/load_balancing/token_aware.rs b/scylla/src/transport/load_balancing/token_aware.rs index 5207350c54..0a01403c03 100644 --- a/scylla/src/transport/load_balancing/token_aware.rs +++ b/scylla/src/transport/load_balancing/token_aware.rs @@ -305,6 +305,7 @@ mod tests { address: tests::id_to_invalid_addr(1), tokens: vec![Token { value: 50 }, Token { value: 200 }], untranslated_address: Some(tests::id_to_invalid_addr(1)), + up: true, }, Peer { datacenter: Some("waw".into()), @@ -312,6 +313,7 @@ mod tests { address: tests::id_to_invalid_addr(2), tokens: vec![Token { value: 150 }], untranslated_address: Some(tests::id_to_invalid_addr(2)), + up: true, }, Peer { datacenter: Some("waw".into()), @@ -319,6 +321,7 @@ mod tests { address: tests::id_to_invalid_addr(3), tokens: vec![Token { value: 510 }], untranslated_address: Some(tests::id_to_invalid_addr(3)), + up: true, }, Peer { datacenter: Some("waw".into()), @@ -326,6 +329,7 @@ mod tests { address: tests::id_to_invalid_addr(4), tokens: vec![Token { value: 300 }], untranslated_address: Some(tests::id_to_invalid_addr(4)), + up: true, }, Peer { datacenter: Some("her".into()), @@ -333,6 +337,7 @@ mod tests { address: tests::id_to_invalid_addr(5), tokens: vec![Token { value: 100 }], untranslated_address: Some(tests::id_to_invalid_addr(5)), + up: true, }, Peer { datacenter: Some("her".into()), @@ -340,6 +345,7 @@ mod tests { address: tests::id_to_invalid_addr(6), tokens: vec![Token { value: 250 }], untranslated_address: Some(tests::id_to_invalid_addr(6)), + up: true, }, Peer { datacenter: Some("her".into()), @@ -347,6 +353,7 @@ mod tests { address: tests::id_to_invalid_addr(7), tokens: vec![Token { value: 500 }], untranslated_address: Some(tests::id_to_invalid_addr(7)), + up: true, }, Peer { datacenter: Some("her".into()), @@ -354,6 +361,7 @@ mod tests { address: tests::id_to_invalid_addr(8), tokens: vec![Token { value: 400 }], untranslated_address: Some(tests::id_to_invalid_addr(8)), + up: true, }, ]; diff --git a/scylla/src/transport/node.rs b/scylla/src/transport/node.rs index 6b264e69a6..fe2f521b43 100644 --- a/scylla/src/transport/node.rs +++ b/scylla/src/transport/node.rs @@ -63,7 +63,7 @@ pub struct Node { // If the node is filtered out by the host filter, this will be None pool: Option, - down_marker: AtomicBool, + up_marker: AtomicBool, } impl Node { @@ -91,8 +91,8 @@ impl Node { datacenter, rack, pool, - down_marker: false.into(), average_latency: RwLock::new(None), + up_marker: true.into(), } } @@ -114,8 +114,8 @@ impl Node { self.get_pool()?.random_connection() } - pub fn is_down(&self) -> bool { - self.down_marker.load(Ordering::Relaxed) + pub fn is_up(&self) -> bool { + self.up_marker.load(Ordering::Relaxed) } /// Returns a boolean which indicates whether this node was is enabled. @@ -125,8 +125,8 @@ impl Node { self.pool.is_some() } - pub(crate) fn change_down_marker(&self, is_down: bool) { - self.down_marker.store(is_down, Ordering::Relaxed); + pub(crate) fn change_up_marker(&self, is_up: bool) { + self.up_marker.store(is_up, Ordering::Relaxed); } pub(crate) async fn use_keyspace( diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index 29213fe6af..fa848d1f9e 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -54,6 +54,7 @@ pub struct Peer { pub tokens: Vec, pub datacenter: Option, pub rack: Option, + pub up: bool, } #[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way @@ -203,6 +204,7 @@ impl Metadata { datacenter: None, rack: None, untranslated_address: None, + up: true, } }) .collect(); @@ -485,15 +487,30 @@ async fn query_peers( local_query.set_page_size(1024); let local_query_future = conn.query_all(&local_query, &[]); - let (peers_res, local_res) = tokio::try_join!(peers_query_future, local_query_future)?; + let mut up_status_query = Query::new("select peer, up from system.cluster_status"); + up_status_query.set_page_size(1024); + let up_status_query_future = conn.query_all(&up_status_query, &[]); + + let (peers_res, local_res, up_status_res) = tokio::try_join!( + peers_query_future, + local_query_future, + up_status_query_future + )?; let peers_rows = peers_res.rows.ok_or(QueryError::ProtocolError( "system.peers query response was not Rows", ))?; - let local_rows = local_res.rows.ok_or(QueryError::ProtocolError( "system.local query response was not Rows", ))?; + let up_status_rows = up_status_res.rows.ok_or(QueryError::ProtocolError( + "system.cluster_status query response was not Rows", + ))?; + + let up_status: HashMap = up_status_rows + .into_typed::<(IpAddr, bool)>() + .collect::, _>>() + .map_err(|_| QueryError::ProtocolError("system.cluster_status has invalid column type"))?; let typed_peers_rows = peers_rows.into_typed::<(IpAddr, Option, Option, Option>)>(); @@ -563,12 +580,24 @@ async fn query_peers( } }; + let up = if let Some(up) = up_status.get(&address.ip()) { + *up + } else { + warn!( + "Couldn't find cluster status for node {}, marking it as UP", + address.ip() + ); + + true + }; + Ok(Some(Peer { untranslated_address, address, tokens, datacenter, rack, + up })) });