diff --git a/rs/p2p/consensus_manager/src/metrics.rs b/rs/p2p/consensus_manager/src/metrics.rs
index 3acf3c33aad..6ac41936ba6 100644
--- a/rs/p2p/consensus_manager/src/metrics.rs
+++ b/rs/p2p/consensus_manager/src/metrics.rs
@@ -17,7 +17,6 @@ pub(crate) struct ConsensusManagerMetrics {
     pub assemble_task_finished_total: IntCounter,
     pub assemble_task_duration: Histogram,
     pub assemble_task_result_total: IntCounterVec,
-    pub assemble_task_restart_after_join_total: IntCounter,
 
     // Slot table
     pub slot_table_updates_total: IntCounter,
@@ -89,15 +88,6 @@ impl ConsensusManagerMetrics {
                 )
                 .unwrap(),
             ),
-            assemble_task_restart_after_join_total: metrics_registry.register(
-                IntCounter::with_opts(opts!(
-                    "ic_consensus_manager_assemble_task_restart_after_join_total",
-                    "assemble task immediately restarted due to advert appearing when closing.",
-                    const_labels.clone(),
-                ))
-                .unwrap(),
-            ),
-
             slot_table_updates_total: metrics_registry.register(
                 IntCounter::with_opts(opts!(
                     "ic_consensus_manager_slot_table_updates_total",
diff --git a/rs/p2p/consensus_manager/src/receiver.rs b/rs/p2p/consensus_manager/src/receiver.rs
index a448158b3d5..d6dea65cdd5 100644
--- a/rs/p2p/consensus_manager/src/receiver.rs
+++ b/rs/p2p/consensus_manager/src/receiver.rs
@@ -191,8 +191,7 @@ pub(crate) struct ConsensusManagerReceiver<
     slot_table: HashMap<NodeId, HashMap<SlotNumber, SlotEntry<WireArtifact::Id>>>,
     active_assembles: HashMap<WireArtifact::Id, watch::Sender<PeerCounter>>,
 
-    #[allow(clippy::type_complexity)]
-    artifact_processor_tasks: JoinSet<(watch::Receiver<PeerCounter>, WireArtifact::Id)>,
+    artifact_processor_tasks: JoinSet<WireArtifact::Id>,
 
     topology_watcher: watch::Receiver<SubnetTopology>,
 
@@ -259,8 +258,15 @@ where
                 }
                 Some(result) = self.artifact_processor_tasks.join_next() => {
                     match result {
-                        Ok((receiver, id)) => {
-                            self.handle_artifact_processor_joined(receiver, id, cancellation_token.clone());
+                        Ok(id) => {
+                            self.active_assembles.remove(&id);
+                            debug_assert!(
+                                self.slot_table
+                                    .values()
+                                    .flat_map(HashMap::values)
+                                    .all(|v| self.active_assembles.contains_key(&v.id)),
+                                "Every entry in the slot table should have an active assemble task."
+                            );
                         }
                         Err(err) => {
@@ -295,45 +301,6 @@ where
         }
     }
 
-    pub(crate) fn handle_artifact_processor_joined(
-        &mut self,
-        peer_rx: watch::Receiver<PeerCounter>,
-        id: WireArtifact::Id,
-        cancellation_token: CancellationToken,
-    ) {
-        self.metrics.assemble_task_finished_total.inc();
-        // Invariant: Peer sender should only be dropped in this task..
-        debug_assert!(peer_rx.has_changed().is_ok());
-
-        // peer advertised after task finished.
-        if !peer_rx.borrow().is_empty() {
-            self.metrics.assemble_task_restart_after_join_total.inc();
-            self.metrics.assemble_task_started_total.inc();
-            self.artifact_processor_tasks.spawn_on(
-                Self::process_advert(
-                    self.log.clone(),
-                    id,
-                    None,
-                    peer_rx,
-                    self.sender.clone(),
-                    self.artifact_assembler.clone(),
-                    self.metrics.clone(),
-                    cancellation_token.clone(),
-                ),
-                &self.rt_handle,
-            );
-        } else {
-            self.active_assembles.remove(&id);
-        }
-        debug_assert!(
-            self.slot_table
-                .values()
-                .flat_map(HashMap::values)
-                .all(|v| self.active_assembles.contains_key(&v.id)),
-            "Every entry in the slot table should have an active assemble task."
-        );
-    }
-
     #[instrument(skip_all)]
     pub(crate) fn handle_advert_receive(
         &mut self,
@@ -463,7 +430,7 @@ where
         artifact_assembler: Assembler,
         metrics: ConsensusManagerMetrics,
         cancellation_token: CancellationToken,
-    ) -> (watch::Receiver<PeerCounter>, WireArtifact::Id) {
+    ) -> WireArtifact::Id {
         let _timer = metrics.assemble_task_duration.start_timer();
 
         let mut peer_rx_clone = peer_rx.clone();
@@ -529,8 +496,8 @@ where
                     .inc();
                 },
             };
-
-        (peer_rx, id_c)
+        metrics.assemble_task_finished_total.inc();
+        id_c
     }
 
     /// Notifies all running tasks about the topology update.
@@ -966,7 +933,7 @@ mod tests {
             .expect("Artifact processor task panicked");
 
         // Check that assemble task for first advert closes.
-        assert_eq!(result.1, 0);
+        assert_eq!(result, 0);
     }
 
     /// Check that adverts updates with higher connection ids take precedence.
@@ -1063,7 +1030,7 @@ mod tests {
         );
 
         // Check that assemble task for first advert closes.
-        assert_eq!(result.1, 0);
+        assert_eq!(result, 0);
     }
 
     /// Verify that if two peers advertise the same advert it will get added to the same assemble task.
@@ -1137,7 +1104,7 @@ mod tests {
             cancellation.clone(),
         );
         // Check that the assemble task is closed.
-        let (peer_rx, id) = mgr
+        let _ = mgr
             .artifact_processor_tasks
             .join_next()
             .await
@@ -1155,9 +1122,6 @@ mod tests {
             cancellation.clone(),
         );
         assert_eq!(mgr.active_assembles.len(), 2);
-        // Verify that we reopened the assemble task for advert 0.
-        mgr.handle_artifact_processor_joined(peer_rx, id, cancellation.clone());
-        assert_eq!(mgr.active_assembles.len(), 2);
     }
 
     /// Verify that slot table is pruned if node leaves subnet.
@@ -1272,8 +1236,7 @@ mod tests {
                 .join_next()
                 .await
                 .unwrap()
-                .unwrap()
-                .1,
+                .unwrap(),
             0
         );
     }
@@ -1357,7 +1320,7 @@ mod tests {
            .expect("Artifact processor task panicked");
 
         assert_eq!(
-            result.1, 0,
+            result, 0,
             "Expected artifact processor task for id 0 to closed"
         );
         assert_eq!(mgr.artifact_processor_tasks.len(), 1);
@@ -1423,7 +1386,7 @@ mod tests {
         // Only assemble task 1 closes because it got overwritten.
         tokio::time::timeout(Duration::from_millis(100), async {
             while let Some(id) = mgr.artifact_processor_tasks.join_next().await {
-                assert_eq!(id.unwrap().1, 1);
+                assert_eq!(id.unwrap(), 1);
            }
         })
         .await
@@ -1534,7 +1497,7 @@ mod tests {
            .expect("Artifact processor task panicked");
 
         assert_eq!(
-            result.1, 0,
+            result, 0,
             "Expected artifact processor task for id 0 to closed"
        );
         assert_eq!(mgr.artifact_processor_tasks.len(), 1);
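Note on the pattern this diff moves to: with the restart-on-join path removed, `join_next()` now yields only the artifact id, and the receive loop unconditionally drops the matching `active_assembles` entry. Below is a minimal, self-contained sketch of that ownership flow; the names (`assemble`, a plain `u64` id, a map to `()`) are illustrative stand-ins, not the actual `ConsensusManagerReceiver` types, and peers, metrics, and cancellation are omitted.

```rust
use std::collections::HashMap;

use tokio::task::JoinSet;

type Id = u64;

// Stand-in for the per-artifact assemble future. The real task drives the
// artifact assembler and resolves to the artifact id once assembly finishes
// or every advertising peer has retracted the advert.
async fn assemble(id: Id) -> Id {
    id
}

#[tokio::main]
async fn main() {
    let mut tasks: JoinSet<Id> = JoinSet::new();
    // In the real receiver this maps each id to a watch::Sender of peers;
    // a unit value is enough to show the bookkeeping.
    let mut active_assembles: HashMap<Id, ()> = HashMap::new();

    for id in 0..3 {
        active_assembles.insert(id, ());
        tasks.spawn(assemble(id));
    }

    // Owner side of the pattern: a joined task is final, so its bookkeeping
    // entry is removed unconditionally; there is no restart path anymore.
    while let Some(result) = tasks.join_next().await {
        let id = result.expect("assemble task panicked");
        active_assembles.remove(&id);
    }

    assert!(active_assembles.is_empty());
}
```

Returning just the id keeps the `JoinSet` type simple enough that the `clippy::type_complexity` allowance can be dropped, and it moves the `assemble_task_finished_total` increment into the task itself, next to the timer that measures it.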