Skip to content

Commit

Permalink
Endpoint to request which certificate published a blob last in a give…
Browse files Browse the repository at this point in the history
…n validator
  • Loading branch information
ndr-ds committed Jun 18, 2024
1 parent 4d03523 commit fc8fb81
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 8 deletions.
3 changes: 3 additions & 0 deletions linera-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ pub trait LocalValidatorNode {
) -> Result<HashedCertificateValue, NodeError>;

async fn download_certificate(&mut self, hash: CryptoHash) -> Result<Certificate, NodeError>;

/// Returns the hash of the `Certificate` that last used a blob.
async fn blob_last_used_by(&mut self, blob_id: BlobId) -> Result<CryptoHash, NodeError>;
}

/// Turn an address into a validator node.
Expand Down
24 changes: 24 additions & 0 deletions linera-core/src/unit_tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ where
})
.await
}

async fn blob_last_used_by(&mut self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
self.spawn_and_receive(move |validator, sender| {
validator.do_blob_last_used_by(blob_id, sender)
})
.await
}
}

impl<S> LocalValidatorClient<S>
Expand Down Expand Up @@ -388,6 +395,23 @@ where

sender.send(certificate)
}

async fn do_blob_last_used_by(
self,
blob_id: BlobId,
sender: oneshot::Sender<Result<CryptoHash, NodeError>>,
) -> Result<(), Result<CryptoHash, NodeError>> {
let validator = self.client.lock().await;
let certificate_hash = validator
.state
.storage_client()
.read_blob_state(blob_id)
.await
.map(|blob_state| blob_state.last_used_by)
.map_err(Into::into);

sender.send(certificate_hash)
}
}

#[derive(Clone)]
Expand Down
3 changes: 3 additions & 0 deletions linera-rpc/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ service ValidatorNode {

// Downloads a certificate.
rpc DownloadCertificate(CryptoHash) returns (Certificate);

// Returns the hash of the `Certificate` that last used a blob.
rpc BlobLastUsedBy(BlobId) returns (CryptoHash);
}

// Information about the Linera crate version the validator is running
Expand Down
9 changes: 9 additions & 0 deletions linera-rpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,13 @@ impl ValidatorNode for Client {
Client::Simple(simple_client) => simple_client.download_certificate(hash).await?,
})
}

async fn blob_last_used_by(&mut self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
Ok(match self {
Client::Grpc(grpc_client) => grpc_client.blob_last_used_by(blob_id).await?,

#[cfg(with_simple_network)]
Client::Simple(simple_client) => simple_client.blob_last_used_by(blob_id).await?,
})
}
}
10 changes: 10 additions & 0 deletions linera-rpc/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ impl ValidatorNode for GrpcClient {
.into_inner()
.try_into()?)
}

#[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
async fn blob_last_used_by(&mut self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
Ok(self
.client
.blob_last_used_by(<BlobId as Into<api::BlobId>>::into(blob_id))
.await?
.into_inner()
.try_into()?)
}
}

