Skip to content

Commit

Permalink
fix fmt and test fails
Browse files Browse the repository at this point in the history
Signed-off-by: David Justice <[email protected]>
  • Loading branch information
devigned committed Jul 26, 2023
1 parent e125664 commit f915398
Show file tree
Hide file tree
Showing 15 changed files with 140 additions and 97 deletions.
6 changes: 6 additions & 0 deletions crates/protocol/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ impl From<AnyHash> for RecordId {
}
}

impl From<RecordId> for AnyHash {
fn from(id: RecordId) -> AnyHash {
id.0
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/server/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::contentstore::ContentStore;
use crate::{
policy::{content::ContentPolicy, record::RecordPolicy},
services::CoreService,
Expand All @@ -12,7 +13,6 @@ use tower_http::{
};
use tracing::{Level, Span};
use url::Url;
use crate::contentstore::ContentStore;

pub mod v1;

Expand Down
2 changes: 1 addition & 1 deletion crates/server/src/api/v1/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::contentstore::ContentStore;
use crate::{
policy::{content::ContentPolicy, record::RecordPolicy},
services::CoreService,
Expand All @@ -15,7 +16,6 @@ use axum::{
use serde::{Serialize, Serializer};
use std::{path::PathBuf, sync::Arc};
use url::Url;
use crate::contentstore::ContentStore;

pub mod fetch;
pub mod package;
Expand Down
61 changes: 40 additions & 21 deletions crates/server/src/api/v1/package.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::collections::HashSet;
use super::{Json, Path};
use crate::{
contentstore::ContentStore,
datastore::{DataStoreError, RecordStatus},
policy::{
content::{ContentPolicy, ContentPolicyError},
record::{RecordPolicy, RecordPolicyError},
},
services::CoreService,
contentstore::ContentStore,
};
use axum::body::StreamBody;
use axum::http::header;
use axum::{
debug_handler,
extract::{BodyStream, State},
Expand All @@ -18,14 +19,13 @@ use axum::{
Router,
};
use futures::StreamExt;
use std::collections::HashSet;
use std::sync::Arc;
use std::{collections::HashMap, path::PathBuf};
use axum::body::StreamBody;
use axum::http::header;
use tempfile::NamedTempFile;
use tokio::io::AsyncWriteExt;
use url::Url;
use tokio_util::io::ReaderStream;
use url::Url;
use warg_api::v1::package::{
ContentSource, MissingContent, PackageError, PackageRecord, PackageRecordState,
PublishRecordRequest, UploadEndpoint,
Expand Down Expand Up @@ -70,21 +70,33 @@ impl Config {
Router::new()
.route("/:log_id/record", post(publish_record))
.route("/:log_id/record/:record_id", get(get_record))
.route("/:log_id/record/:record_id/content/:digest", post(upload_content))
.route("/:log_id/record/:record_id/content/:digest", get(fetch_content))
.route(
"/:log_id/record/:record_id/content/:digest",
post(upload_content),
)
.route(
"/:log_id/record/:record_id/content/:digest",
get(fetch_content),
)
.with_state(self)
}

fn content_url(&self,
log_id: &LogId,
record_id: &RecordId,
digest: &AnyHash) -> String {
fn content_url(&self, log_id: &LogId, record_id: &RecordId, digest: &AnyHash) -> String {
let log_hash: AnyHash = log_id.clone().into();
let record_hash: AnyHash = record_id.clone().into();
format!(
"{url}/{log_id}/record/{record_id}/content/{digest}",
"{url}{log_id}/record/{record_id}/content/{digest}",
log_id = Self::hash_fmt(&log_hash),
record_id = Self::hash_fmt(&record_hash),
digest = Self::hash_fmt(digest),
url = self.content_base_url,
)
}

fn hash_fmt(digest: &AnyHash) -> String {
digest.to_string().replace(':', "-")
}

fn build_missing_content<'a>(
&self,
log_id: &LogId,
Expand Down Expand Up @@ -387,13 +399,17 @@ async fn upload_content(
// Only persist the file if the content was successfully processed
res?;

let version = crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id).await?;
let version =
crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id)
.await?;
let package_id = config.core_service.store().get_package_id(&log_id).await?;
let mut tmp_file = tokio::fs::File::open(&tmp_path)
.await
.map_err(PackageApiError::internal_error)?;

config.content_store.store_content(&package_id, &digest, version.to_string(), &mut tmp_file)
config
.content_store
.store_content(&package_id, &digest, version.to_string(), &mut tmp_file)
.await
.map_err(PackageApiError::internal_error)?;

Expand All @@ -412,7 +428,10 @@ async fn upload_content(

Ok((
StatusCode::CREATED,
[(header::LOCATION, config.content_url(&log_id, &record_id, &digest))],
[(
header::LOCATION,
config.content_url(&log_id, &record_id, &digest),
)],
))
}

Expand Down Expand Up @@ -468,12 +487,12 @@ async fn fetch_content(
tracing::info!("fetching content for record `{record_id}` from `{log_id}`");

let package_id = config.core_service.store().get_package_id(&log_id).await?;
let version = crate::datastore::get_release_version(
config.core_service.store(),
&log_id,
&record_id,
).await?;
let file = config.content_store.fetch_content(&package_id, &digest, version.to_string())
let version =
crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id)
.await?;
let file = config
.content_store
.fetch_content(&package_id, &digest, version.to_string())
.await
.map_err(PackageApiError::not_found)?;

Expand Down
13 changes: 10 additions & 3 deletions crates/server/src/bin/warg-server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::{Context, Result};
use clap::{Parser, ValueEnum};
use oci_distribution::secrets::RegistryAuth::Anonymous;
use secrecy::SecretString;
use std::{net::SocketAddr, path::PathBuf};
use oci_distribution::secrets::RegistryAuth::Anonymous;
use tokio::signal;
use tracing_subscriber::filter::LevelFilter;
use url::Url;
Expand Down Expand Up @@ -127,7 +127,7 @@ async fn main() -> Result<()> {
.with_context(|| format!("failed to decode authorized keys from {path:?}"))?;
config = config.with_record_policy(authorized_key_policy);
}

let config = match args.content_store {
ContentStoreKind::Local => {
tracing::info!("using local content store");
Expand All @@ -136,7 +136,14 @@ async fn main() -> Result<()> {
ContentStoreKind::OCIv1_1 => {
use warg_server::contentstore::oci::ociv1_1::OCIv1_1ContentStore;
tracing::info!("using OCIv1.1 content store");
config.with_content_store(OCIv1_1ContentStore::new(args.oci_registry_url.unwrap(), Anonymous, &args.content_dir).await)
config.with_content_store(
OCIv1_1ContentStore::new(
args.oci_registry_url.unwrap(),
Anonymous,
&args.content_dir,
)
.await,
)
}
};

Expand Down
8 changes: 5 additions & 3 deletions crates/server/src/contentstore/local.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::contentstore::{ContentStore, ContentStoreError};
use std::path::{Path, PathBuf};
use tokio::fs::File;
use tokio::io::copy;
use warg_crypto::hash::AnyHash;
use warg_protocol::registry::PackageId;
use crate::contentstore::{ContentStore, ContentStoreError};

#[derive(Clone)]
pub struct LocalContentStore {
Expand Down Expand Up @@ -46,7 +46,7 @@ impl ContentStore for LocalContentStore {
_package_id: &PackageId,
digest: &AnyHash,
_version: String,
content: &mut File
content: &mut File,
) -> Result<String, ContentStoreError> {
let file_path = self.content_path(digest);
let mut stored_file = File::create(file_path.clone())
Expand All @@ -67,6 +67,8 @@ impl ContentStore for LocalContentStore {
_version: String,
) -> Result<bool, ContentStoreError> {
let path = self.content_path(digest);
Path::new(&path).try_exists().map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))
Path::new(&path)
.try_exists()
.map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))
}
}
4 changes: 2 additions & 2 deletions crates/server/src/contentstore/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use thiserror::Error;
use tokio::fs::File;
use warg_crypto::hash::AnyHash;
use thiserror::Error;
use warg_protocol::registry::PackageId;

pub mod local;
Expand Down Expand Up @@ -32,7 +32,7 @@ pub trait ContentStore: Send + Sync {
package_id: &PackageId,
digest: &AnyHash,
version: String,
content: &mut File
content: &mut File,
) -> Result<String, ContentStoreError>;

async fn content_present(
Expand Down
54 changes: 24 additions & 30 deletions crates/server/src/contentstore/oci/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{Context, Result};
use oci_distribution::config::{Architecture, Config as DistConfig, ConfigFile, Os};
use oci_distribution::{
client,
client::{ClientProtocol, Config, ImageLayer},
manifest::OciImageManifest,
Reference,
secrets::RegistryAuth,
Reference,
};
use oci_distribution::config::{Architecture, ConfigFile, Config as DistConfig, Os};
use serde_json;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
Expand All @@ -20,8 +20,7 @@ use tokio::task::block_in_place;
use warg_crypto::hash::AnyHash;

use crate::{
contentstore::ContentStoreError,
contentstore::ContentStoreError::ContentStoreInternalError,
contentstore::ContentStoreError, contentstore::ContentStoreError::ContentStoreInternalError,
};

const COMPONENT_ARTIFACT_TYPE: &str = "application/vnd.bytecodealliance.component.v1+wasm";
Expand Down Expand Up @@ -52,7 +51,10 @@ impl Client {
digest: &AnyHash,
) -> Result<File, ContentStoreError> {
let path = self.cached_content_path(digest);
if Path::new(&path).try_exists().map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))? {
if Path::new(&path)
.try_exists()
.map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?
{
let file = File::open(path)
.await
.map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?;
Expand All @@ -69,18 +71,13 @@ impl Client {
// block_on.
let result = block_in_place(|| {
Handle::current().block_on(async move {
let mut oci = self
.oci_client
.write()
.await;
oci
.pull(&reference, &self.auth, vec![WASM_LAYER_MEDIA_TYPE])
let mut oci = self.oci_client.write().await;
oci.pull(&reference, &self.auth, vec![WASM_LAYER_MEDIA_TYPE])
.await
})
});

let image = result
.map_err(|e| ContentStoreInternalError(e.to_string()))?;
let image = result.map_err(|e| ContentStoreInternalError(e.to_string()))?;

let layer = image
.layers
Expand All @@ -90,8 +87,7 @@ impl Client {
let mut file = File::create(self.cached_content_path(digest))
.await
.map_err(|e| ContentStoreInternalError(e.to_string()))?;
file
.write_all(&layer.data)
file.write_all(&layer.data)
.await
.map_err(|e| ContentStoreInternalError(e.to_string()))?;
Ok(file)
Expand Down Expand Up @@ -121,8 +117,8 @@ impl Client {
}),
..Default::default()
};
let config_data = serde_json::to_vec(&config)
.map_err(|e| ContentStoreInternalError(e.to_string()))?;
let config_data =
serde_json::to_vec(&config).map_err(|e| ContentStoreInternalError(e.to_string()))?;
let oci_config = Config::oci_v1(config_data, None);
let mut layers = Vec::new();
let wasm_layer = Self::wasm_layer(file)
Expand All @@ -135,15 +131,11 @@ impl Client {

// TODO: fix the higher-level lifetime error that occurs when not using block_in_place and
// block_on.
let result= block_in_place(|| {
let result = block_in_place(|| {
Handle::current().block_on(async move {
tracing::log::trace!("Pushing component to {:?}", reference);
let mut oci = self
.oci_client
.write()
.await;
oci
.push(&reference, &layers, oci_config, &self.auth, Some(manifest))
let mut oci = self.oci_client.write().await;
oci.push(&reference, &layers, oci_config, &self.auth, Some(manifest))
.await
})
});
Expand All @@ -154,7 +146,10 @@ impl Client {
.map_err(|e| ContentStoreInternalError(e.to_string()))
}

pub async fn content_exists(&self, reference: impl AsRef<str>) -> Result<bool, ContentStoreError> {
pub async fn content_exists(
&self,
reference: impl AsRef<str>,
) -> Result<bool, ContentStoreError> {
let reference: Reference = reference
.as_ref()
.parse()
Expand All @@ -163,10 +158,7 @@ impl Client {
.unwrap();

let mut oci = self.oci_client.write().await;
match oci
.fetch_manifest_digest(&reference, &self.auth)
.await
{
match oci.fetch_manifest_digest(&reference, &self.auth).await {
Ok(_) => Ok(true),
Err(_) => Ok(false),
}
Expand All @@ -177,7 +169,9 @@ impl Client {
tracing::log::trace!("Reading wasm component from {:?}", file);

let mut contents = vec![];
file.read_to_end(&mut contents).await.context("cannot read wasm component")?;
file.read_to_end(&mut contents)
.await
.context("cannot read wasm component")?;

Ok(ImageLayer::new(
contents,
Expand Down
2 changes: 1 addition & 1 deletion crates/server/src/contentstore/oci/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub mod ociv1_1;
mod client;
pub mod ociv1_1;
Loading

0 comments on commit f915398

Please sign in to comment.