From 99f1b62e805b7ad9b07f2d58e6108182c45bbd5b Mon Sep 17 00:00:00 2001 From: Calvin Prewitt Date: Mon, 28 Aug 2023 09:41:42 -0500 Subject: [PATCH] added registry index for fetched records and updated proof endpoints to use the log length and registry index (#187) --- .gitignore | 1 + crates/api/src/v1/fetch.rs | 43 +++--- crates/api/src/v1/package.rs | 6 +- crates/api/src/v1/proof.rs | 101 ++++--------- crates/client/src/api.rs | 13 +- crates/client/src/lib.rs | 40 ++++-- crates/client/src/storage.rs | 9 +- crates/protocol/src/lib.rs | 4 +- crates/protocol/src/proto_envelope.rs | 58 ++++++++ crates/protocol/src/registry.rs | 8 +- crates/server/openapi.yaml | 150 +++++++++++--------- crates/server/src/api/debug/mod.rs | 14 +- crates/server/src/api/v1/fetch.rs | 10 +- crates/server/src/api/v1/package.rs | 4 +- crates/server/src/api/v1/proof.rs | 41 ++---- crates/server/src/datastore/memory.rs | 114 ++++++++++----- crates/server/src/datastore/mod.rs | 37 +++-- crates/server/src/datastore/postgres/mod.rs | 138 ++++++++++++------ crates/server/src/services/core.rs | 118 +++++++-------- tests/postgres/mod.rs | 5 +- 20 files changed, 542 insertions(+), 372 deletions(-) diff --git a/.gitignore b/.gitignore index 83abdf7b..b566778e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /content /target *.swp +*.swo diff --git a/crates/api/src/v1/fetch.rs b/crates/api/src/v1/fetch.rs index baf5dd81..7bd59695 100644 --- a/crates/api/src/v1/fetch.rs +++ b/crates/api/src/v1/fetch.rs @@ -6,16 +6,16 @@ use std::{borrow::Cow, collections::HashMap}; use thiserror::Error; use warg_crypto::hash::AnyHash; use warg_protocol::{ - registry::{LogId, RecordId}, - ProtoEnvelopeBody, + registry::{LogId, RecordId, RegistryLen}, + PublishedProtoEnvelopeBody, }; /// Represents a fetch logs request. #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct FetchLogsRequest<'a> { - /// The root checkpoint ID hash of the registry. 
- pub checkpoint_id: Cow<'a, AnyHash>, + /// The checkpoint log length. + pub log_length: RegistryLen, /// The limit for the number of operator and package records to fetch. #[serde(skip_serializing_if = "Option::is_none")] pub limit: Option, @@ -36,10 +36,10 @@ pub struct FetchLogsResponse { pub more: bool, /// The operator records appended since the last known operator record. #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub operator: Vec, + pub operator: Vec, /// The package records appended since last known package record ids. #[serde(default, skip_serializing_if = "HashMap::is_empty")] - pub packages: HashMap>, + pub packages: HashMap>, } /// Represents a fetch API error. @@ -47,8 +47,8 @@ pub struct FetchLogsResponse { #[derive(Debug, Error)] pub enum FetchError { /// The provided checkpoint was not found. - #[error("checkpoint `{0}` was not found")] - CheckpointNotFound(AnyHash), + #[error("checkpoint log length `{0}` was not found")] + CheckpointNotFound(RegistryLen), /// The provided log was not found. 
#[error("log `{0}` was not found")] LogNotFound(LogId), @@ -78,7 +78,7 @@ impl FetchError { #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] enum EntityType { - Checkpoint, + LogLength, Log, Record, } @@ -90,6 +90,12 @@ where T: Clone + ToOwned, ::Owned: Serialize + for<'b> Deserialize<'b>, { + CheckpointNotFound { + status: Status<404>, + #[serde(rename = "type")] + ty: EntityType, + id: RegistryLen, + }, NotFound { status: Status<404>, #[serde(rename = "type")] @@ -105,10 +111,10 @@ where impl Serialize for FetchError { fn serialize(&self, serializer: S) -> Result { match self { - Self::CheckpointNotFound(checkpoint) => RawError::NotFound { + Self::CheckpointNotFound(log_length) => RawError::CheckpointNotFound:: { status: Status::<404>, - ty: EntityType::Checkpoint, - id: Cow::Borrowed(checkpoint), + ty: EntityType::LogLength, + id: *log_length, } .serialize(serializer), Self::LogNotFound(log_id) => RawError::NotFound { @@ -138,15 +144,8 @@ impl<'de> Deserialize<'de> for FetchError { D: serde::Deserializer<'de>, { match RawError::::deserialize(deserializer)? { + RawError::CheckpointNotFound { id, .. } => Ok(Self::CheckpointNotFound(id)), RawError::NotFound { status: _, ty, id } => match ty { - EntityType::Checkpoint => { - Ok(Self::CheckpointNotFound(id.parse().map_err(|_| { - serde::de::Error::invalid_value( - Unexpected::Str(&id), - &"a valid checkpoint hash", - ) - })?)) - } EntityType::Log => Ok(Self::LogNotFound( id.parse::() .map_err(|_| { @@ -164,6 +163,10 @@ impl<'de> Deserialize<'de> for FetchError { })? 
.into(), )), + _ => Err(serde::de::Error::invalid_value( + Unexpected::Str(&id), + &"a valid log length", + )), }, RawError::Message { status, message } => Ok(Self::Message { status, diff --git a/crates/api/src/v1/package.rs b/crates/api/src/v1/package.rs index 0701c7e9..090d3ae7 100644 --- a/crates/api/src/v1/package.rs +++ b/crates/api/src/v1/package.rs @@ -6,7 +6,7 @@ use std::{borrow::Cow, collections::HashMap}; use thiserror::Error; use warg_crypto::hash::AnyHash; use warg_protocol::{ - registry::{LogId, PackageId, RecordId}, + registry::{LogId, PackageId, RecordId, RegistryIndex}, ProtoEnvelopeBody, }; @@ -105,10 +105,10 @@ pub enum PackageRecordState { Published { /// The envelope of the package record. record: ProtoEnvelopeBody, - /// The index of the record in the registry log. - registry_log_index: u32, /// The content sources of the record. content_sources: HashMap>, + /// The published index of the record in the registry log. + registry_index: RegistryIndex, }, } diff --git a/crates/api/src/v1/proof.rs b/crates/api/src/v1/proof.rs index d7f18311..f71ac381 100644 --- a/crates/api/src/v1/proof.rs +++ b/crates/api/src/v1/proof.rs @@ -1,21 +1,21 @@ //! Types relating to the proof API. use crate::Status; -use serde::{de::Unexpected, Deserialize, Serialize, Serializer}; +use serde::{Deserialize, Serialize, Serializer}; use serde_with::{base64::Base64, serde_as}; use std::borrow::Cow; use thiserror::Error; use warg_crypto::hash::AnyHash; -use warg_protocol::registry::{Checkpoint, LogId, LogLeaf}; +use warg_protocol::registry::{LogId, RegistryIndex, RegistryLen}; /// Represents a consistency proof request. #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ConsistencyRequest { /// The starting log length to check for consistency. - pub from: u32, + pub from: RegistryLen, /// The ending log length to check for consistency. - pub to: u32, + pub to: RegistryLen, } /// Represents a consistency proof response. 
@@ -31,11 +31,11 @@ pub struct ConsistencyResponse { /// Represents an inclusion proof request. #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct InclusionRequest<'a> { - /// The checkpoint to check for inclusion. - pub checkpoint: Cow<'a, Checkpoint>, - /// The log leafs to check for inclusion. - pub leafs: Cow<'a, [LogLeaf]>, +pub struct InclusionRequest { + /// The log length to check for inclusion. + pub log_length: RegistryLen, + /// The log leaf indexes in the registry log to check for inclusion. + pub leafs: Vec, } /// Represents an inclusion proof response. @@ -55,12 +55,12 @@ pub struct InclusionResponse { #[non_exhaustive] #[derive(Debug, Error)] pub enum ProofError { - /// The provided log root was not found. - #[error("log root `{0}` was not found")] - RootNotFound(AnyHash), + /// The checkpoint could not be found for the provided log length. + #[error("checkpoint not found for log length {0}")] + CheckpointNotFound(RegistryLen), /// The provided log leaf was not found. - #[error("log leaf `{}:{}` was not found", .0.log_id, .0.record_id)] - LeafNotFound(LogLeaf), + #[error("log leaf `{0}` not found")] + LeafNotFound(RegistryIndex), /// Failed to prove inclusion of a package. #[error("failed to prove inclusion of package log `{0}`")] PackageLogNotIncluded(LogId), @@ -89,7 +89,7 @@ impl ProofError { /// Returns the HTTP status code of the error. pub fn status(&self) -> u16 { match self { - Self::RootNotFound(_) | Self::LeafNotFound(_) => 404, + Self::CheckpointNotFound(_) | Self::LeafNotFound(_) => 404, Self::BundleFailure(_) | Self::PackageLogNotIncluded(_) | Self::IncorrectProof { .. 
} => 422, @@ -101,7 +101,7 @@ impl ProofError { #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] enum EntityType { - LogRoot, + LogLength, Leaf, } @@ -122,16 +122,12 @@ enum BundleError<'a> { #[derive(Serialize, Deserialize)] #[serde(untagged, rename_all = "camelCase")] -enum RawError<'a, T> -where - T: Clone + ToOwned, - ::Owned: Serialize + for<'b> Deserialize<'b>, -{ +enum RawError<'a> { NotFound { status: Status<404>, #[serde(rename = "type")] ty: EntityType, - id: Cow<'a, T>, + id: RegistryIndex, }, BundleError { status: Status<422>, @@ -147,26 +143,26 @@ where impl Serialize for ProofError { fn serialize(&self, serializer: S) -> Result { match self { - Self::RootNotFound(root) => RawError::NotFound { + Self::CheckpointNotFound(log_length) => RawError::NotFound { status: Status::<404>, - ty: EntityType::LogRoot, - id: Cow::Borrowed(root), + ty: EntityType::LogLength, + id: *log_length, } .serialize(serializer), - Self::LeafNotFound(leaf) => RawError::NotFound:: { + Self::LeafNotFound(leaf_index) => RawError::NotFound { status: Status::<404>, ty: EntityType::Leaf, - id: Cow::Owned(format!("{}|{}", leaf.log_id, leaf.record_id)), + id: *leaf_index, } .serialize(serializer), - Self::PackageLogNotIncluded(log_id) => RawError::BundleError::<()> { + Self::PackageLogNotIncluded(log_id) => RawError::BundleError { status: Status::<422>, error: BundleError::PackageNotIncluded { log_id: Cow::Borrowed(log_id), }, } .serialize(serializer), - Self::IncorrectProof { root, found } => RawError::BundleError::<()> { + Self::IncorrectProof { root, found } => RawError::BundleError { status: Status::<422>, error: BundleError::IncorrectProof { root: Cow::Borrowed(root), @@ -174,14 +170,14 @@ impl Serialize for ProofError { }, } .serialize(serializer), - Self::BundleFailure(message) => RawError::BundleError::<()> { + Self::BundleFailure(message) => RawError::BundleError { status: Status::<422>, error: BundleError::Failure { message: Cow::Borrowed(message), }, } 
.serialize(serializer), - Self::Message { status, message } => RawError::Message::<()> { + Self::Message { status, message } => RawError::Message { status: *status, message: Cow::Borrowed(message), } @@ -195,47 +191,10 @@ impl<'de> Deserialize<'de> for ProofError { where D: serde::Deserializer<'de>, { - match RawError::::deserialize(deserializer)? { + match RawError::deserialize(deserializer)? { RawError::NotFound { status: _, ty, id } => match ty { - EntityType::LogRoot => { - Ok(Self::RootNotFound(id.parse::().map_err(|_| { - serde::de::Error::invalid_value( - Unexpected::Str(&id), - &"a valid checkpoint id", - ) - })?)) - } - EntityType::Leaf => Ok(Self::LeafNotFound( - id.split_once('|') - .map(|(log_id, record_id)| { - Ok(LogLeaf { - log_id: log_id - .parse::() - .map_err(|_| { - serde::de::Error::invalid_value( - Unexpected::Str(log_id), - &"a valid log id", - ) - })? - .into(), - record_id: record_id - .parse::() - .map_err(|_| { - serde::de::Error::invalid_value( - Unexpected::Str(record_id), - &"a valid record id", - ) - })? - .into(), - }) - }) - .ok_or_else(|| { - serde::de::Error::invalid_value( - Unexpected::Str(&id), - &"a valid leaf id", - ) - })??, - )), + EntityType::LogLength => Ok(Self::CheckpointNotFound(id)), + EntityType::Leaf => Ok(Self::LeafNotFound(id)), }, RawError::BundleError { status: _, error } => match error { BundleError::PackageNotIncluded { log_id } => { diff --git a/crates/client/src/api.rs b/crates/client/src/api.rs index 6ae05355..7f379590 100644 --- a/crates/client/src/api.rs +++ b/crates/client/src/api.rs @@ -237,7 +237,12 @@ impl Client { } /// Proves the inclusion of the given package log heads in the registry. 
- pub async fn prove_inclusion(&self, request: InclusionRequest<'_>) -> Result<(), ClientError> { + pub async fn prove_inclusion( + &self, + request: InclusionRequest, + checkpoint: &Checkpoint, + leafs: &[LogLeaf], + ) -> Result<(), ClientError> { let url = self.url.join(paths::prove_inclusion()); tracing::debug!("proving checkpoint inclusion at `{url}`"); @@ -246,11 +251,7 @@ impl Client { ) .await?; - Self::validate_inclusion_response( - response, - request.checkpoint.as_ref(), - request.leafs.as_ref(), - ) + Self::validate_inclusion_response(response, checkpoint, leafs) } /// Proves consistency between two log roots. diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 0d07ab58..5ccff206 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -26,7 +26,7 @@ use warg_crypto::{ use warg_protocol::{ operator, package, registry::{LogId, LogLeaf, PackageId, RecordId, TimestampedCheckpoint}, - ProtoEnvelope, SerdeEnvelope, Version, VersionReq, + PublishedProtoEnvelope, SerdeEnvelope, Version, VersionReq, }; pub mod api; @@ -387,7 +387,7 @@ impl Client { let response: FetchLogsResponse = self .api .fetch_logs(FetchLogsRequest { - checkpoint_id: Cow::Borrowed(&checkpoint_id), + log_length: checkpoint.log_length, operator: operator .state .head() @@ -404,11 +404,12 @@ impl Client { })?; for record in response.operator { - let record: ProtoEnvelope = record.try_into()?; + let record: PublishedProtoEnvelope = record.try_into()?; operator .state - .validate(&record) + .validate(&record.envelope) .map_err(|inner| ClientError::OperatorValidationFailed { inner })?; + operator.head_registry_index = Some(record.registry_index); } for (log_id, records) in response.packages { @@ -417,13 +418,15 @@ impl Client { })?; for record in records { - let record: ProtoEnvelope = record.try_into()?; - package.state.validate(&record).map_err(|inner| { + let record: PublishedProtoEnvelope = + record.try_into()?; + 
package.state.validate(&record.envelope).map_err(|inner| { ClientError::PackageValidationFailed { id: package.id.clone(), inner, } })?; + package.head_registry_index = Some(record.registry_index); } // At this point, the package log should not be empty @@ -445,29 +448,36 @@ impl Client { } // Prove inclusion for the current log heads - let mut leafs = Vec::with_capacity(packages.len() + 1 /* for operator */); - if let Some(head) = operator.state.head() { + let mut leaf_indices = Vec::with_capacity(packages.len() + 1 /* for operator */); + let mut leafs = Vec::with_capacity(leaf_indices.len()); + if let Some(index) = operator.head_registry_index { + leaf_indices.push(index); leafs.push(LogLeaf { log_id: LogId::operator_log::(), - record_id: head.digest.clone(), + record_id: operator.state.head().as_ref().unwrap().digest.clone(), }); } for (log_id, package) in &packages { - if let Some(head) = package.state.head() { + if let Some(index) = package.head_registry_index { + leaf_indices.push(index); leafs.push(LogLeaf { log_id: log_id.clone(), - record_id: head.digest.clone(), + record_id: package.state.head().as_ref().unwrap().digest.clone(), }); } } if !leafs.is_empty() { self.api - .prove_inclusion(InclusionRequest { - checkpoint: Cow::Borrowed(checkpoint), - leafs: Cow::Borrowed(&leafs), - }) + .prove_inclusion( + InclusionRequest { + log_length: checkpoint.log_length, + leafs: leaf_indices, + }, + checkpoint, + &leafs, + ) .await?; } diff --git a/crates/client/src/storage.rs b/crates/client/src/storage.rs index ee25ef89..c1260079 100644 --- a/crates/client/src/storage.rs +++ b/crates/client/src/storage.rs @@ -13,7 +13,7 @@ use warg_crypto::{ use warg_protocol::{ operator, package::{self, PackageRecord, PACKAGE_RECORD_VERSION}, - registry::{Checkpoint, PackageId, RecordId, TimestampedCheckpoint}, + registry::{Checkpoint, PackageId, RecordId, RegistryIndex, TimestampedCheckpoint}, ProtoEnvelope, SerdeEnvelope, Version, }; @@ -109,6 +109,9 @@ pub struct OperatorInfo { 
/// The current operator log state #[serde(default)] pub state: operator::LogState, + /// The registry log index of the most recent record + #[serde(default)] + pub head_registry_index: Option, } /// Represents information about a registry package. @@ -123,6 +126,9 @@ pub struct PackageInfo { /// The current package log state #[serde(default)] pub state: package::LogState, + /// The registry log index of the most recent record + #[serde(default)] + pub head_registry_index: Option, } impl PackageInfo { @@ -132,6 +138,7 @@ impl PackageInfo { id: id.into(), checkpoint: None, state: package::LogState::default(), + head_registry_index: None, } } } diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index ba7ddbe9..e3e7ffba 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -8,7 +8,9 @@ mod proto_envelope; pub mod registry; mod serde_envelope; -pub use proto_envelope::{ProtoEnvelope, ProtoEnvelopeBody}; +pub use proto_envelope::{ + ProtoEnvelope, ProtoEnvelopeBody, PublishedProtoEnvelope, PublishedProtoEnvelopeBody, +}; pub use semver::{Version, VersionReq}; pub use serde_envelope::SerdeEnvelope; diff --git a/crates/protocol/src/proto_envelope.rs b/crates/protocol/src/proto_envelope.rs index 42df72d8..1856aa99 100644 --- a/crates/protocol/src/proto_envelope.rs +++ b/crates/protocol/src/proto_envelope.rs @@ -1,3 +1,4 @@ +use super::registry::RegistryIndex; use anyhow::Error; use base64::{engine::general_purpose::STANDARD, Engine}; use prost::Message; @@ -8,6 +9,15 @@ use thiserror::Error; use warg_crypto::{hash::AnyHashError, signing, Decode, Signable}; use warg_protobuf::protocol as protobuf; +/// The ProtoEnvelope with the published registry log index. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PublishedProtoEnvelope { + /// The wrapped ProtoEnvelope + pub envelope: ProtoEnvelope, + /// The published registry log index for the record + pub registry_index: RegistryIndex, +} + /// The envelope struct is used to keep around the original /// bytes that the content was serialized into in case /// the serialization is not canonical. @@ -163,3 +173,51 @@ impl fmt::Debug for ProtoEnvelopeBody { .finish() } } + +#[serde_as] +#[derive(Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PublishedProtoEnvelopeBody { + /// The ProtoEnvelopeBody flattened + #[serde(flatten)] + pub envelope: ProtoEnvelopeBody, + /// The index of the published record in the registry log + pub registry_index: RegistryIndex, +} + +impl TryFrom for PublishedProtoEnvelope +where + Content: Decode, +{ + type Error = Error; + + fn try_from(value: PublishedProtoEnvelopeBody) -> Result { + Ok(PublishedProtoEnvelope { + envelope: ProtoEnvelope::::try_from(value.envelope)?, + registry_index: value.registry_index, + }) + } +} + +impl From> for PublishedProtoEnvelopeBody { + fn from(value: PublishedProtoEnvelope) -> Self { + PublishedProtoEnvelopeBody { + envelope: ProtoEnvelopeBody::from(value.envelope), + registry_index: value.registry_index, + } + } +} + +impl fmt::Debug for PublishedProtoEnvelopeBody { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PublishedProtoEnvelopeBody") + .field( + "content_bytes", + &STANDARD.encode(&self.envelope.content_bytes), + ) + .field("key_id", &self.envelope.key_id) + .field("signature", &self.envelope.signature) + .field("registry_index", &self.registry_index) + .finish() + } +} diff --git a/crates/protocol/src/registry.rs b/crates/protocol/src/registry.rs index 10dc8782..0bdd48fc 100644 --- a/crates/protocol/src/registry.rs +++ b/crates/protocol/src/registry.rs @@ -9,11 +9,17 @@ use warg_crypto::prefix::VisitPrefixEncode; use warg_crypto::{prefix, 
ByteVisitor, Signable, VisitBytes}; use wasmparser::names::KebabStr; +/// Type alias for registry log index +pub type RegistryIndex = usize; + +/// Type alias for registry log length +pub type RegistryLen = RegistryIndex; + #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Checkpoint { pub log_root: AnyHash, - pub log_length: u32, + pub log_length: RegistryLen, pub map_root: AnyHash, } diff --git a/crates/server/openapi.yaml b/crates/server/openapi.yaml index 1b327ef8..b632bd6a 100644 --- a/crates/server/openapi.yaml +++ b/crates/server/openapi.yaml @@ -62,26 +62,9 @@ paths: content: application/json: schema: - type: object - additionalProperties: false - required: - - status - - type - - id - properties: - status: - type: integer - description: The HTTP status code for the error. - example: 404 - type: - type: string - description: The type of entity that was not found. - enum: [checkpoint, log, record] - example: checkpoint - id: - type: string - description: The identifier of the entity that was not found. - example: sha256:b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c + oneOf: + - "$ref": "#/components/schemas/FetchLogsIDNotFoundError" + - "$ref": "#/components/schemas/FetchLogsLogLengthNotFoundError" default: description: An error occurred when processing the request. content: @@ -339,8 +322,6 @@ paths: - proof description: | Proves the consistency of the registry between two specified checkpoints. - - TODO: document the consistency proof bundle format. requestBody: content: application/json: @@ -372,12 +353,11 @@ paths: type: type: string description: The type of entity that was not found. - enum: [checkpoint] - example: checkpoint + enum: [logLength] + example: logLength id: - "$ref": "#/components/schemas/AnyHash" - description: | - The identifier of the entity that was not found. + type: integer + description: The identifier of the entity that was not found. 
"422": description: The proof bundle could not be generated. content: @@ -404,10 +384,6 @@ paths: - proof description: | Proves that the given log leafs are present in the given registry checkpoint. - - TODO: document the log inclusion proof bundle format. - - TODO: document the map inclusion proof bundle format. requestBody: content: application/json: @@ -439,14 +415,11 @@ paths: type: type: string description: The type of entity that was not found. - enum: [checkpoint, leaf] - example: checkpoint + enum: [logLength, leaf] + example: logLength id: - "$ref": "#/components/schemas/AnyHash" - description: | - The identifier of the entity that was not found. - - For leafs, the format is `|`. + type: integer + description: The identifier of the entity that was not found. "422": description: The proof bundle could not be generated. content: @@ -496,11 +469,13 @@ components: description: A request to fetch logs from the registry. additionalProperties: false required: - - checkpointId + - logLength properties: - checkpointId: - $ref: "#/components/schemas/AnyHash" - description: The registry checkpoint ID hash to fetch from. + logLength: + type: integer + description: The registry checkpoint log length to fetch from. + example: 101 + minimum: 1 limit: type: integer description: The limit of operator and packages records to return for the fetch request. @@ -545,7 +520,7 @@ components: description: The operator log records for the given checkpoint since the last known record. maxItems: 1000 items: - $ref: "#/components/schemas/EnvelopeBody" + $ref: "#/components/schemas/PublishedEnvelopeBody" packages: type: object description: The map of package log identifier to package records. @@ -555,19 +530,22 @@ components: description: The package log records for the given checkpoint since the last known record. maxItems: 1000 items: - $ref: "#/components/schemas/EnvelopeBody" + $ref: "#/components/schemas/PublishedEnvelopeBody" example: ? 
"sha256:7d865e959b2466918c9863afca942d0fb89d7c9ac0c99bafc3749504ded97730" : - contentBytes: "ZXhhbXBsZQ==" keyId: "sha256:7d865e959b2466918c9863afca942d0fb89d7c9ac0c99bafc3749504ded97730" signature: "ecdsa-p256:MEUCIQCzWZBW6ux9LecP66Y+hjmLZTP/hZVz7puzlPTXcRT2wwIgQZO7nxP0nugtw18MwHZ26ROFWcJmgCtKOguK031Y1D0=" + registryIndex: 101 - contentBytes: "ZXhhbXBsZQ==" keyId: "sha256:7d865e959b2466918c9863afca942d0fb89d7c9ac0c99bafc3749504ded97730" signature: "ecdsa-p256:MEUCIQCzWZBW6ux9LecP66Y+hjmLZTP/hZVz7puzlPTXcRT2wwIgQZO7nxP0nugtw18MwHZ26ROFWcJmgCtKOguK031Y1D0=" + registryIndex: 305 ? "sha256:b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c" : - contentBytes: "ZXhhbXBsZQ==" keyId: "sha256:7d865e959b2466918c9863afca942d0fb89d7c9ac0c99bafc3749504ded97730" signature: "ecdsa-p256:MEUCIQCzWZBW6ux9LecP66Y+hjmLZTP/hZVz7puzlPTXcRT2wwIgQZO7nxP0nugtw18MwHZ26ROFWcJmgCtKOguK031Y1D0=" + registryIndex: 732 PublishPackageRecordRequest: type: object description: A request to publish a record to a package log. @@ -650,29 +628,18 @@ components: description: A request to prove the inclusion of log leafs in a checkpoint. additionalProperties: false required: - - checkpoint + - logLength - leafs properties: - checkpoint: - $ref: "#/components/schemas/Checkpoint" - description: The checkpoint to prove the inclusion for. + logLength: + type: integer + description: The checkpoint log length to prove the inclusion for. leafs: type: array maxItems: 1000 - description: The log leafs to prove the inclusion for. + description: The log leaf registry log index to prove the inclusion for. items: - type: object - description: A log leaf. - required: - - logId - - recordId - properties: - logId: - $ref: "#/components/schemas/AnyHash" - description: The log identifier. - recordId: - $ref: "#/components/schemas/AnyHash" - description: The record identifier. + type: integer ProveInclusionResponse: type: object description: A response containing the inclusion proof bundle. 
@@ -739,7 +706,7 @@ components: description: A record that has been published to the log. required: - state - - checkpoint + - registryIndex - record properties: state: @@ -747,8 +714,8 @@ components: description: The state of the package record. enum: [published] example: published - registryLogIndex: - type: number + registryIndex: + type: integer description: The index of the record in the registry log. record: "$ref": "#/components/schemas/EnvelopeBody" @@ -835,6 +802,19 @@ components: maxLength: 1048576 example: "ZXhhbXBsZQ==" - $ref: "#/components/schemas/Signature" + PublishedEnvelopeBody: + description: A signed envelope body with the published registry log index. + allOf: + - type: object + required: + - registryIndex + properties: + registryIndex: + type: integer + description: The index of the published record in the registry log. + example: 42 + - $ref: "#/components/schemas/EnvelopeBody" + - $ref: "#/components/schemas/Signature" Signature: type: object description: Represents a signature of content. @@ -990,3 +970,45 @@ components: type: string description: The failure error message. example: bundle must contain proofs for the same root + FetchLogsIDNotFoundError: + type: object + additionalProperties: false + required: + - status + - type + - id + properties: + status: + type: integer + description: The HTTP status code for the error. + example: 404 + type: + type: string + description: The type of entity that was not found. + enum: [log, record] + example: log + id: + type: string + description: The identifier of the entity that was not found. + example: sha256:b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c + FetchLogsLogLengthNotFoundError: + type: object + additionalProperties: false + required: + - status + - type + - id + properties: + status: + type: integer + description: The HTTP status code for the error. + example: 404 + type: + type: string + description: The type of entity that was not found. 
+ enum: [logLength] + example: log + id: + type: integer + description: The log length that was not found. + example: 1001 diff --git a/crates/server/src/api/debug/mod.rs b/crates/server/src/api/debug/mod.rs index 035a82f9..94c62bed 100644 --- a/crates/server/src/api/debug/mod.rs +++ b/crates/server/src/api/debug/mod.rs @@ -11,7 +11,7 @@ use axum::{ }; use serde::Serialize; use warg_crypto::{ - hash::{AnyHash, Hash, Sha256}, + hash::{AnyHash, Sha256}, signing::KeyID, }; use warg_protocol::{ @@ -92,11 +92,11 @@ async fn get_package_info( .get_latest_checkpoint() .await .context("get_latest_checkpoint")?; - let checkpoint_id = Hash::::of(&checkpoint.as_ref().checkpoint).into(); + let checkpoint_log_length = checkpoint.as_ref().checkpoint.log_length; let log_id = LogId::package_log::(&package_id); let records = store - .get_package_records(&log_id, &checkpoint_id, None, u16::MAX) + .get_package_records(&log_id, checkpoint_log_length, None, u16::MAX) .await .context("get_package_records")?; @@ -105,15 +105,19 @@ async fn get_package_info( let records = records .into_iter() .map(|record| { - package_state.validate(&record).context("validate")?; - let record_id = RecordId::package_record::(&record); + package_state + .validate(&record.envelope) + .context("validate")?; + let record_id = RecordId::package_record::(&record.envelope); let timestamp = record + .envelope .as_ref() .timestamp .duration_since(SystemTime::UNIX_EPOCH) .context("duration_since")? 
.as_secs(); let entries = record + .envelope .as_ref() .entries .iter() diff --git a/crates/server/src/api/v1/fetch.rs b/crates/server/src/api/v1/fetch.rs index 6cc34942..32ca984f 100644 --- a/crates/server/src/api/v1/fetch.rs +++ b/crates/server/src/api/v1/fetch.rs @@ -13,7 +13,7 @@ use std::collections::HashMap; use warg_api::v1::fetch::{FetchError, FetchLogsRequest, FetchLogsResponse}; use warg_crypto::hash::Sha256; use warg_protocol::registry::{LogId, TimestampedCheckpoint}; -use warg_protocol::{ProtoEnvelopeBody, SerdeEnvelope}; +use warg_protocol::{PublishedProtoEnvelopeBody, SerdeEnvelope}; const DEFAULT_RECORDS_LIMIT: u16 = 100; const MAX_RECORDS_LIMIT: u16 = 1000; @@ -85,12 +85,12 @@ async fn fetch_logs( ))); } - let operator: Vec = config + let operator: Vec = config .core_service .store() .get_operator_records( &LogId::operator_log::(), - &body.checkpoint_id, + body.log_length, body.operator.as_deref(), limit, ) @@ -104,10 +104,10 @@ async fn fetch_logs( let mut map = HashMap::new(); let packages = body.packages.into_owned(); for (id, since) in packages { - let records: Vec = config + let records: Vec = config .core_service .store() - .get_package_records(&id, &body.checkpoint_id, since.as_ref(), limit) + .get_package_records(&id, body.log_length, since.as_ref(), limit) .await? 
.into_iter() .map(Into::into) diff --git a/crates/server/src/api/v1/package.rs b/crates/server/src/api/v1/package.rs index 8e0f2a5c..e1b28cc9 100644 --- a/crates/server/src/api/v1/package.rs +++ b/crates/server/src/api/v1/package.rs @@ -306,13 +306,13 @@ async fn get_record( }) .collect(); - let registry_log_index = record.registry_log_index.unwrap().try_into().unwrap(); + let registry_index = record.registry_index.unwrap(); Ok(Json(PackageRecord { id: record_id, state: PackageRecordState::Published { record: record.envelope.into(), - registry_log_index, + registry_index, content_sources, }, })) diff --git a/crates/server/src/api/v1/proof.rs b/crates/server/src/api/v1/proof.rs index b3d9017f..bc2086d2 100644 --- a/crates/server/src/api/v1/proof.rs +++ b/crates/server/src/api/v1/proof.rs @@ -6,6 +6,7 @@ use axum::{ use warg_api::v1::proof::{ ConsistencyRequest, ConsistencyResponse, InclusionRequest, InclusionResponse, ProofError, }; +use warg_protocol::registry::{RegistryIndex, RegistryLen}; #[derive(Clone)] pub struct Config { @@ -27,19 +28,12 @@ impl Config { struct ProofApiError(ProofError); -impl ProofApiError { - fn bad_request(message: impl ToString) -> Self { - Self(ProofError::Message { - status: StatusCode::BAD_REQUEST.as_u16(), - message: message.to_string(), - }) - } -} - impl From for ProofApiError { fn from(value: CoreServiceError) -> Self { Self(match value { - CoreServiceError::RootNotFound(root) => ProofError::RootNotFound(root), + CoreServiceError::CheckpointNotFound(log_length) => { + ProofError::CheckpointNotFound(log_length) + } CoreServiceError::LeafNotFound(leaf) => ProofError::LeafNotFound(leaf), CoreServiceError::BundleFailure(e) => ProofError::BundleFailure(e.to_string()), CoreServiceError::PackageNotIncluded(id) => ProofError::PackageLogNotIncluded(id), @@ -70,7 +64,7 @@ async fn prove_consistency( ) -> Result, ProofApiError> { let bundle = config .core - .log_consistency_proof(body.from as usize, body.to as usize) + 
.log_consistency_proof(body.from as RegistryLen, body.to as RegistryLen) .await?; Ok(Json(ConsistencyResponse { @@ -81,24 +75,17 @@ async fn prove_consistency( #[debug_handler] async fn prove_inclusion( State(config): State, - Json(body): Json>, + Json(body): Json, ) -> Result, ProofApiError> { - let checkpoint = body.checkpoint.into_owned(); - let log_length = checkpoint.log_length; - let map_root = checkpoint - .map_root - .try_into() - .map_err(ProofApiError::bad_request)?; + let log_length = body.log_length as RegistryLen; + let leafs = body + .leafs + .into_iter() + .map(|index| index as RegistryIndex) + .collect::>(); - let log_bundle = config - .core - .log_inclusion_proofs(log_length as usize, &body.leafs) - .await?; - - let map_bundle = config - .core - .map_inclusion_proofs(&map_root, &body.leafs) - .await?; + let log_bundle = config.core.log_inclusion_proofs(log_length, &leafs).await?; + let map_bundle = config.core.map_inclusion_proofs(log_length, &leafs).await?; Ok(Json(InclusionResponse { log: log_bundle.encode(), diff --git a/crates/server/src/datastore/memory.rs b/crates/server/src/datastore/memory.rs index af7998d2..22aa8adc 100644 --- a/crates/server/src/datastore/memory.rs +++ b/crates/server/src/datastore/memory.rs @@ -11,12 +11,14 @@ use warg_crypto::{hash::AnyHash, Signable}; use warg_protocol::{ operator, package::{self, PackageEntry}, - registry::{LogId, LogLeaf, PackageId, RecordId, TimestampedCheckpoint}, - ProtoEnvelope, SerdeEnvelope, + registry::{ + LogId, LogLeaf, PackageId, RecordId, RegistryIndex, RegistryLen, TimestampedCheckpoint, + }, + ProtoEnvelope, PublishedProtoEnvelope, SerdeEnvelope, }; struct Entry { - registry_index: u64, + registry_index: RegistryIndex, record_content: ProtoEnvelope, } @@ -41,7 +43,7 @@ struct Record { /// Index in the log's entries. index: usize, /// Index in the registry's log. 
- registry_index: u64, + registry_index: RegistryIndex, } enum PendingRecord { @@ -76,8 +78,9 @@ struct State { operators: HashMap>, packages: HashMap>, package_ids: BTreeSet, - checkpoints: IndexMap>, + checkpoints: IndexMap>, records: HashMap>, + log_leafs: HashMap, } /// Represents an in-memory data store. @@ -118,6 +121,23 @@ impl DataStore for MemoryDataStore { Ok(Box::pin(futures::stream::empty())) } + async fn get_log_leafs_with_registry_index( + &self, + entries: &[RegistryIndex], + ) -> Result, DataStoreError> { + let state = self.0.read().await; + + let mut leafs = Vec::with_capacity(entries.len()); + for entry in entries { + match state.log_leafs.get(entry) { + Some(log_leaf) => leafs.push(log_leaf.clone()), + None => return Err(DataStoreError::LogLeafNotFound(*entry)), + } + } + + Ok(leafs) + } + async fn store_operator_record( &self, log_id: &LogId, @@ -168,12 +188,15 @@ impl DataStore for MemoryDataStore { &self, log_id: &LogId, record_id: &RecordId, - registry_log_index: u64, + registry_index: RegistryIndex, ) -> Result<(), DataStoreError> { let mut state = self.0.write().await; let State { - operators, records, .. + operators, + records, + log_leafs, + .. } = &mut *state; let status = records @@ -194,13 +217,20 @@ impl DataStore for MemoryDataStore { Ok(_) => { let index = log.entries.len(); log.entries.push(Entry { - registry_index: registry_log_index, + registry_index, record_content: record, }); *status = RecordStatus::Validated(Record { index, - registry_index: registry_log_index, + registry_index, }); + log_leafs.insert( + registry_index, + LogLeaf { + log_id: log_id.clone(), + record_id: record_id.clone(), + }, + ); Ok(()) } Err(e) => { @@ -277,12 +307,15 @@ impl DataStore for MemoryDataStore { &self, log_id: &LogId, record_id: &RecordId, - registry_log_index: u64, + registry_index: RegistryIndex, ) -> Result<(), DataStoreError> { let mut state = self.0.write().await; let State { - packages, records, .. + packages, + records, + log_leafs, + .. 
} = &mut *state; let status = records @@ -303,13 +336,20 @@ impl DataStore for MemoryDataStore { Ok(_) => { let index = log.entries.len(); log.entries.push(Entry { - registry_index: registry_log_index, + registry_index, record_content: record, }); *status = RecordStatus::Validated(Record { index, - registry_index: registry_log_index, + registry_index, }); + log_leafs.insert( + registry_index, + LogLeaf { + log_id: log_id.clone(), + record_id: record_id.clone(), + }, + ); Ok(()) } Err(e) => { @@ -389,14 +429,14 @@ impl DataStore for MemoryDataStore { async fn store_checkpoint( &self, - checkpoint_id: &AnyHash, + _checkpoint_id: &AnyHash, ts_checkpoint: SerdeEnvelope, ) -> Result<(), DataStoreError> { let mut state = self.0.write().await; state .checkpoints - .insert(checkpoint_id.clone(), ts_checkpoint); + .insert(ts_checkpoint.as_ref().checkpoint.log_length, ts_checkpoint); Ok(()) } @@ -412,10 +452,10 @@ impl DataStore for MemoryDataStore { async fn get_operator_records( &self, log_id: &LogId, - checkpoint_id: &AnyHash, + registry_log_length: RegistryLen, since: Option<&RecordId>, limit: u16, - ) -> Result>, DataStoreError> { + ) -> Result>, DataStoreError> { let state = self.0.read().await; let log = state @@ -423,8 +463,8 @@ impl DataStore for MemoryDataStore { .get(log_id) .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - let Some(checkpoint) = state.checkpoints.get(checkpoint_id) else { - return Err(DataStoreError::CheckpointNotFound(checkpoint_id.clone())); + if !state.checkpoints.contains_key(&registry_log_length) { + return Err(DataStoreError::CheckpointNotFound(registry_log_length)); }; let start_log_idx = match since { @@ -434,14 +474,16 @@ impl DataStore for MemoryDataStore { }, None => 0, }; - let end_registry_idx = checkpoint.as_ref().checkpoint.log_length as u64; Ok(log .entries .iter() .skip(start_log_idx) - .take_while(|entry| entry.registry_index < end_registry_idx) - .map(|entry| entry.record_content.clone()) + .take_while(|entry| 
entry.registry_index < registry_log_length) + .map(|entry| PublishedProtoEnvelope { + envelope: entry.record_content.clone(), + registry_index: entry.registry_index, + }) .take(limit as usize) .collect()) } @@ -449,10 +491,10 @@ impl DataStore for MemoryDataStore { async fn get_package_records( &self, log_id: &LogId, - checkpoint_id: &AnyHash, + registry_log_length: RegistryLen, since: Option<&RecordId>, limit: u16, - ) -> Result>, DataStoreError> { + ) -> Result>, DataStoreError> { let state = self.0.read().await; let log = state @@ -460,8 +502,8 @@ impl DataStore for MemoryDataStore { .get(log_id) .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - let Some(checkpoint) = state.checkpoints.get(checkpoint_id) else { - return Err(DataStoreError::CheckpointNotFound(checkpoint_id.clone())); + if !state.checkpoints.contains_key(&registry_log_length) { + return Err(DataStoreError::CheckpointNotFound(registry_log_length)); }; let start_log_idx = match since { @@ -471,14 +513,16 @@ impl DataStore for MemoryDataStore { }, None => 0, }; - let end_registry_idx = checkpoint.as_ref().checkpoint.log_length as u64; Ok(log .entries .iter() .skip(start_log_idx) - .take_while(|entry| entry.registry_index < end_registry_idx) - .map(|entry| entry.record_content.clone()) + .take_while(|entry| entry.registry_index < registry_log_length) + .map(|entry| PublishedProtoEnvelope { + envelope: entry.record_content.clone(), + registry_index: entry.registry_index, + }) .take(limit as usize) .collect()) } @@ -496,7 +540,7 @@ impl DataStore for MemoryDataStore { .get(record_id) .ok_or_else(|| DataStoreError::RecordNotFound(record_id.clone()))?; - let (status, envelope, registry_index) = match status { + let (status, envelope, registry_index) = match status { RecordStatus::Pending(PendingRecord::Operator { record, .. 
}) => { (super::RecordStatus::Pending, record.clone().unwrap(), None) } @@ -515,7 +559,7 @@ impl DataStore for MemoryDataStore { .checkpoints .last() .map(|(_, c)| c.as_ref().checkpoint.log_length) - .unwrap_or_default() as u64; + .unwrap_or_default(); ( if r.registry_index < published_length { @@ -533,7 +577,7 @@ impl DataStore for MemoryDataStore { Ok(super::Record { status, envelope, - registry_log_index, + registry_index, }) } @@ -550,7 +594,7 @@ impl DataStore for MemoryDataStore { .get(record_id) .ok_or_else(|| DataStoreError::RecordNotFound(record_id.clone()))?; - let (status, envelope, registry_log_index) = match status { + let (status, envelope, registry_index) = match status { RecordStatus::Pending(PendingRecord::Package { record, .. }) => { (super::RecordStatus::Pending, record.clone().unwrap(), None) } @@ -569,7 +613,7 @@ impl DataStore for MemoryDataStore { .checkpoints .last() .map(|(_, c)| c.as_ref().checkpoint.log_length) - .unwrap_or_default() as u64; + .unwrap_or_default(); ( if r.registry_index < published_length { @@ -587,7 +631,7 @@ impl DataStore for MemoryDataStore { Ok(super::Record { status, envelope, - registry_log_index, + registry_index, }) } diff --git a/crates/server/src/datastore/mod.rs b/crates/server/src/datastore/mod.rs index 599fb094..7b069e33 100644 --- a/crates/server/src/datastore/mod.rs +++ b/crates/server/src/datastore/mod.rs @@ -4,8 +4,10 @@ use thiserror::Error; use warg_crypto::{hash::AnyHash, signing::KeyID}; use warg_protocol::{ operator, package, - registry::{LogId, LogLeaf, PackageId, RecordId, TimestampedCheckpoint}, - ProtoEnvelope, SerdeEnvelope, + registry::{ + LogId, LogLeaf, PackageId, RecordId, RegistryIndex, RegistryLen, TimestampedCheckpoint, + }, + ProtoEnvelope, PublishedProtoEnvelope, SerdeEnvelope, }; mod memory; @@ -21,8 +23,8 @@ pub enum DataStoreError { #[error("a conflicting operation was processed: update to the latest checkpoint and try the operation again")] Conflict, - #[error("checkpoint `{0}` was 
not found")] - CheckpointNotFound(AnyHash), + #[error("checkpoint log length `{0}` was not found")] + CheckpointNotFound(RegistryLen), #[error("log `{0}` was not found")] LogNotFound(LogId), @@ -30,6 +32,9 @@ pub enum DataStoreError { #[error("record `{0}` was not found")] RecordNotFound(RecordId), + #[error("log leaf {0} was not found")] + LogLeafNotFound(RegistryIndex), + #[error("record `{0}` cannot be validated as it is not in a pending state")] RecordNotPending(RecordId), @@ -90,7 +95,7 @@ where /// The index of the record in the registry log. /// /// This is `None` if the record is not published. - pub registry_log_index: Option, + pub registry_index: Option, } /// Implemented by data stores. @@ -113,6 +118,12 @@ pub trait DataStore: Send + Sync { &self, ) -> Result> + Send>>, DataStoreError>; + /// Looks up the log_id and record_id from the registry log index. + async fn get_log_leafs_with_registry_index( + &self, + entries: &[RegistryIndex], + ) -> Result, DataStoreError>; + /// Stores the given operator record. async fn store_operator_record( &self, @@ -140,7 +151,7 @@ pub trait DataStore: Send + Sync { &self, log_id: &LogId, record_id: &RecordId, - registry_log_index: u64, + registry_index: RegistryIndex, ) -> Result<(), DataStoreError>; /// Stores the given package record. @@ -175,7 +186,7 @@ pub trait DataStore: Send + Sync { &self, log_id: &LogId, record_id: &RecordId, - registry_log_index: u64, + registry_index: RegistryIndex, ) -> Result<(), DataStoreError>; /// Determines if the given content digest is missing for the record. @@ -215,23 +226,23 @@ pub trait DataStore: Send + Sync { &self, ) -> Result, DataStoreError>; - /// Gets the operator records for the given registry checkpoint ID hash. + /// Gets the operator records for the given registry log length. 
async fn get_operator_records( &self, log_id: &LogId, - checkpoint_id: &AnyHash, + registry_log_length: RegistryLen, since: Option<&RecordId>, limit: u16, - ) -> Result>, DataStoreError>; + ) -> Result>, DataStoreError>; - /// Gets the package records for the given registry checkpoint ID hash. + /// Gets the package records for the given registry log length. async fn get_package_records( &self, log_id: &LogId, - checkpoint_id: &AnyHash, + registry_log_length: RegistryLen, since: Option<&RecordId>, limit: u16, - ) -> Result>, DataStoreError>; + ) -> Result>, DataStoreError>; /// Gets an operator record. async fn get_operator_record( diff --git a/crates/server/src/datastore/postgres/mod.rs b/crates/server/src/datastore/postgres/mod.rs index 5d93e94f..126c2e90 100644 --- a/crates/server/src/datastore/postgres/mod.rs +++ b/crates/server/src/datastore/postgres/mod.rs @@ -16,13 +16,19 @@ use diesel_migrations::{ }; use futures::{Stream, StreamExt}; use secrecy::{ExposeSecret, SecretString}; -use std::{collections::HashSet, pin::Pin}; +use std::{ + collections::{HashMap, HashSet}, + pin::Pin, +}; use warg_crypto::{hash::AnyHash, Decode, Signable}; use warg_protocol::{ operator, package::{self, PackageEntry}, - registry::{Checkpoint, LogId, LogLeaf, PackageId, RecordId, TimestampedCheckpoint}, - ProtoEnvelope, Record as _, SerdeEnvelope, Validator, + registry::{ + Checkpoint, LogId, LogLeaf, PackageId, RecordId, RegistryIndex, RegistryLen, + TimestampedCheckpoint, + }, + ProtoEnvelope, PublishedProtoEnvelope, Record as _, SerdeEnvelope, Validator, }; mod models; @@ -31,27 +37,31 @@ mod schema; async fn get_records( conn: &mut AsyncPgConnection, log_id: i32, - checkpoint_id: &AnyHash, + registry_log_length: RegistryLen, since: Option<&RecordId>, limit: i64, -) -> Result>, DataStoreError> { - let checkpoint_length = schema::checkpoints::table +) -> Result>, DataStoreError> { + schema::checkpoints::table .select(schema::checkpoints::log_length) - 
.filter(schema::checkpoints::checkpoint_id.eq(TextRef(checkpoint_id))) + .filter(schema::checkpoints::log_length.eq(registry_log_length as i64)) .first::(conn) .await .optional()? - .ok_or_else(|| DataStoreError::CheckpointNotFound(checkpoint_id.clone()))?; + .ok_or_else(|| DataStoreError::CheckpointNotFound(registry_log_length))?; let mut query = schema::records::table .into_boxed() - .select((schema::records::record_id, schema::records::content)) + .select(( + schema::records::record_id, + schema::records::content, + schema::records::registry_log_index, + )) .order_by(schema::records::id.asc()) .limit(limit) .filter( schema::records::log_id .eq(log_id) - .and(schema::records::registry_log_index.lt(checkpoint_length)) + .and(schema::records::registry_log_index.lt(registry_log_length as i64)) .and(schema::records::status.eq(RecordStatus::Validated)), ); @@ -68,15 +78,21 @@ async fn get_records( } query - .load::<(ParsedText, Vec)>(conn) + .load::<(ParsedText, Vec, Option)>(conn) .await? 
.into_iter() - .map(|(record_id, c)| { - ProtoEnvelope::from_protobuf(c).map_err(|e| DataStoreError::InvalidRecordContents { - record_id: record_id.0.into(), - message: e.to_string(), - }) - }) + .map( + |(record_id, c, index)| match ProtoEnvelope::from_protobuf(c) { + Ok(envelope) => Ok(PublishedProtoEnvelope { + envelope, + registry_index: index.unwrap() as RegistryIndex, + }), + Err(e) => Err(DataStoreError::InvalidRecordContents { + record_id: record_id.0.into(), + message: e.to_string(), + }), + }, + ) .collect::>() } @@ -194,14 +210,14 @@ async fn commit_record( conn: &mut AsyncPgConnection, log_id: i32, record_id: &RecordId, - registry_log_index: u64, + registry_index: RegistryIndex, ) -> Result<(), DataStoreError> where V: Validator + 'static, ::Error: ToString + Send + Sync, DataStoreError: From<::Error>, { - let registry_log_index: i64 = registry_log_index.try_into().unwrap(); + let registry_index: i64 = registry_index.try_into().unwrap(); conn.transaction::<_, DataStoreError, _>(|conn| { async move { // Get the record content and validator @@ -246,7 +262,7 @@ where .filter(schema::records::id.eq(id)) .set(( schema::records::status.eq(RecordStatus::Validated), - schema::records::registry_log_index.eq(Some(registry_log_index)), + schema::records::registry_log_index.eq(Some(registry_index)), )) .execute(conn) .await?; @@ -331,7 +347,7 @@ where message: e.to_string(), } })?, - registry_log_index: record.registry_log_index.map(|idx| idx.try_into().unwrap()), + registry_index: record.registry_log_index.map(|idx| idx.try_into().unwrap()), }) } @@ -395,7 +411,7 @@ impl DataStore for PostgresDataStore { Ok(TimestampedCheckpoint { checkpoint: Checkpoint { log_root: checkpoint.log_root.0, - log_length: checkpoint.log_length as u32, + log_length: checkpoint.log_length as RegistryIndex, map_root: checkpoint.map_root.0, }, timestamp: checkpoint.timestamp.try_into().unwrap(), @@ -421,7 +437,7 @@ impl DataStore for PostgresDataStore { .inner_join(schema::logs::table) 
.select((schema::logs::log_id, schema::records::record_id)) .filter(schema::records::status.eq(RecordStatus::Validated)) - .order_by(schema::records::id) + .order(schema::records::registry_log_index.asc()) .load_stream::<(ParsedText, ParsedText)>(&mut conn) .await? .map(|r| { @@ -433,6 +449,48 @@ impl DataStore for PostgresDataStore { )) } + // Note: order of the entries is expected to match to the corresponding returned log leafs. + async fn get_log_leafs_with_registry_index( + &self, + entries: &[RegistryIndex], + ) -> Result, DataStoreError> { + let mut conn = self.pool.get().await?; + + let mut leafs_map = schema::records::table + .inner_join(schema::logs::table) + .select(( + schema::logs::log_id, + schema::records::record_id, + schema::records::registry_log_index, + )) + .filter( + schema::records::registry_log_index + .eq_any(entries.iter().map(|i| *i as i64).collect::>()), + ) + .load::<(ParsedText, ParsedText, Option)>(&mut conn) + .await? + .into_iter() + .map(|(log_id, record_id, index)| { + ( + index.unwrap() as RegistryIndex, + LogLeaf { + log_id: log_id.0.into(), + record_id: record_id.0.into(), + }, + ) + }) + .collect::>(); + + Ok(entries + .iter() + .map(|registry_index| { + leafs_map + .remove(registry_index) + .ok_or(DataStoreError::LogLeafNotFound(*registry_index)) + }) + .collect::, _>>()?) + } + async fn store_operator_record( &self, log_id: &LogId, @@ -473,7 +531,7 @@ impl DataStore for PostgresDataStore { &self, log_id: &LogId, record_id: &RecordId, - registry_log_index: u64, + registry_index: RegistryIndex, ) -> Result<(), DataStoreError> { let mut conn = self.pool.get().await?; let log_id = schema::logs::table @@ -484,13 +542,8 @@ impl DataStore for PostgresDataStore { .optional()? 
.ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - match commit_record::( - conn.as_mut(), - log_id, - record_id, - registry_log_index, - ) - .await + match commit_record::(conn.as_mut(), log_id, record_id, registry_index) + .await { Ok(()) => Ok(()), Err(e) => { @@ -542,7 +595,7 @@ impl DataStore for PostgresDataStore { &self, log_id: &LogId, record_id: &RecordId, - registry_log_index: u64, + registry_index: RegistryIndex, ) -> Result<(), DataStoreError> { let mut conn = self.pool.get().await?; let log_id = schema::logs::table @@ -553,13 +606,8 @@ impl DataStore for PostgresDataStore { .optional()? .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - match commit_record::( - conn.as_mut(), - log_id, - record_id, - registry_log_index, - ) - .await + match commit_record::(conn.as_mut(), log_id, record_id, registry_index) + .await { Ok(()) => Ok(()), Err(e) => { @@ -735,10 +783,10 @@ impl DataStore for PostgresDataStore { async fn get_operator_records( &self, log_id: &LogId, - checkpoint_id: &AnyHash, + registry_log_length: RegistryLen, since: Option<&RecordId>, limit: u16, - ) -> Result>, DataStoreError> { + ) -> Result>, DataStoreError> { let mut conn = self.pool.get().await?; let log_id = schema::logs::table .select(schema::logs::id) @@ -748,16 +796,16 @@ impl DataStore for PostgresDataStore { .optional()? .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - get_records(&mut conn, log_id, checkpoint_id, since, limit as i64).await + get_records(&mut conn, log_id, registry_log_length, since, limit as i64).await } async fn get_package_records( &self, log_id: &LogId, - checkpoint_id: &AnyHash, + registry_log_length: RegistryLen, since: Option<&RecordId>, limit: u16, - ) -> Result>, DataStoreError> { + ) -> Result>, DataStoreError> { let mut conn = self.pool.get().await?; let log_id = schema::logs::table .select(schema::logs::id) @@ -767,7 +815,7 @@ impl DataStore for PostgresDataStore { .optional()? 
.ok_or_else(|| DataStoreError::LogNotFound(log_id.clone()))?; - get_records(&mut conn, log_id, checkpoint_id, since, limit as i64).await + get_records(&mut conn, log_id, registry_log_length, since, limit as i64).await } async fn get_operator_record( diff --git a/crates/server/src/services/core.rs b/crates/server/src/services/core.rs index 2c786433..74f871c4 100644 --- a/crates/server/src/services/core.rs +++ b/crates/server/src/services/core.rs @@ -17,7 +17,10 @@ use warg_crypto::{ }; use warg_protocol::{ operator, - registry::{Checkpoint, LogId, LogLeaf, MapLeaf, RecordId, TimestampedCheckpoint}, + registry::{ + Checkpoint, LogId, LogLeaf, MapLeaf, RecordId, RegistryIndex, RegistryLen, + TimestampedCheckpoint, + }, ProtoEnvelope, SerdeEnvelope, }; use warg_transparency::{ @@ -71,12 +74,14 @@ impl CoreService { /// Constructs a log consistency proof between the given log tree roots. pub async fn log_consistency_proof( &self, - from_log_length: usize, - to_log_length: usize, + from_log_length: RegistryLen, + to_log_length: RegistryLen, ) -> Result, CoreServiceError> { let state = self.inner.state.read().await; - let proof = state.log.prove_consistency(from_log_length, to_log_length); + let proof = state + .log + .prove_consistency(from_log_length as usize, to_log_length as usize); LogProofBundle::bundle(vec![proof], vec![], &state.log) .map_err(CoreServiceError::BundleFailure) } @@ -84,19 +89,20 @@ impl CoreService { /// Constructs log inclusion proofs for the given entries at the given log tree root. 
pub async fn log_inclusion_proofs( &self, - log_length: usize, - entries: &[LogLeaf], + log_length: RegistryLen, + entries: &[RegistryIndex], ) -> Result, CoreServiceError> { let state = self.inner.state.read().await; let proofs = entries .iter() - .map(|entry| { - let node = state - .leaf_index - .get(entry) - .ok_or_else(|| CoreServiceError::LeafNotFound(entry.clone()))?; - Ok(state.log.prove_inclusion(*node, log_length)) + .map(|index| { + let node = if *index < state.leaf_index.len() as RegistryIndex { + state.leaf_index[*index as usize] + } else { + return Err(CoreServiceError::LeafNotFound(*index)); + }; + Ok(state.log.prove_inclusion(node, log_length as usize)) }) .collect::, CoreServiceError>>()?; @@ -106,20 +112,27 @@ impl CoreService { /// Constructs map inclusion proofs for the given entries at the given map tree root. pub async fn map_inclusion_proofs( &self, - root: &Hash, - entries: &[LogLeaf], + log_length: RegistryLen, + entries: &[RegistryIndex], ) -> Result, CoreServiceError> { let state = self.inner.state.read().await; - let map = state + let (map_root, map) = state .map_index - .get(root) - .ok_or_else(|| CoreServiceError::RootNotFound(root.into()))?; + .get(&log_length) + .ok_or_else(|| CoreServiceError::CheckpointNotFound(log_length))?; - let proofs = entries + let indexes = self + .inner + .store + .get_log_leafs_with_registry_index(entries) + .await + .map_err(CoreServiceError::DataStore)?; + + let proofs = indexes .iter() - .map(|entry| { - let LogLeaf { log_id, record_id } = entry; + .map(|log_leaf| { + let LogLeaf { log_id, record_id } = log_leaf; let proof = map .prove(log_id.clone()) @@ -129,9 +142,9 @@ impl CoreService { record_id: record_id.clone(), }; let found_root = proof.evaluate(log_id, &map_leaf); - if &found_root != root { + if &found_root != map_root { return Err(CoreServiceError::IncorrectProof { - root: root.into(), + root: map_root.into(), found: found_root.into(), }); } @@ -185,16 +198,18 @@ impl Inner { // Reconstruct 
internal state from previously-stored data let mut checkpoints = self.store.get_all_checkpoints().await?; - let mut checkpoints_by_len: HashMap = Default::default(); + let mut checkpoints_by_len: HashMap = Default::default(); while let Some(checkpoint) = checkpoints.next().await { let checkpoint = checkpoint?.checkpoint; - checkpoints_by_len.insert(checkpoint.log_length as usize, checkpoint); + checkpoints_by_len.insert(checkpoint.log_length, checkpoint); } let state = self.state.get_mut(); while let Some(entry) = published.next().await { state.push_entry(entry?); - if let Some(stored_checkpoint) = checkpoints_by_len.get(&state.log.length()) { + if let Some(stored_checkpoint) = + checkpoints_by_len.get(&(state.log.length() as RegistryLen)) + { // Validate stored checkpoint (and update internal state as a side-effect) let computed_checkpoint = state.checkpoint(); assert!(stored_checkpoint == &computed_checkpoint); @@ -231,8 +246,7 @@ impl Inner { .await?; // Update state with init record - let entry = LogLeaf { log_id, record_id }; - state.push_entry(entry.clone()); + state.push_entry(LogLeaf { log_id, record_id }); // "zero" checkpoint to be updated let mut checkpoint = Checkpoint { @@ -281,10 +295,10 @@ impl Inner { let LogLeaf { log_id, record_id } = entry; // Validate and commit the package entry to the store - let registry_log_index = state.log.length().try_into().unwrap(); + let registry_index = state.log.length() as RegistryIndex; let commit_res = self .store - .commit_package_record(log_id, record_id, registry_log_index) + .commit_package_record(log_id, record_id, registry_index) .await; if let Err(err) = commit_res { @@ -312,7 +326,7 @@ impl Inner { { // Recalculate the checkpoint if necessary let mut state = self.state.write().await; - if state.log.length() != (checkpoint.log_length as usize) { + if state.log.length() as RegistryLen != checkpoint.log_length { *checkpoint = state.checkpoint(); tracing::debug!("Updating to checkpoint {checkpoint:?}"); } @@ 
-338,45 +352,37 @@ type VerifiableMap = Map; struct State { // The verifiable log of all package log entries log: VecLog, - // Index log tree nodes by entry - leaf_index: HashMap, - // Index log size by log tree root - root_index: HashMap, usize>, + // Index log tree nodes by registry log index of the record + leaf_index: Vec, // The verifiable map of package logs' latest entries (log_id -> record_id) map: VerifiableMap, - // Index verifiable map snapshots by root (at checkpoints only) - map_index: HashMap, VerifiableMap>, + // Index verifiable map snapshots by log length (at checkpoints only) + map_index: HashMap, VerifiableMap)>, } impl State { - fn push_entry(&mut self, entry: LogLeaf) { - let node = self.log.push(&entry); - self.leaf_index.insert(entry.clone(), node); + fn push_entry(&mut self, log_leaf: LogLeaf) { + let node = self.log.push(&log_leaf); + self.leaf_index.push(node); - let log_checkpoint = self.log.checkpoint(); - self.root_index - .insert(log_checkpoint.root(), log_checkpoint.length()); - - self.map = self.map.insert( - entry.log_id.clone(), - MapLeaf { - record_id: entry.record_id.clone(), - }, - ); + let LogLeaf { log_id, record_id } = log_leaf; + self.map = self.map.insert(log_id, MapLeaf { record_id }); } fn checkpoint(&mut self) -> Checkpoint { let log_checkpoint = self.log.checkpoint(); let map_root = self.map.root(); + let log_length = log_checkpoint.length() as RegistryLen; // Update map snapshot - if log_checkpoint.length() > 0 { - self.map_index.insert(map_root.clone(), self.map.clone()); + if log_length > 0 { + self.map_index + .insert(log_length, (map_root.clone(), self.map.clone())); } Checkpoint { - log_length: log_checkpoint.length().try_into().unwrap(), + log_length, log_root: log_checkpoint.root().into(), map_root: map_root.into(), } @@ -385,10 +391,10 @@ impl State { #[derive(Debug, Error)] pub enum CoreServiceError { - #[error("root `{0}` was not found")] - RootNotFound(AnyHash), - #[error("log leaf `{}:{}` was not found", 
.0.log_id, .0.record_id)] - LeafNotFound(LogLeaf), + #[error("checkpoint at log length `{0}` was not found")] + CheckpointNotFound(RegistryLen), + #[error("log leaf `{0}` was not found")] + LeafNotFound(RegistryIndex), #[error("failed to bundle proofs: `{0}`")] BundleFailure(anyhow::Error), #[error("failed to prove inclusion of package `{0}`")] diff --git a/tests/postgres/mod.rs b/tests/postgres/mod.rs index 0d8d29a1..8ed5939e 100644 --- a/tests/postgres/mod.rs +++ b/tests/postgres/mod.rs @@ -4,6 +4,7 @@ use super::{support::*, *}; use anyhow::{Context, Result}; use testresult::TestResult; use warg_client::api; +use warg_protocol::registry::RegistryLen; use warg_server::datastore::{DataStore, PostgresDataStore}; fn data_store() -> Result> { @@ -65,7 +66,7 @@ async fn it_works_with_postgres() -> TestResult { let ts_checkpoint = client.latest_checkpoint().await?; assert_eq!( ts_checkpoint.as_ref().checkpoint.log_length, - packages.len() as u32 + 2, /* publishes + initial checkpoint + yank */ + packages.len() as RegistryLen + 2, /* publishes + initial checkpoint + yank */ "expected {len} packages plus the initial checkpoint and yank", len = packages.len() ); @@ -83,7 +84,7 @@ async fn it_works_with_postgres() -> TestResult { let ts_checkpoint = client.latest_checkpoint().await?; assert_eq!( ts_checkpoint.as_ref().checkpoint.log_length, - packages.len() as u32 + 2, /* publishes + initial checkpoint + yank*/ + packages.len() as RegistryLen + 2, /* publishes + initial checkpoint + yank*/ "expected {len} packages plus the initial checkpoint and yank", len = packages.len() );