#[cfg(not(web))]
Expand Down
24 changes: 24 additions & 0 deletions linera-rpc/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub enum RpcMessage {
DownloadBlob(Box<BlobId>),
DownloadCertificateValue(Box<CryptoHash>),
DownloadCertificate(Box<CryptoHash>),
BlobLastUsedBy(Box<BlobId>),
VersionInfoQuery,

// Outbound
Expand All @@ -38,6 +39,7 @@ pub enum RpcMessage {
DownloadBlobResponse(Box<Blob>),
DownloadCertificateValueResponse(Box<CertificateValue>),
DownloadCertificateResponse(Box<Certificate>),
BlobLastUsedByResponse(Box<CryptoHash>),

// Internal to a validator
CrossChainRequest(Box<CrossChainRequest>),
Expand Down Expand Up @@ -66,6 +68,8 @@ impl RpcMessage {
| DownloadCertificateValue(_)
| DownloadCertificateValueResponse(_)
| DownloadCertificate(_)
| BlobLastUsedBy(_)
| BlobLastUsedByResponse(_)
| DownloadCertificateResponse(_) => {
return None;
}
Expand All @@ -83,6 +87,7 @@ impl RpcMessage {
VersionInfoQuery
| DownloadBlob(_)
| DownloadCertificateValue(_)
| BlobLastUsedBy(_)
| DownloadCertificate(_) => true,
BlockProposal(_)
| LiteCertificate(_)
Expand All @@ -95,6 +100,7 @@ impl RpcMessage {
| VersionInfoResponse(_)
| DownloadBlobResponse(_)
| DownloadCertificateValueResponse(_)
| BlobLastUsedByResponse(_)
| DownloadCertificateResponse(_) => false,
}
}
Expand Down Expand Up @@ -160,6 +166,18 @@ impl TryFrom<RpcMessage> for Certificate {
}
}

impl TryFrom<RpcMessage> for CryptoHash {
type Error = NodeError;
fn try_from(message: RpcMessage) -> Result<Self, Self::Error> {
use RpcMessage::*;
match message {
BlobLastUsedByResponse(hash) => Ok(*hash),
Error(error) => Err(*error),
_ => Err(NodeError::UnexpectedMessage),
}
}
}

impl From<BlockProposal> for RpcMessage {
fn from(block_proposal: BlockProposal) -> Self {
RpcMessage::BlockProposal(Box::new(block_proposal))
Expand Down Expand Up @@ -231,3 +249,9 @@ impl From<Certificate> for RpcMessage {
RpcMessage::DownloadCertificateResponse(Box::new(certificate))
}
}

impl From<CryptoHash> for RpcMessage {
fn from(hash: CryptoHash) -> Self {
RpcMessage::BlobLastUsedByResponse(Box::new(hash))
}
}
5 changes: 5 additions & 0 deletions linera-rpc/src/simple/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ impl ValidatorNode for SimpleClient {
self.query(RpcMessage::DownloadCertificate(Box::new(hash)))
.await
}

async fn blob_last_used_by(&mut self, blob_id: BlobId) -> Result<CryptoHash, NodeError> {
self.query(RpcMessage::BlobLastUsedBy(Box::new(blob_id)))
.await
}
}

#[derive(Clone)]
Expand Down
2 changes: 2 additions & 0 deletions linera-rpc/src/simple/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ where
| RpcMessage::DownloadBlobResponse(_)
| RpcMessage::DownloadCertificateValue(_)
| RpcMessage::DownloadCertificateValueResponse(_)
| RpcMessage::BlobLastUsedBy(_)
| RpcMessage::BlobLastUsedByResponse(_)
| RpcMessage::DownloadCertificate(_)
| RpcMessage::DownloadCertificateResponse(_) => Err(NodeError::UnexpectedMessage),
};
Expand Down
24 changes: 16 additions & 8 deletions linera-rpc/tests/snapshots/format__format.yaml.snap
Original file line number Diff line number Diff line change
Expand Up @@ -770,36 +770,44 @@ RpcMessage:
NEWTYPE:
TYPENAME: CryptoHash
7:
VersionInfoQuery: UNIT
BlobLastUsedBy:
NEWTYPE:
TYPENAME: BlobId
8:
VersionInfoQuery: UNIT
9:
Vote:
NEWTYPE:
TYPENAME: LiteVote
9:
10:
ChainInfoResponse:
NEWTYPE:
TYPENAME: ChainInfoResponse
10:
11:
Error:
NEWTYPE:
TYPENAME: NodeError
11:
12:
VersionInfoResponse:
NEWTYPE:
TYPENAME: VersionInfo
12:
13:
DownloadBlobResponse:
NEWTYPE:
TYPENAME: Blob
13:
14:
DownloadCertificateValueResponse:
NEWTYPE:
TYPENAME: CertificateValue
14:
15:
DownloadCertificateResponse:
NEWTYPE:
TYPENAME: Certificate
15:
16:
BlobLastUsedByResponse:
NEWTYPE:
TYPENAME: CryptoHash
17:
CrossChainRequest:
NEWTYPE:
TYPENAME: CrossChainRequest
Expand Down
15 changes: 15 additions & 0 deletions linera-service/src/grpc_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,21 @@ where
.map_err(|err| Status::from_error(Box::new(err)))?;
Ok(Response::new(certificate.try_into()?))
}

#[instrument(skip_all, err(Display))]
async fn blob_last_used_by(
&self,
request: Request<BlobId>,
) -> Result<Response<CryptoHash>, Status> {
let blob_id = request.into_inner().try_into()?;
let blob_state = self
.0
.storage
.read_blob_state(blob_id)
.await
.map_err(|err| Status::from_error(Box::new(err)))?;
Ok(Response::new(blob_state.last_used_by.into()))
}
}

#[async_trait]
Expand Down
8 changes: 8 additions & 0 deletions linera-service/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,13 @@ where
DownloadCertificate(hash) => {
Ok(Some(self.storage.read_certificate(*hash).await?.into()))
}
BlobLastUsedBy(blob_id) => Ok(Some(
self.storage
.read_blob_state(*blob_id)
.await?
.last_used_by
.into(),
)),
BlockProposal(_)
| LiteCertificate(_)
| Certificate(_)
Expand All @@ -314,6 +321,7 @@ where
| ChainInfoResponse(_)
| VersionInfoResponse(_)
| DownloadBlobResponse(_)
| BlobLastUsedByResponse(_)
| DownloadCertificateValueResponse(_)
| DownloadCertificateResponse(_) => {
Err(anyhow::Error::from(NodeError::UnexpectedMessage))
Expand Down
4 changes: 4 additions & 0 deletions linera-service/src/schema_export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl ValidatorNode for DummyValidatorNode {
async fn download_certificate(&mut self, _: CryptoHash) -> Result<Certificate, NodeError> {
Err(NodeError::UnexpectedMessage)
}

async fn blob_last_used_by(&mut self, _: BlobId) -> Result<CryptoHash, NodeError> {
Err(NodeError::UnexpectedMessage)
}
}

struct DummyValidatorNodeProvider;
Expand Down

0 comments on commit fc8fb81

Please sign in to comment.