From dffdec5faeb7c93c0a875bd3f7e00544894aea3a Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Wed, 6 Nov 2024 16:59:22 -0500 Subject: [PATCH] [Consensus Observer] Fix argument ordering and add tests. --- .../observer/subscription_utils.rs | 162 +++++++++++++++++- 1 file changed, 161 insertions(+), 1 deletion(-) diff --git a/consensus/src/consensus_observer/observer/subscription_utils.rs b/consensus/src/consensus_observer/observer/subscription_utils.rs index 0bca7c61b007d..0741ee9a458ea 100644 --- a/consensus/src/consensus_observer/observer/subscription_utils.rs +++ b/consensus/src/consensus_observer/observer/subscription_utils.rs @@ -48,8 +48,8 @@ pub async fn create_new_subscriptions( // Sort the potential peers for subscription requests let mut sorted_potential_peers = match sort_peers_for_subscriptions( connected_peers_and_metadata, - unhealthy_subscription_peers, active_subscription_peers, + unhealthy_subscription_peers, consensus_publisher, ) { Some(sorted_peers) => sorted_peers, @@ -465,6 +465,166 @@ mod tests { } } + #[tokio::test(flavor = "multi_thread")] + async fn test_create_new_subscriptions_active_peers() { + // Create a consensus observer config and client + let consensus_observer_config = ConsensusObserverConfig::default(); + let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; + let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) = + create_consensus_observer_client(network_ids); + + // Create a list of connected peers (one per network) + let mut connected_peers = vec![]; + for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] { + // Create a new peer + let peer_network_id = create_peer_and_connection( + *network_id, + peers_and_metadata.clone(), + get_distance_from_validators(network_id), + None, + true, + ); + + // Add the peer to the list of sorted peers + connected_peers.push(peer_network_id); + } + + // Get the connected peers and metadata + let connected_peers_and_metadata = peers_and_metadata + .get_connected_peers_and_metadata() + .unwrap(); + + // Spawn the subscription creation task to create 2 subscriptions, + // but mark the validator peer as an active subscription. + let num_subscriptions_to_create = 2; + let active_subscription_peers = vec![connected_peers[0]]; + let subscription_creation_handle = tokio::spawn(async move { + create_new_subscriptions( + consensus_observer_config, + consensus_observer_client.clone(), + None, + Arc::new(MockDatabaseReader::new()), + TimeService::mock(), + connected_peers_and_metadata, + num_subscriptions_to_create, + active_subscription_peers, + vec![], + ) + .await + }); + + // Handle the peer manager requests made by the subscription creation task + for connected_peer in &connected_peers { + // If the peer is the validator, we should not expect a subscription request + let network_id = connected_peer.network_id(); + if network_id.is_validator_network() { + continue; + } + + // Otherwise, handle the subscription request + handle_next_subscription_request(network_id, &mut peer_manager_request_receivers, true) + .await; + } + + // Wait for the subscription creation task to complete + let consensus_observer_subscriptions = subscription_creation_handle.await.unwrap(); + + // Verify the number of created subscriptions + assert_eq!( + consensus_observer_subscriptions.len(), + num_subscriptions_to_create + ); + + // Verify the created subscription peers + let first_peer = connected_peers[1]; + let last_peer = connected_peers[2]; + let expected_subscription_peers = [first_peer, last_peer]; + for consensus_observer_subscription in consensus_observer_subscriptions { + let peer_network_id = consensus_observer_subscription.get_peer_network_id(); + assert!(expected_subscription_peers.contains(&peer_network_id)); + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_create_new_subscriptions_unhealthy_peers() { + // Create a consensus observer config and client + let consensus_observer_config = ConsensusObserverConfig::default(); + let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public]; + let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) = + create_consensus_observer_client(network_ids); + + // Create a list of connected peers (one per network) + let mut connected_peers = vec![]; + for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] { + // Create a new peer + let peer_network_id = create_peer_and_connection( + *network_id, + peers_and_metadata.clone(), + get_distance_from_validators(network_id), + None, + true, + ); + + // Add the peer to the list of sorted peers + connected_peers.push(peer_network_id); + } + + // Get the connected peers and metadata + let connected_peers_and_metadata = peers_and_metadata + .get_connected_peers_and_metadata() + .unwrap(); + + // Spawn the subscription creation task to create 2 subscriptions, + // but mark the VFN peer as an unhealthy subscription. + let num_subscriptions_to_create = 2; + let unhealthy_subscription_peers = vec![connected_peers[1]]; + let subscription_creation_handle = tokio::spawn(async move { + create_new_subscriptions( + consensus_observer_config, + consensus_observer_client.clone(), + None, + Arc::new(MockDatabaseReader::new()), + TimeService::mock(), + connected_peers_and_metadata, + num_subscriptions_to_create, + vec![], + unhealthy_subscription_peers, + ) + .await + }); + + // Handle the peer manager requests made by the subscription creation task + for connected_peer in &connected_peers { + // If the peer is the VFN, we should not expect a subscription request + let network_id = connected_peer.network_id(); + if network_id.is_vfn_network() { + continue; + } + + // Otherwise, handle the subscription request + handle_next_subscription_request(network_id, &mut peer_manager_request_receivers, true) + .await; + } + + // Wait for the subscription creation task to complete + let consensus_observer_subscriptions = subscription_creation_handle.await.unwrap(); + + // Verify the number of created subscriptions + assert_eq!( + consensus_observer_subscriptions.len(), + num_subscriptions_to_create + ); + + // Verify the created subscription peers + let first_peer = connected_peers[0]; + let last_peer = connected_peers[2]; + let expected_subscription_peers = [first_peer, last_peer]; + for consensus_observer_subscription in consensus_observer_subscriptions { + let peer_network_id = consensus_observer_subscription.get_peer_network_id(); + assert!(expected_subscription_peers.contains(&peer_network_id)); + } + } + #[tokio::test(flavor = "multi_thread")] async fn test_create_new_subscriptions_multiple() { // Create a consensus observer config and client