Skip to content

Commit

Permalink
[Consensus Observer] Fix argument ordering and add tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Nov 7, 2024
1 parent a33b939 commit dffdec5
Showing 1 changed file with 161 additions and 1 deletion.
162 changes: 161 additions & 1 deletion consensus/src/consensus_observer/observer/subscription_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ pub async fn create_new_subscriptions(
// Sort the potential peers for subscription requests
let mut sorted_potential_peers = match sort_peers_for_subscriptions(
connected_peers_and_metadata,
unhealthy_subscription_peers,
active_subscription_peers,
unhealthy_subscription_peers,
consensus_publisher,
) {
Some(sorted_peers) => sorted_peers,
Expand Down Expand Up @@ -465,6 +465,166 @@ mod tests {
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_create_new_subscriptions_active_peers() {
// Create a consensus observer config and client
let consensus_observer_config = ConsensusObserverConfig::default();
let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public];
let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) =
create_consensus_observer_client(network_ids);

// Create a list of connected peers (one per network)
let mut connected_peers = vec![];
for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] {
// Create a new peer
let peer_network_id = create_peer_and_connection(
*network_id,
peers_and_metadata.clone(),
get_distance_from_validators(network_id),
None,
true,
);

// Add the peer to the list of sorted peers
connected_peers.push(peer_network_id);
}

// Get the connected peers and metadata
let connected_peers_and_metadata = peers_and_metadata
.get_connected_peers_and_metadata()
.unwrap();

// Spawn the subscription creation task to create 2 subscriptions,
// but mark the validator peer as an active subscription.
let num_subscriptions_to_create = 2;
let active_subscription_peers = vec![connected_peers[0]];
let subscription_creation_handle = tokio::spawn(async move {
create_new_subscriptions(
consensus_observer_config,
consensus_observer_client.clone(),
None,
Arc::new(MockDatabaseReader::new()),
TimeService::mock(),
connected_peers_and_metadata,
num_subscriptions_to_create,
active_subscription_peers,
vec![],
)
.await
});

// Handle the peer manager requests made by the subscription creation task
for connected_peer in &connected_peers {
// If the peer is the validator, we should not expect a subscription request
let network_id = connected_peer.network_id();
if network_id.is_validator_network() {
continue;
}

// Otherwise, handle the subscription request
handle_next_subscription_request(network_id, &mut peer_manager_request_receivers, true)
.await;
}

// Wait for the subscription creation task to complete
let consensus_observer_subscriptions = subscription_creation_handle.await.unwrap();

// Verify the number of created subscriptions
assert_eq!(
consensus_observer_subscriptions.len(),
num_subscriptions_to_create
);

// Verify the created subscription peers
let first_peer = connected_peers[1];
let last_peer = connected_peers[2];
let expected_subscription_peers = [first_peer, last_peer];
for consensus_observer_subscription in consensus_observer_subscriptions {
let peer_network_id = consensus_observer_subscription.get_peer_network_id();
assert!(expected_subscription_peers.contains(&peer_network_id));
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_create_new_subscriptions_unhealthy_peers() {
// Create a consensus observer config and client
let consensus_observer_config = ConsensusObserverConfig::default();
let network_ids = &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public];
let (peers_and_metadata, consensus_observer_client, mut peer_manager_request_receivers) =
create_consensus_observer_client(network_ids);

// Create a list of connected peers (one per network)
let mut connected_peers = vec![];
for network_id in &[NetworkId::Validator, NetworkId::Vfn, NetworkId::Public] {
// Create a new peer
let peer_network_id = create_peer_and_connection(
*network_id,
peers_and_metadata.clone(),
get_distance_from_validators(network_id),
None,
true,
);

// Add the peer to the list of sorted peers
connected_peers.push(peer_network_id);
}

// Get the connected peers and metadata
let connected_peers_and_metadata = peers_and_metadata
.get_connected_peers_and_metadata()
.unwrap();

// Spawn the subscription creation task to create 2 subscriptions,
// but mark the VFN peer as an unhealthy subscription.
let num_subscriptions_to_create = 2;
let unhealthy_subscription_peers = vec![connected_peers[1]];
let subscription_creation_handle = tokio::spawn(async move {
create_new_subscriptions(
consensus_observer_config,
consensus_observer_client.clone(),
None,
Arc::new(MockDatabaseReader::new()),
TimeService::mock(),
connected_peers_and_metadata,
num_subscriptions_to_create,
vec![],
unhealthy_subscription_peers,
)
.await
});

// Handle the peer manager requests made by the subscription creation task
for connected_peer in &connected_peers {
// If the peer is the VFN, we should not expect a subscription request
let network_id = connected_peer.network_id();
if network_id.is_vfn_network() {
continue;
}

// Otherwise, handle the subscription request
handle_next_subscription_request(network_id, &mut peer_manager_request_receivers, true)
.await;
}

// Wait for the subscription creation task to complete
let consensus_observer_subscriptions = subscription_creation_handle.await.unwrap();

// Verify the number of created subscriptions
assert_eq!(
consensus_observer_subscriptions.len(),
num_subscriptions_to_create
);

// Verify the created subscription peers
let first_peer = connected_peers[0];
let last_peer = connected_peers[2];
let expected_subscription_peers = [first_peer, last_peer];
for consensus_observer_subscription in consensus_observer_subscriptions {
let peer_network_id = consensus_observer_subscription.get_peer_network_id();
assert!(expected_subscription_peers.contains(&peer_network_id));
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_create_new_subscriptions_multiple() {
// Create a consensus observer config and client
Expand Down

0 comments on commit dffdec5

Please sign in to comment.