Merge pull request #2307 from joshuef/MinViableChangesReduced
Less changes for cpu usage
maqi authored Oct 24, 2024
2 parents c928ec7 + 538985d commit b428ec4
Showing 5 changed files with 206 additions and 184 deletions.
40 changes: 19 additions & 21 deletions .github/workflows/merge.yml
@@ -627,6 +627,10 @@ jobs:
CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }}
timeout-minutes: 30

# Sleep for a while to allow restarted nodes to be detected by others
- name: Sleep a while
run: sleep 300

- name: Stop the local network and upload logs
if: always()
uses: maidsafe/sn-local-testnet-action@main
@@ -653,6 +657,10 @@ jobs:
rg "(\d+) matches" | rg "\d+" -o)
echo "Restarted $restart_count nodes"
# `PeerRemovedFromRoutingTable` now only happens when a peer is reported as a `BadNode`.
# Otherwise kad will remove a `dropped out node` directly from the RT.
# So, explicitly detecting the removal is now much less likely,
# due to the removal of connection_issue tracking.
- name: Get peers removed from nodes using rg
shell: bash
timeout-minutes: 1
@@ -665,24 +673,6 @@
fi
echo "PeerRemovedFromRoutingTable $peer_removed times"
- name: Verify peers removed exceed restarted node counts
shell: bash
timeout-minutes: 1
# get the counts, then the specific line, and then the digit count only
# then check we have an expected level of restarts
# TODO: make this use an env var, or relate to testnet size
run: |
restart_count=$(rg "Node is restarting in" "${{ matrix.node_data_path }}" -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "Restart $restart_count nodes"
peer_removed=$(rg "PeerRemovedFromRoutingTable" "${{ matrix.node_data_path }}" -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "PeerRemovedFromRoutingTable $peer_removed times"
if [ $peer_removed -lt $restart_count ]; then
echo "PeerRemovedFromRoutingTable times of: $peer_removed is less than the restart count of: $restart_count"
exit 1
fi
# TODO: reenable this once the testnet dir creation is tidied up to avoid a large count here
# if [ $restart_count -lt $node_count ]; then
# echo "Restart count of: $restart_count is less than the node count of: $node_count"
@@ -795,6 +785,10 @@ jobs:
CARGO_TARGET_DIR: ${{ matrix.os == 'windows-latest' && './test-target' || '.' }}
timeout-minutes: 5

# Sleep for a while to allow restarted nodes to be detected by others
- name: Sleep a while
run: sleep 300

- name: Stop the local network and upload logs
if: always()
uses: maidsafe/sn-local-testnet-action@main
@@ -808,16 +802,20 @@
timeout-minutes: 1
# get the counts, then the specific line, and then the digit count only
# then check we have an expected level of restarts
# TODO: make this use an env var, or relate to testnet size
#
# `PeerRemovedFromRoutingTable` now only happens when a peer is reported as a `BadNode`.
# Otherwise kad will remove a `dropped out node` directly from the RT.
# So, explicitly detecting the removal is now much less likely,
# due to the removal of connection_issue tracking.
run: |
restart_count=$(rg "Node is restarting in" "${{ matrix.node_data_path }}" -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "Restart $restart_count nodes"
peer_removed=$(rg "PeerRemovedFromRoutingTable" "${{ matrix.node_data_path }}" -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "PeerRemovedFromRoutingTable $peer_removed times"
if [ $peer_removed -lt $restart_count ]; then
echo "PeerRemovedFromRoutingTable times of: $peer_removed is less than the restart count of: $restart_count"
if [ -z "$peer_removed" ]; then
echo "No peer removal count found"
exit 1
fi
node_count=$(ls "${{ matrix.node_data_path }}" | wc -l)
14 changes: 12 additions & 2 deletions sn_networking/src/driver.rs
@@ -65,6 +65,7 @@ use std::{
fs,
io::{Read, Write},
net::SocketAddr,
num::NonZeroUsize,
path::PathBuf,
};
use tokio::sync::{mpsc, oneshot};
@@ -130,6 +131,13 @@ const NETWORKING_CHANNEL_SIZE: usize = 10_000;
/// Time before a Kad query times out if no response is received
const KAD_QUERY_TIMEOUT_S: Duration = Duration::from_secs(10);

// Initialised during compilation, instead of a runtime error that should never happen
// Option<T>::expect will be stabilised as const in the future (https://github.com/rust-lang/rust/issues/67441)
const REPLICATION_FACTOR: NonZeroUsize = match NonZeroUsize::new(CLOSE_GROUP_SIZE) {
Some(v) => v,
None => panic!("CLOSE_GROUP_SIZE should not be zero"),
};

/// The various settings to apply to when fetching a record from network
#[derive(Clone)]
pub struct GetRecordCfg {
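The new REPLICATION_FACTOR constant above relies on `match` being usable in const context, so an invalid CLOSE_GROUP_SIZE is rejected at build time rather than at runtime. Below is a minimal standalone sketch of the same pattern; the CLOSE_GROUP_SIZE value here is a placeholder for illustration, not necessarily the crate's real one.

use std::num::NonZeroUsize;

// Placeholder value for illustration only; the real CLOSE_GROUP_SIZE is defined elsewhere in the crate.
const CLOSE_GROUP_SIZE: usize = 5;

// Same pattern as the diff above: the match is evaluated at compile time, so a
// zero CLOSE_GROUP_SIZE would fail the build instead of panicking at runtime.
const REPLICATION_FACTOR: NonZeroUsize = match NonZeroUsize::new(CLOSE_GROUP_SIZE) {
    Some(v) => v,
    None => panic!("CLOSE_GROUP_SIZE should not be zero"),
};

fn main() {
    println!("replication factor: {REPLICATION_FACTOR}");
}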
@@ -354,6 +362,7 @@ impl NetworkBuilder {
.disjoint_query_paths(true)
// Records never expire
.set_record_ttl(None)
.set_replication_factor(REPLICATION_FACTOR)
// Emit PUT events for validation prior to insertion into the RecordStore.
// This is no longer needed as the record_storage::put now can carry out validation.
// .set_record_filtering(KademliaStoreInserts::FilterBoth)
@@ -437,6 +446,7 @@ impl NetworkBuilder {
let _ = kad_cfg
.set_kbucket_inserts(libp2p::kad::BucketInserts::Manual)
.set_max_packet_size(MAX_PACKET_SIZE)
.set_replication_factor(REPLICATION_FACTOR)
// Require iterative queries to use disjoint paths for increased resiliency in the presence of potentially adversarial nodes.
.disjoint_query_paths(true);

@@ -912,7 +922,7 @@ impl SwarmDriver {
let farthest_peer_to_check = self
.get_all_local_peers_excluding_self()
.len()
.checked_div(3 * CLOSE_GROUP_SIZE)
.checked_div(5 * CLOSE_GROUP_SIZE)
.unwrap_or(1);

info!("Farthest peer we'll check: {:?}", farthest_peer_to_check);
@@ -947,7 +957,7 @@ impl SwarmDriver {

sorted_distances.sort_unstable();

let median_index = sorted_distances.len() / 2;
let median_index = sorted_distances.len() / 8;

let default = KBucketDistance::default();
let median = sorted_distances.get(median_index).cloned();
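The second tuning change in this file picks the distance at index len / 8 instead of the median at len / 2, which makes the derived "get range" a much tighter distance. A hedged sketch with a stand-in integer distance type (the crate uses KBucketDistance):

// Stand-in for the crate's KBucketDistance; plain integers keep the sketch self-contained.
fn derive_get_range(mut distances: Vec<u32>) -> Option<u32> {
    distances.sort_unstable();
    let index = distances.len() / 8; // previously len() / 2, i.e. the median
    distances.get(index).copied()
}

fn main() {
    let samples = vec![40, 10, 90, 20, 70, 30, 80, 60];
    // With 8 samples this returns Some(20); the old median index would return Some(60).
    println!("range distance: {:?}", derive_get_range(samples));
}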
228 changes: 123 additions & 105 deletions sn_networking/src/event/kad.rs
@@ -493,128 +493,146 @@ impl SwarmDriver {
/// SplitRecord if there are multiple content hash versions.
fn handle_get_record_finished(&mut self, query_id: QueryId, step: ProgressStep) -> Result<()> {
// return error if the entry cannot be found
if let Some((_key, senders, result_map, cfg)) = self.pending_get_record.remove(&query_id) {
if let Some((r_key, senders, result_map, cfg)) = self.pending_get_record.remove(&query_id) {
let num_of_versions = result_map.len();
let (result, log_string) = if let Some((record, from_peers)) =
result_map.values().next()
{
let data_key_address = NetworkAddress::from_record_key(&record.key);
let expected_get_range = self.get_request_range();

let we_have_searched_thoroughly = Self::have_we_have_searched_thoroughly_for_quorum(
expected_get_range,
from_peers,
&data_key_address,
&cfg.get_quorum,
);

let pretty_key = PrettyPrintRecordKey::from(&record.key);
info!("RANGE: {pretty_key:?} we_have_searched_far_enough: {we_have_searched_thoroughly:?}");
let data_key_address = NetworkAddress::from_record_key(&r_key);
let expected_get_range = self.get_request_range();
let all_seen_peers: HashSet<_> = result_map
.values()
.flat_map(|(_, peers)| peers)
.cloned()
.collect();
let we_have_searched_thoroughly = Self::have_we_have_searched_thoroughly_for_quorum(
expected_get_range,
&all_seen_peers,
&data_key_address,
&cfg.get_quorum,
);

// we have a split record, return it
if num_of_versions > 1 {
warn!("RANGE: Multiple versions found over range");
for sender in senders {
sender
.send(Err(GetRecordError::SplitRecord {
result_map: result_map.clone(),
}))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}

let result = if num_of_versions > 1 {
warn!("RANGE: more than one version found!");
Err(GetRecordError::SplitRecord {
result_map: result_map.clone(),
})
} else if we_have_searched_thoroughly {
warn!("RANGE: Get record finished: {pretty_key:?} Enough of the network has responded or it's not sensitive data... and we only have one copy...");
for (record, _peers) in result_map.values() {
self.reput_data_to_range(record, &data_key_address, &all_seen_peers)?;
}

Ok(record.clone())
} else {
// We have not searched enough of the network range.
let result = Err(GetRecordError::NotEnoughCopiesInRange {
record: record.clone(),
expected: get_quorum_value(&cfg.get_quorum),
got: from_peers.len(),
range: expected_get_range.ilog2().unwrap_or(0),
});
return Ok(());
}

// This should be a backstop... Quorum::All is the only one that enforces
// a full search of the network range.
if matches!(cfg.get_quorum, Quorum::All) {
warn!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has responded, we need to extend the range and PUT the data. {result:?}");
// we have no results, bail
if num_of_versions == 0 {
warn!("RANGE: No versions found!");
for sender in senders {
sender
.send(Err(GetRecordError::RecordNotFound))
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}
return Ok(());
}

warn!("Reputting data to network {pretty_key:?}...");
// if we have searched thoroughly, we can return the record
if num_of_versions == 1 {
let result = if let Some((record, peers)) = result_map.values().next() {
warn!("RANGE: one version found!");

if we_have_searched_thoroughly {
Ok(record.clone())
} else {
self.reput_data_to_range(record, &data_key_address, &all_seen_peers)?;
Err(GetRecordError::NotEnoughCopiesInRange {
record: record.clone(),
expected: get_quorum_value(&cfg.get_quorum),
got: peers.len(),
range: expected_get_range.ilog2().unwrap_or(0),
})
}
} else {
debug!("Getting record task {query_id:?} completed with step count {:?}, but no copy found.", step.count);
Err(GetRecordError::RecordNotFound)
};
for sender in senders {
sender
.send(result.clone())
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}

// let's ensure we have an updated network view
self.trigger_network_discovery();
#[cfg(feature = "open-metrics")]
if self.metrics_recorder.is_some() {
self.check_for_change_in_our_close_group();
}
}
} else {
debug!("Can't locate query task {query_id:?} during GetRecord finished. We might have already returned the result to the sender.");
}
Ok(())
}

warn!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has responded, we need PUT the data back into nodes in that range.");
/// Repost data to the network if we didn't get enough responses.
fn reput_data_to_range(
&mut self,
record: &Record,
data_key_address: &NetworkAddress,
// all peers who responded with any version of the record
from_peers: &HashSet<PeerId>,
) -> Result<()> {
let pretty_key = PrettyPrintRecordKey::from(&record.key);
// This should be a backstop... Quorum::All is the only one that enforces
// a full search of the network range.
info!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has the record, or same state, we need to extend the range and PUT the data.");

let record_type = get_type_from_record(record)?;
info!("Reputting data to network {pretty_key:?}...");

let replicate_targets: HashSet<_> = self
.get_filtered_peers_exceeding_range_or_closest_nodes(&data_key_address)
.iter()
.cloned()
.collect();
warn!("RANGE: {pretty_key:?} Query Finished: Not enough of the network has responded, we need PUT the data back into nodes in that range.");

if from_peers == &replicate_targets {
warn!("RANGE: {pretty_key:?} We asked everyone we know of in that range already!");
}
let record_type = get_type_from_record(record)?;

// set holder to someone that has the data
let holder = NetworkAddress::from_peer(
from_peers
.iter()
.next()
.cloned()
.unwrap_or(self.self_peer_id),
);
let replicate_targets: HashSet<_> = self
.get_filtered_peers_exceeding_range_or_closest_nodes(data_key_address)
.iter()
.cloned()
.collect();

for peer in replicate_targets {
warn!("Reputting data to {peer:?} for {pretty_key:?} if needed...");
// Do not send to any peer that has already informed us
if from_peers.contains(&peer) {
continue;
}

debug!("RANGE: (insufficient, so ) Sending data to unresponded peer: {peer:?} for {pretty_key:?}");

// nodes will try/fail to replicate it from us, but grab from the network thereafter
self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
req: Request::Cmd(Cmd::Replicate {
holder: holder.clone(),
keys: vec![(data_key_address.clone(), record_type.clone())],
}),
peer,
sender: None,
});
}
}
if from_peers == &replicate_targets {
warn!("RANGE: {pretty_key:?} We asked everyone we know of in that range already!");
}

result
};
// set holder to someone that has the data
let holder = NetworkAddress::from_peer(
from_peers
.iter()
.next()
.cloned()
.unwrap_or(self.self_peer_id),
);

(
result,
format!("Getting record {:?} completed with only {:?} copies received, and {num_of_versions} versions.",
PrettyPrintRecordKey::from(&record.key), usize::from(step.count) - 1)
)
} else {
(
Err(GetRecordError::RecordNotFound),
format!("Getting record task {query_id:?} completed with step count {:?}, but no copy found.", step.count),
)
};

if cfg.expected_holders.is_empty() {
debug!("{log_string}");
} else {
debug!(
"{log_string}, and {:?} expected holders not responded",
cfg.expected_holders
);
for peer in replicate_targets {
warn!("Reputting data to {peer:?} for {pretty_key:?} if needed...");
// Do not send to any peer that has already informed us
if from_peers.contains(&peer) {
continue;
}

for sender in senders {
sender
.send(result.clone())
.map_err(|_| NetworkError::InternalMsgChannelDropped)?;
}
} else {
debug!("Can't locate query task {query_id:?} during GetRecord finished. We might have already returned the result to the sender.");
debug!("RANGE: (insufficient, so ) Sending data to unresponded peer: {peer:?} for {pretty_key:?}");

// nodes will try/fail to replicate it from us, but grab from the network thereafter
self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
req: Request::Cmd(Cmd::Replicate {
holder: holder.clone(),
keys: vec![(data_key_address.clone(), record_type.clone())],
}),
peer,
sender: None,
});
}

Ok(())
}

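To summarise the reworked flow in handle_get_record_finished and reput_data_to_range, the sketch below uses stand-in types (plain integers instead of libp2p's PeerId, Record, and the result-map key); it is a simplification under those assumptions, not the crate's real signatures. It shows the two new behaviours: aggregating every peer that returned any version of the record, and re-putting data only to candidate holders that have not already responded.

use std::collections::{HashMap, HashSet};

type PeerId = u64;      // stand-in for libp2p::PeerId
type Record = Vec<u8>;  // stand-in for libp2p::kad::Record
type ContentHash = u64; // stand-in for the result_map key

// Collect every peer that returned any version of the record, not just the
// peers behind the first version seen.
fn all_seen_peers(result_map: &HashMap<ContentHash, (Record, HashSet<PeerId>)>) -> HashSet<PeerId> {
    result_map
        .values()
        .flat_map(|(_record, peers)| peers)
        .copied()
        .collect()
}

// When the search was not thorough enough, only contact candidate holders that
// have not already informed us about the record.
fn replication_targets(
    candidates_in_range: &HashSet<PeerId>,
    from_peers: &HashSet<PeerId>,
) -> Vec<PeerId> {
    candidates_in_range
        .iter()
        .filter(|peer| !from_peers.contains(*peer))
        .copied()
        .collect()
}

fn main() {
    let mut result_map: HashMap<ContentHash, (Record, HashSet<PeerId>)> = HashMap::new();
    result_map.insert(1, (vec![0u8; 4], HashSet::from([1, 2, 3])));
    let seen = all_seen_peers(&result_map);
    let candidates = HashSet::from([2, 3, 4, 5]);
    println!("targets still to contact: {:?}", replication_targets(&candidates, &seen));
}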