Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into txpool_metrics_up…
Browse files Browse the repository at this point in the history
…date
  • Loading branch information
rafal-ch committed Oct 23, 2024
2 parents d46468c + 481d4bb commit eb3efe1
Show file tree
Hide file tree
Showing 16 changed files with 509 additions and 188 deletions.
4 changes: 1 addition & 3 deletions .cargo/audit.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
[advisories]
ignore = [
"RUSTSEC-2024-0336" # https://github.com/FuelLabs/fuel-core/issues/1843
]
ignore = []
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- The number of executable transactions in the txpool (`txpool_number_of_executable_transactions`)
- The time it took to select transactions for inclusion in a block in microseconds (`txpool_select_transactions_time_microseconds`)
- The time it took to insert a transaction in the txpool in microseconds (`transaction_insertion_time_in_thread_pool_microseconds`)
- [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future.

### Fixed
- [2366](https://github.com/FuelLabs/fuel-core/pull/2366): The `importer_gas_price_for_block` metric is properly collected.

### Changed

- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.

## [Version 0.40.0]

### Added
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/fuel-core/src/p2p_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use fuel_core_p2p::{
p2p_service::FuelP2PEvent,
request_response::messages::{
RequestMessage,
ResponseMessage,
V2ResponseMessage,
},
service::to_message_acceptance,
};
Expand Down Expand Up @@ -178,7 +178,7 @@ impl Bootstrap {
if request_message == RequestMessage::TxPoolAllTransactionsIds {
let _ = bootstrap.send_response_msg(
request_id,
ResponseMessage::TxPoolAllTransactionsIds(Some(vec![])),
V2ResponseMessage::TxPoolAllTransactionsIds(Ok(vec![])),
);
}
}
Expand Down
148 changes: 85 additions & 63 deletions crates/fuel-core/src/query/message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use crate::fuel_core_graphql_api::{
database::ReadView,
IntoApiResult,
};
use crate::fuel_core_graphql_api::database::ReadView;
use fuel_core_storage::{
iter::{
BoxedIter,
Expand Down Expand Up @@ -162,15 +159,14 @@ impl MessageProofData for ReadView {
}

/// Generate an output proof.
// TODO: Do we want to return `Option` here?
pub fn message_proof<T: MessageProofData + ?Sized>(
database: &T,
transaction_id: Bytes32,
desired_nonce: Nonce,
commit_block_height: BlockHeight,
) -> StorageResult<Option<MessageProof>> {
// Check if the receipts for this transaction actually contain this message id or exit.
let receipt = database
) -> StorageResult<MessageProof> {
// Check if the receipts for this transaction actually contain this nonce or exit.
let (sender, recipient, nonce, amount, data) = database
.receipts(&transaction_id)?
.into_iter()
.find_map(|r| match r {
Expand All @@ -185,63 +181,83 @@ pub fn message_proof<T: MessageProofData + ?Sized>(
Some((sender, recipient, nonce, amount, data))
}
_ => None,
});
})
.ok_or::<StorageError>(
anyhow::anyhow!("Desired `nonce` missing in transaction receipts").into(),
)?;

let (sender, recipient, nonce, amount, data) = match receipt {
Some(r) => r,
None => return Ok(None),
let Some(data) = data else {
return Err(anyhow::anyhow!("Output message doesn't contain any `data`").into())
};
let data =
data.ok_or(anyhow::anyhow!("Output message doesn't contain any `data`"))?;

// Get the block id from the transaction status if it's ready.
let message_block_height = match database
.transaction_status(&transaction_id)
.into_api_result::<TransactionStatus, StorageError>(
)? {
Some(TransactionStatus::Success { block_height, .. }) => block_height,
_ => return Ok(None),
let message_block_height = match database.transaction_status(&transaction_id) {
Ok(TransactionStatus::Success { block_height, .. }) => block_height,
Ok(TransactionStatus::Submitted { .. }) => {
return Err(anyhow::anyhow!(
"Unable to obtain the message block height. The transaction has not been processed yet"
)
.into())
}
Ok(TransactionStatus::SqueezedOut { reason }) => {
return Err(anyhow::anyhow!(
"Unable to obtain the message block height. The transaction was squeezed out: {reason}"
)
.into())
}
Ok(TransactionStatus::Failed { .. }) => {
return Err(anyhow::anyhow!(
"Unable to obtain the message block height. The transaction failed"
)
.into())
}
Err(err) => {
return Err(anyhow::anyhow!(
"Unable to obtain the message block height: {err}"
)
.into())
}
};

// Get the message fuel block header.
let (message_block_header, message_block_txs) = match database
.block(&message_block_height)
.into_api_result::<CompressedBlock, StorageError>()?
{
Some(t) => t.into_inner(),
None => return Ok(None),
};
let (message_block_header, message_block_txs) =
match database.block(&message_block_height) {
Ok(message_block) => message_block.into_inner(),
Err(err) => {
return Err(anyhow::anyhow!(
"Unable to get the message block from the database: {err}"
)
.into())
}
};

let message_id = compute_message_id(&sender, &recipient, &nonce, amount, &data);

let message_proof =
match message_receipts_proof(database, message_id, &message_block_txs)? {
Some(proof) => proof,
None => return Ok(None),
};
let message_proof = message_receipts_proof(database, message_id, &message_block_txs)?;

// Get the commit fuel block header.
let commit_block_header = match database
.block(&commit_block_height)
.into_api_result::<CompressedBlock, StorageError>()?
{
Some(t) => t.into_inner().0,
None => return Ok(None),
let (commit_block_header, _) = match database.block(&commit_block_height) {
Ok(commit_block_header) => commit_block_header.into_inner(),
Err(err) => {
return Err(anyhow::anyhow!(
"Unable to get commit block header from database: {err}"
)
.into())
}
};

let block_height = *commit_block_header.height();
if block_height == 0u32.into() {
// Cannot look beyond the genesis block
return Ok(None)
}
let verifiable_commit_block_height =
block_height.pred().expect("We checked the height above");
let Some(verifiable_commit_block_height) = commit_block_header.height().pred() else {
return Err(anyhow::anyhow!(
"Impossible to generate proof beyond the genesis block"
)
.into())
};
let block_proof = database.block_history_proof(
message_block_header.height(),
&verifiable_commit_block_height,
)?;

Ok(Some(MessageProof {
Ok(MessageProof {
message_proof,
block_proof,
message_block_header,
Expand All @@ -251,19 +267,18 @@ pub fn message_proof<T: MessageProofData + ?Sized>(
nonce,
amount,
data,
}))
})
}

fn message_receipts_proof<T: MessageProofData + ?Sized>(
database: &T,
message_id: MessageId,
message_block_txs: &[Bytes32],
) -> StorageResult<Option<MerkleProof>> {
) -> StorageResult<MerkleProof> {
// Get the message receipts from the block.
let leaves: Vec<Vec<Receipt>> = message_block_txs
.iter()
.map(|id| database.receipts(id))
.filter_map(|result| result.into_api_result::<_, StorageError>().transpose())
.try_collect()?;
let leaves = leaves.into_iter()
// Flatten the receipts after filtering on output messages
Expand All @@ -287,20 +302,27 @@ fn message_receipts_proof<T: MessageProofData + ?Sized>(
tree.push(id.as_ref());
}

// If we found the leaf proof index then return the proof.
match proof_index {
Some(proof_index) => {
// Generate the actual merkle proof.
match tree.prove(proof_index) {
Some((_, proof_set)) => Ok(Some(MerkleProof {
proof_set,
proof_index,
})),
None => Ok(None),
}
}
None => Ok(None),
}
// Check if we found a leaf.
let Some(proof_index) = proof_index else {
return Err(anyhow::anyhow!(
"Unable to find the message receipt in the transaction to generate the proof"
)
.into())
};

// Get the proof set.
let Some((_, proof_set)) = tree.prove(proof_index) else {
return Err(anyhow::anyhow!(
"Unable to generate the Merkle proof for the message from its receipts"
)
.into());
};

// Return the proof.
Ok(MerkleProof {
proof_set,
proof_index,
})
}

pub fn message_status(
Expand Down
12 changes: 7 additions & 5 deletions crates/fuel-core/src/query/message/test.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::ops::Deref;

use fuel_core_types::{
blockchain::header::{
ApplicationHeader,
ConsensusHeader,
PartialBlockHeader,
blockchain::{
block::CompressedBlock,
header::{
ApplicationHeader,
ConsensusHeader,
PartialBlockHeader,
},
},
entities::relayer::message::MerkleProof,
fuel_tx::{
Expand Down Expand Up @@ -191,7 +194,6 @@ async fn can_build_message_proof() {
nonce.to_owned(),
*commit_block.header().height(),
)
.unwrap()
.unwrap();
assert_eq!(
proof.message_block_header.message_outbox_root,
Expand Down
7 changes: 4 additions & 3 deletions crates/fuel-core/src/schema/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,14 @@ impl MessageQuery {
))?,
};

Ok(crate::query::message_proof(
let proof = crate::query::message_proof(
query.as_ref(),
transaction_id.into(),
nonce.into(),
height,
)?
.map(MessageProof))
)?;

Ok(Some(MessageProof(proof)))
}

#[graphql(complexity = "query_costs().storage_read + child_complexity")]
Expand Down
2 changes: 2 additions & 0 deletions crates/services/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ rayon = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_with = { workspace = true }
sha2 = "0.10"
strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = "1.0.47"
tokio = { workspace = true, features = ["sync"] }
tracing = { workspace = true }
Expand Down
25 changes: 12 additions & 13 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ use crate::{
},
config::Config,
discovery,
gossipsub::{
config::build_gossipsub_behaviour,
topics::GossipTopic,
},
gossipsub::config::build_gossipsub_behaviour,
heartbeat,
peer_report,
request_response::messages::{
RequestMessage,
ResponseMessage,
V2ResponseMessage,
},
};
use fuel_core_types::fuel_types::BlockHeight;
Expand All @@ -24,6 +21,7 @@ use libp2p::{
MessageAcceptance,
MessageId,
PublishError,
TopicHash,
},
identify,
request_response::{
Expand Down Expand Up @@ -112,15 +110,16 @@ impl FuelBehaviour {
BlockHeight::default(),
);

let req_res_protocol =
core::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full));
let req_res_protocol = codec
.get_req_res_protocols()
.map(|protocol| (protocol, ProtocolSupport::Full));

let req_res_config = request_response::Config::default()
.with_request_timeout(p2p_config.set_request_timeout)
.with_max_concurrent_streams(p2p_config.max_concurrent_streams);

let request_response = request_response::Behaviour::with_codec(
codec,
codec.clone(),
req_res_protocol,
req_res_config,
);
Expand Down Expand Up @@ -149,10 +148,10 @@ impl FuelBehaviour {

pub fn publish_message(
&mut self,
topic: GossipTopic,
topic_hash: TopicHash,
encoded_data: Vec<u8>,
) -> Result<MessageId, PublishError> {
self.gossipsub.publish(topic, encoded_data)
self.gossipsub.publish(topic_hash, encoded_data)
}

pub fn send_request_msg(
Expand All @@ -165,9 +164,9 @@ impl FuelBehaviour {

pub fn send_response_msg(
&mut self,
channel: ResponseChannel<ResponseMessage>,
message: ResponseMessage,
) -> Result<(), ResponseMessage> {
channel: ResponseChannel<V2ResponseMessage>,
message: V2ResponseMessage,
) -> Result<(), V2ResponseMessage> {
self.request_response.send_response(channel, message)
}

Expand Down
Loading

0 comments on commit eb3efe1

Please sign in to comment.