. #2439 (Draft)
Wants to merge 2 commits into master
10 changes: 0 additions & 10 deletions rs/p2p/consensus_manager/src/metrics.rs
@@ -17,7 +17,6 @@ pub(crate) struct ConsensusManagerMetrics {
     pub assemble_task_finished_total: IntCounter,
     pub assemble_task_duration: Histogram,
     pub assemble_task_result_total: IntCounterVec,
-    pub assemble_task_restart_after_join_total: IntCounter,
 
     // Slot table
     pub slot_table_updates_total: IntCounter,
@@ -89,15 +88,6 @@ impl ConsensusManagerMetrics {
                 )
                 .unwrap(),
             ),
-            assemble_task_restart_after_join_total: metrics_registry.register(
-                IntCounter::with_opts(opts!(
-                    "ic_consensus_manager_assemble_task_restart_after_join_total",
-                    "assemble task immediately restarted due to advert appearing when closing.",
-                    const_labels.clone(),
-                ))
-                .unwrap(),
-            ),
-
             slot_table_updates_total: metrics_registry.register(
                 IntCounter::with_opts(opts!(
                     "ic_consensus_manager_slot_table_updates_total",
77 changes: 20 additions & 57 deletions rs/p2p/consensus_manager/src/receiver.rs
@@ -191,8 +191,7 @@ pub(crate) struct ConsensusManagerReceiver<
     slot_table: HashMap<NodeId, HashMap<SlotNumber, SlotEntry<WireArtifact::Id>>>,
     active_assembles: HashMap<WireArtifact::Id, watch::Sender<PeerCounter>>,
 
-    #[allow(clippy::type_complexity)]
-    artifact_processor_tasks: JoinSet<(watch::Receiver<PeerCounter>, WireArtifact::Id)>,
+    artifact_processor_tasks: JoinSet<WireArtifact::Id>,
 
     topology_watcher: watch::Receiver<SubnetTopology>,
 
@@ -259,8 +258,15 @@ where
                 }
                 Some(result) = self.artifact_processor_tasks.join_next() => {
                     match result {
-                        Ok((receiver, id)) => {
-                            self.handle_artifact_processor_joined(receiver, id, cancellation_token.clone());
+                        Ok(id) => {
+                            self.active_assembles.remove(&id);
+                            debug_assert!(
+                                self.slot_table
+                                    .values()
+                                    .flat_map(HashMap::values)
+                                    .all(|v| self.active_assembles.contains_key(&v.id)),
+                                "Every entry in the slot table should have an active assemble task."
+                            );
 
                         }
                         Err(err) => {
@@ -295,45 +301,6 @@ where
         }
     }
 
-    pub(crate) fn handle_artifact_processor_joined(
-        &mut self,
-        peer_rx: watch::Receiver<PeerCounter>,
-        id: WireArtifact::Id,
-        cancellation_token: CancellationToken,
-    ) {
-        self.metrics.assemble_task_finished_total.inc();
-        // Invariant: Peer sender should only be dropped in this task..
-        debug_assert!(peer_rx.has_changed().is_ok());
-
-        // peer advertised after task finished.
-        if !peer_rx.borrow().is_empty() {
-            self.metrics.assemble_task_restart_after_join_total.inc();
-            self.metrics.assemble_task_started_total.inc();
-            self.artifact_processor_tasks.spawn_on(
-                Self::process_advert(
-                    self.log.clone(),
-                    id,
-                    None,
-                    peer_rx,
-                    self.sender.clone(),
-                    self.artifact_assembler.clone(),
-                    self.metrics.clone(),
-                    cancellation_token.clone(),
-                ),
-                &self.rt_handle,
-            );
-        } else {
-            self.active_assembles.remove(&id);
-        }
-        debug_assert!(
-            self.slot_table
-                .values()
-                .flat_map(HashMap::values)
-                .all(|v| self.active_assembles.contains_key(&v.id)),
-            "Every entry in the slot table should have an active assemble task."
-        );
-    }
-
     #[instrument(skip_all)]
     pub(crate) fn handle_advert_receive(
         &mut self,
@@ -463,7 +430,7 @@ where
         artifact_assembler: Assembler,
         metrics: ConsensusManagerMetrics,
         cancellation_token: CancellationToken,
-    ) -> (watch::Receiver<PeerCounter>, WireArtifact::Id) {
+    ) -> WireArtifact::Id {
         let _timer = metrics.assemble_task_duration.start_timer();
 
         let mut peer_rx_clone = peer_rx.clone();
@@ -529,8 +496,8 @@ where
                     .inc();
             },
         };
-
-        (peer_rx, id_c)
+        metrics.assemble_task_finished_total.inc();
+        id_c
     }
 
     /// Notifies all running tasks about the topology update.
@@ -966,7 +933,7 @@ mod tests {
             .expect("Artifact processor task panicked");
 
         // Check that assemble task for first advert closes.
-        assert_eq!(result.1, 0);
+        assert_eq!(result, 0);
     }
 
     /// Check that adverts updates with higher connection ids take precedence.
@@ -1063,7 +1030,7 @@
         );
 
         // Check that assemble task for first advert closes.
-        assert_eq!(result.1, 0);
+        assert_eq!(result, 0);
     }
 
     /// Verify that if two peers advertise the same advert it will get added to the same assemble task.
@@ -1137,7 +1104,7 @@
             cancellation.clone(),
         );
         // Check that the assemble task is closed.
-        let (peer_rx, id) = mgr
+        let _ = mgr
             .artifact_processor_tasks
             .join_next()
             .await
@@ -1155,9 +1122,6 @@
             cancellation.clone(),
         );
         assert_eq!(mgr.active_assembles.len(), 2);
-        // Verify that we reopened the assemble task for advert 0.
-        mgr.handle_artifact_processor_joined(peer_rx, id, cancellation.clone());
-        assert_eq!(mgr.active_assembles.len(), 2);
     }
 
     /// Verify that slot table is pruned if node leaves subnet.
@@ -1272,8 +1236,7 @@
                 .join_next()
                 .await
                 .unwrap()
-                .unwrap()
-                .1,
+                .unwrap(),
             0
         );
     }
@@ -1357,7 +1320,7 @@
             .expect("Artifact processor task panicked");
 
         assert_eq!(
-            result.1, 0,
+            result, 0,
             "Expected artifact processor task for id 0 to closed"
         );
         assert_eq!(mgr.artifact_processor_tasks.len(), 1);
@@ -1423,7 +1386,7 @@
         // Only assemble task 1 closes because it got overwritten.
         tokio::time::timeout(Duration::from_millis(100), async {
             while let Some(id) = mgr.artifact_processor_tasks.join_next().await {
-                assert_eq!(id.unwrap().1, 1);
+                assert_eq!(id.unwrap(), 1);
             }
         })
         .await
@@ -1534,7 +1497,7 @@
             .expect("Artifact processor task panicked");
 
         assert_eq!(
-            result.1, 0,
+            result, 0,
             "Expected artifact processor task for id 0 to closed"
         );
         assert_eq!(mgr.artifact_processor_tasks.len(), 1);