diff --git a/crates/protocol/src/registry.rs b/crates/protocol/src/registry.rs index 0d69258e..ab7ca3ff 100644 --- a/crates/protocol/src/registry.rs +++ b/crates/protocol/src/registry.rs @@ -246,6 +246,12 @@ impl From for RecordId { } } +impl From for AnyHash { + fn from(id: RecordId) -> AnyHash { + id.0 + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/server/src/api/mod.rs b/crates/server/src/api/mod.rs index 5ca66e19..c6f997b9 100644 --- a/crates/server/src/api/mod.rs +++ b/crates/server/src/api/mod.rs @@ -1,3 +1,4 @@ +use crate::contentstore::ContentStore; use crate::{ policy::{content::ContentPolicy, record::RecordPolicy}, services::CoreService, @@ -12,7 +13,6 @@ use tower_http::{ }; use tracing::{Level, Span}; use url::Url; -use crate::contentstore::ContentStore; pub mod v1; diff --git a/crates/server/src/api/v1/mod.rs b/crates/server/src/api/v1/mod.rs index f746dd6f..ecd95e7a 100644 --- a/crates/server/src/api/v1/mod.rs +++ b/crates/server/src/api/v1/mod.rs @@ -1,3 +1,4 @@ +use crate::contentstore::ContentStore; use crate::{ policy::{content::ContentPolicy, record::RecordPolicy}, services::CoreService, @@ -15,7 +16,6 @@ use axum::{ use serde::{Serialize, Serializer}; use std::{path::PathBuf, sync::Arc}; use url::Url; -use crate::contentstore::ContentStore; pub mod fetch; pub mod package; diff --git a/crates/server/src/api/v1/package.rs b/crates/server/src/api/v1/package.rs index ca4617b8..847035bc 100644 --- a/crates/server/src/api/v1/package.rs +++ b/crates/server/src/api/v1/package.rs @@ -1,14 +1,15 @@ -use std::collections::HashSet; use super::{Json, Path}; use crate::{ + contentstore::ContentStore, datastore::{DataStoreError, RecordStatus}, policy::{ content::{ContentPolicy, ContentPolicyError}, record::{RecordPolicy, RecordPolicyError}, }, services::CoreService, - contentstore::ContentStore, }; +use axum::body::StreamBody; +use axum::http::header; use axum::{ debug_handler, extract::{BodyStream, State}, @@ -18,14 +19,13 @@ use axum::{ Router, }; use futures::StreamExt; +use std::collections::HashSet; use std::sync::Arc; use std::{collections::HashMap, path::PathBuf}; -use axum::body::StreamBody; -use axum::http::header; use tempfile::NamedTempFile; use tokio::io::AsyncWriteExt; -use url::Url; use tokio_util::io::ReaderStream; +use url::Url; use warg_api::v1::package::{ ContentSource, MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest, UploadEndpoint, @@ -70,21 +70,33 @@ impl Config { Router::new() .route("/:log_id/record", post(publish_record)) .route("/:log_id/record/:record_id", get(get_record)) - .route("/:log_id/record/:record_id/content/:digest", post(upload_content)) - .route("/:log_id/record/:record_id/content/:digest", get(fetch_content)) + .route( + "/:log_id/record/:record_id/content/:digest", + post(upload_content), + ) + .route( + "/:log_id/record/:record_id/content/:digest", + get(fetch_content), + ) .with_state(self) } - fn content_url(&self, - log_id: &LogId, - record_id: &RecordId, - digest: &AnyHash) -> String { + fn content_url(&self, log_id: &LogId, record_id: &RecordId, digest: &AnyHash) -> String { + let log_hash: AnyHash = log_id.clone().into(); + let record_hash: AnyHash = record_id.clone().into(); format!( - "{url}/{log_id}/record/{record_id}/content/{digest}", + "{url}{log_id}/record/{record_id}/content/{digest}", + log_id = Self::hash_fmt(&log_hash), + record_id = Self::hash_fmt(&record_hash), + digest = Self::hash_fmt(digest), url = self.content_base_url, ) } + fn hash_fmt(digest: &AnyHash) -> String { + digest.to_string().replace(':', "-") + } + fn build_missing_content<'a>( &self, log_id: &LogId, @@ -387,13 +399,17 @@ async fn upload_content( // Only persist the file if the content was successfully processed res?; - let version = crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id).await?; + let version = + crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id) + .await?; let package_id = config.core_service.store().get_package_id(&log_id).await?; let mut tmp_file = tokio::fs::File::open(&tmp_path) .await .map_err(PackageApiError::internal_error)?; - config.content_store.store_content(&package_id, &digest, version.to_string(), &mut tmp_file) + config + .content_store + .store_content(&package_id, &digest, version.to_string(), &mut tmp_file) .await .map_err(PackageApiError::internal_error)?; @@ -412,7 +428,10 @@ async fn upload_content( Ok(( StatusCode::CREATED, - [(header::LOCATION, config.content_url(&log_id, &record_id, &digest))], + [( + header::LOCATION, + config.content_url(&log_id, &record_id, &digest), + )], )) } @@ -468,12 +487,12 @@ async fn fetch_content( tracing::info!("fetching content for record `{record_id}` from `{log_id}`"); let package_id = config.core_service.store().get_package_id(&log_id).await?; - let version = crate::datastore::get_release_version( - config.core_service.store(), - &log_id, - &record_id, - ).await?; - let file = config.content_store.fetch_content(&package_id, &digest, version.to_string()) + let version = + crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id) + .await?; + let file = config + .content_store + .fetch_content(&package_id, &digest, version.to_string()) .await .map_err(PackageApiError::not_found)?; diff --git a/crates/server/src/bin/warg-server.rs b/crates/server/src/bin/warg-server.rs index 6cb9b7e9..64594a47 100644 --- a/crates/server/src/bin/warg-server.rs +++ b/crates/server/src/bin/warg-server.rs @@ -1,8 +1,8 @@ use anyhow::{Context, Result}; use clap::{Parser, ValueEnum}; +use oci_distribution::secrets::RegistryAuth::Anonymous; use secrecy::SecretString; use std::{net::SocketAddr, path::PathBuf}; -use oci_distribution::secrets::RegistryAuth::Anonymous; use tokio::signal; use tracing_subscriber::filter::LevelFilter; use url::Url; @@ -127,7 +127,7 @@ async fn main() -> Result<()> { .with_context(|| format!("failed to decode authorized keys from {path:?}"))?; config = config.with_record_policy(authorized_key_policy); } - + let config = match args.content_store { ContentStoreKind::Local => { tracing::info!("using local content store"); @@ -136,7 +136,14 @@ async fn main() -> Result<()> { ContentStoreKind::OCIv1_1 => { use warg_server::contentstore::oci::ociv1_1::OCIv1_1ContentStore; tracing::info!("using OCIv1.1 content store"); - config.with_content_store(OCIv1_1ContentStore::new(args.oci_registry_url.unwrap(), Anonymous, &args.content_dir).await) + config.with_content_store( + OCIv1_1ContentStore::new( + args.oci_registry_url.unwrap(), + Anonymous, + &args.content_dir, + ) + .await, + ) } }; diff --git a/crates/server/src/contentstore/local.rs b/crates/server/src/contentstore/local.rs index 4b8e8b4f..7eb59ae0 100644 --- a/crates/server/src/contentstore/local.rs +++ b/crates/server/src/contentstore/local.rs @@ -1,9 +1,9 @@ +use crate::contentstore::{ContentStore, ContentStoreError}; use std::path::{Path, PathBuf}; use tokio::fs::File; use tokio::io::copy; use warg_crypto::hash::AnyHash; use warg_protocol::registry::PackageId; -use crate::contentstore::{ContentStore, ContentStoreError}; #[derive(Clone)] pub struct LocalContentStore { @@ -46,7 +46,7 @@ impl ContentStore for LocalContentStore { _package_id: &PackageId, digest: &AnyHash, _version: String, - content: &mut File + content: &mut File, ) -> Result { let file_path = self.content_path(digest); let mut stored_file = File::create(file_path.clone()) @@ -67,6 +67,8 @@ impl ContentStore for LocalContentStore { _version: String, ) -> Result { let path = self.content_path(digest); - Path::new(&path).try_exists().map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string())) + Path::new(&path) + .try_exists() + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string())) } } diff --git a/crates/server/src/contentstore/mod.rs b/crates/server/src/contentstore/mod.rs index 1e1fc04a..bf632646 100644 --- a/crates/server/src/contentstore/mod.rs +++ b/crates/server/src/contentstore/mod.rs @@ -1,6 +1,6 @@ +use thiserror::Error; use tokio::fs::File; use warg_crypto::hash::AnyHash; -use thiserror::Error; use warg_protocol::registry::PackageId; pub mod local; @@ -32,7 +32,7 @@ pub trait ContentStore: Send + Sync { package_id: &PackageId, digest: &AnyHash, version: String, - content: &mut File + content: &mut File, ) -> Result; async fn content_present( diff --git a/crates/server/src/contentstore/oci/client.rs b/crates/server/src/contentstore/oci/client.rs index d4eb550a..ff992f3d 100644 --- a/crates/server/src/contentstore/oci/client.rs +++ b/crates/server/src/contentstore/oci/client.rs @@ -2,14 +2,14 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use anyhow::{Context, Result}; +use oci_distribution::config::{Architecture, Config as DistConfig, ConfigFile, Os}; use oci_distribution::{ client, client::{ClientProtocol, Config, ImageLayer}, manifest::OciImageManifest, - Reference, secrets::RegistryAuth, + Reference, }; -use oci_distribution::config::{Architecture, ConfigFile, Config as DistConfig, Os}; use serde_json; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -20,8 +20,7 @@ use tokio::task::block_in_place; use warg_crypto::hash::AnyHash; use crate::{ - contentstore::ContentStoreError, - contentstore::ContentStoreError::ContentStoreInternalError, + contentstore::ContentStoreError, contentstore::ContentStoreError::ContentStoreInternalError, }; const COMPONENT_ARTIFACT_TYPE: &str = "application/vnd.bytecodealliance.component.v1+wasm"; @@ -52,7 +51,10 @@ impl Client { digest: &AnyHash, ) -> Result { let path = self.cached_content_path(digest); - if Path::new(&path).try_exists().map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))? { + if Path::new(&path) + .try_exists() + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))? + { let file = File::open(path) .await .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?; @@ -69,18 +71,13 @@ impl Client { // block_on. let result = block_in_place(|| { Handle::current().block_on(async move { - let mut oci = self - .oci_client - .write() - .await; - oci - .pull(&reference, &self.auth, vec![WASM_LAYER_MEDIA_TYPE]) + let mut oci = self.oci_client.write().await; + oci.pull(&reference, &self.auth, vec![WASM_LAYER_MEDIA_TYPE]) .await }) }); - let image = result - .map_err(|e| ContentStoreInternalError(e.to_string()))?; + let image = result.map_err(|e| ContentStoreInternalError(e.to_string()))?; let layer = image .layers @@ -90,8 +87,7 @@ impl Client { let mut file = File::create(self.cached_content_path(digest)) .await .map_err(|e| ContentStoreInternalError(e.to_string()))?; - file - .write_all(&layer.data) + file.write_all(&layer.data) .await .map_err(|e| ContentStoreInternalError(e.to_string()))?; Ok(file) @@ -121,8 +117,8 @@ impl Client { }), ..Default::default() }; - let config_data = serde_json::to_vec(&config) - .map_err(|e| ContentStoreInternalError(e.to_string()))?; + let config_data = + serde_json::to_vec(&config).map_err(|e| ContentStoreInternalError(e.to_string()))?; let oci_config = Config::oci_v1(config_data, None); let mut layers = Vec::new(); let wasm_layer = Self::wasm_layer(file) @@ -135,15 +131,11 @@ impl Client { // TODO: fix the higher-level lifetime error that occurs when not using block_in_place and // block_on. - let result= block_in_place(|| { + let result = block_in_place(|| { Handle::current().block_on(async move { tracing::log::trace!("Pushing component to {:?}", reference); - let mut oci = self - .oci_client - .write() - .await; - oci - .push(&reference, &layers, oci_config, &self.auth, Some(manifest)) + let mut oci = self.oci_client.write().await; + oci.push(&reference, &layers, oci_config, &self.auth, Some(manifest)) .await }) }); @@ -154,7 +146,10 @@ impl Client { .map_err(|e| ContentStoreInternalError(e.to_string())) } - pub async fn content_exists(&self, reference: impl AsRef) -> Result { + pub async fn content_exists( + &self, + reference: impl AsRef, + ) -> Result { let reference: Reference = reference .as_ref() .parse() @@ -163,10 +158,7 @@ impl Client { .unwrap(); let mut oci = self.oci_client.write().await; - match oci - .fetch_manifest_digest(&reference, &self.auth) - .await - { + match oci.fetch_manifest_digest(&reference, &self.auth).await { Ok(_) => Ok(true), Err(_) => Ok(false), } @@ -177,7 +169,9 @@ impl Client { tracing::log::trace!("Reading wasm component from {:?}", file); let mut contents = vec![]; - file.read_to_end(&mut contents).await.context("cannot read wasm component")?; + file.read_to_end(&mut contents) + .await + .context("cannot read wasm component")?; Ok(ImageLayer::new( contents, diff --git a/crates/server/src/contentstore/oci/mod.rs b/crates/server/src/contentstore/oci/mod.rs index bacac086..8cf00ceb 100644 --- a/crates/server/src/contentstore/oci/mod.rs +++ b/crates/server/src/contentstore/oci/mod.rs @@ -1,2 +1,2 @@ -pub mod ociv1_1; mod client; +pub mod ociv1_1; diff --git a/crates/server/src/contentstore/oci/ociv1_1.rs b/crates/server/src/contentstore/oci/ociv1_1.rs index 03aa10f6..12183bd1 100644 --- a/crates/server/src/contentstore/oci/ociv1_1.rs +++ b/crates/server/src/contentstore/oci/ociv1_1.rs @@ -1,10 +1,10 @@ -use std::path::PathBuf; +use crate::contentstore::oci::client::Client; +use crate::contentstore::{ContentStore, ContentStoreError}; use oci_distribution::secrets::RegistryAuth; +use std::path::PathBuf; use tokio::fs::File; use warg_crypto::hash::AnyHash; use warg_protocol::registry::PackageId; -use crate::contentstore::{ContentStore, ContentStoreError}; -use crate::contentstore::oci::client::Client; type Auth = RegistryAuth; @@ -17,11 +17,18 @@ pub struct OCIv1_1ContentStore { impl OCIv1_1ContentStore { pub async fn new(registry_url: impl Into, auth: Auth, temp_dir: &PathBuf) -> Self { let client = Client::new(true, auth, temp_dir).await; - Self { client, registry_url: registry_url.into() } + Self { + client, + registry_url: registry_url.into(), + } } fn reference(&self, package_id: &PackageId, version: String) -> String { - let (reg_url, namespace, name) = (self.registry_url.clone(), package_id.namespace(), package_id.name()); + let (reg_url, namespace, name) = ( + self.registry_url.clone(), + package_id.namespace(), + package_id.name(), + ); format!("{reg_url}/{namespace}/{name}:{version}") } } @@ -36,10 +43,7 @@ impl ContentStore for OCIv1_1ContentStore { version: String, ) -> Result { let reference = self.reference(package_id, version); - self - .client - .pull(reference, digest) - .await + self.client.pull(reference, digest).await } /// Store the content in the store. @@ -48,13 +52,10 @@ impl ContentStore for OCIv1_1ContentStore { package_id: &PackageId, digest: &AnyHash, version: String, - content: &mut File + content: &mut File, ) -> Result { let reference = self.reference(package_id, version); - self - .client - .push(reference, content, digest) - .await + self.client.push(reference, content, digest).await } /// Check if the content is present in the store. diff --git a/crates/server/src/datastore/memory.rs b/crates/server/src/datastore/memory.rs index cb5c316c..3dee712d 100644 --- a/crates/server/src/datastore/memory.rs +++ b/crates/server/src/datastore/memory.rs @@ -233,7 +233,10 @@ impl DataStore for MemoryDataStore { }); let mut state = self.0.write().await; - state.names.entry(log_id.clone()).or_insert_with(|| package_id.clone()); + state + .names + .entry(log_id.clone()) + .or_insert_with(|| package_id.clone()); let prev = state.records.entry(log_id.clone()).or_default().insert( record_id.clone(), RecordStatus::Pending(PendingRecord::Package { diff --git a/crates/server/src/datastore/mod.rs b/crates/server/src/datastore/mod.rs index fc7b42db..e9e9d570 100644 --- a/crates/server/src/datastore/mod.rs +++ b/crates/server/src/datastore/mod.rs @@ -4,12 +4,16 @@ use futures::Stream; use thiserror::Error; pub use memory::*; -use PackageEntry::Release; #[cfg(feature = "postgres")] pub use postgres::*; use warg_crypto::{hash::AnyHash, signing::KeyID}; -use warg_protocol::{operator, package, ProtoEnvelope, registry::{LogId, LogLeaf, MapCheckpoint, PackageId, RecordId}, SerdeEnvelope, Version}; use warg_protocol::package::PackageEntry; +use warg_protocol::{ + operator, package, + registry::{LogId, LogLeaf, MapCheckpoint, PackageId, RecordId}, + ProtoEnvelope, SerdeEnvelope, Version, +}; +use PackageEntry::Release; mod memory; #[cfg(feature = "postgres")] @@ -79,8 +83,8 @@ pub enum RecordStatus { /// Represents a record in a log. pub struct Record - where - T: Clone, +where + T: Clone, { /// The status of the record. pub status: RecordStatus, @@ -231,10 +235,7 @@ pub trait DataStore: Send + Sync { ) -> Result>, DataStoreError>; /// Gets the package ID for the given log ID. - async fn get_package_id( - &self, - log_id: &LogId, - ) -> Result; + async fn get_package_id(&self, log_id: &LogId) -> Result; /// Gets an operator record. async fn get_operator_record( @@ -271,15 +272,20 @@ pub trait DataStore: Send + Sync { } pub fn get_version_for_release(record: &package::PackageRecord) -> Option<&Version> { - record.entries.iter().find_map(|entry| { - match entry { - Release { version, content: _ } => Some(version), - _ => None, - } + record.entries.iter().find_map(|entry| match entry { + Release { + version, + content: _, + } => Some(version), + _ => None, }) } -pub async fn get_release_version(data_store: &dyn DataStore, log_id: &LogId, record_id: &RecordId) -> Result { +pub async fn get_release_version( + data_store: &dyn DataStore, + log_id: &LogId, + record_id: &RecordId, +) -> Result { let record = data_store.get_package_record(&log_id, &record_id).await?; get_version_for_release(&record.envelope.as_ref()) .cloned() diff --git a/crates/server/src/datastore/postgres/mod.rs b/crates/server/src/datastore/postgres/mod.rs index 8b42d3b4..4eec3d59 100644 --- a/crates/server/src/datastore/postgres/mod.rs +++ b/crates/server/src/datastore/postgres/mod.rs @@ -747,8 +747,11 @@ impl DataStore for PostgresDataStore { get_records(&mut conn, log_id, checkpoint_id, since, limit as i64).await } - async fn get_package_id(&self, log_id: &LogId) -> std::result::Result { - let mut conn = self.0.get().await?; + async fn get_package_id( + &self, + log_id: &LogId, + ) -> std::result::Result { + let mut conn = self.pool.get().await?; schema::logs::table .select(schema::logs::name) .filter(schema::logs::log_id.eq(TextRef(log_id))) diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 36b733bb..557a47af 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -1,6 +1,8 @@ +use crate::contentstore::local::LocalContentStore; use crate::{api::create_router, datastore::MemoryDataStore}; use anyhow::{Context, Result}; use axum::Router; +use contentstore::ContentStore; use datastore::DataStore; use futures::Future; use policy::{content::ContentPolicy, record::RecordPolicy}; @@ -15,16 +17,14 @@ use std::{ }; use tokio::task::JoinHandle; use url::Url; -use contentstore::ContentStore; use warg_crypto::signing::PrivateKey; -use crate::contentstore::local::LocalContentStore; pub mod api; pub mod args; +pub mod contentstore; pub mod datastore; pub mod policy; pub mod services; -pub mod contentstore; const DEFAULT_BIND_ADDRESS: &str = "127.0.0.1:8090"; const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(5); diff --git a/tests/server.rs b/tests/server.rs index 60d029aa..2549229f 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -421,7 +421,9 @@ async fn test_custom_content_url(config: &Config) -> Result<()> { .await?; let expected_url = format!( - "https://example.com/content/{digest}", + "https://example.com/{log_id}/record/{record_id}/content/{digest}", + log_id = log_id.to_string().replace(':', "-"), + record_id = record.id.to_string().replace(':', "-"), digest = digest.to_string().replace(':', "-") );