Skip to content

Commit

Permalink
Configurable replication and tracing (#10)
Browse files Browse the repository at this point in the history
* Add flags so Datadog tracing and replication are configurable.
  • Loading branch information
Stephen Cirner authored Sep 22, 2021
1 parent 2c4741b commit cdd6eae
Show file tree
Hide file tree
Showing 9 changed files with 416 additions and 340 deletions.
513 changes: 229 additions & 284 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async-recursion = "0.3"
base64 = "0.13"
bytes = "1.0"
chrono = "0.4"
env_logger = "0.8"
env_logger = "0.9"
futures = "0.3"
futures-util = "0.3"
hex = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.51 as builder
FROM rust:1.55 as builder

RUN rustup component add rustfmt

Expand Down
51 changes: 36 additions & 15 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use std::env;
use std::net::{IpAddr, SocketAddr};

/// Settings for reporting traces to a Datadog agent.
///
/// `Config` holds this as an `Option`: it is only built when the
/// `DD_AGENT_ENABLED` environment variable parses to `true`; otherwise the
/// value is `None`.
#[derive(Debug)]
pub struct DatadogConfig {
/// IP address of the Datadog agent (read from `DD_AGENT_HOST`, default `127.0.0.1`).
pub agent_host: IpAddr,
/// Port of the Datadog agent (read from `DD_AGENT_PORT`, default `8126`).
pub agent_port: u16,
/// Service name passed to the trace reporter (read from `DD_SERVICE_NAME`, default `object-store`).
pub service_name: String,
}

#[derive(Debug)]
pub struct Config {
pub url: SocketAddr,
Expand All @@ -15,10 +22,9 @@ pub struct Config {
pub storage_type: String,
pub storage_base_path: String,
pub storage_threshold: u32,
pub replication_enabled: bool,
pub replication_batch_size: i32,
pub dd_agent_host: IpAddr,
pub dd_agent_port: u16,
pub dd_service_name: String,
pub dd_config: Option<DatadogConfig>,
pub backoff_min_wait: i64,
pub backoff_max_wait: i64,
}
Expand Down Expand Up @@ -56,16 +62,28 @@ impl Config {
.unwrap_or("10".to_owned())
.parse()
.expect("REPLICATION_BATCH_SIZE could not be parsed into a u32");
let dd_agent_host = env::var("DD_AGENT_HOST")
.unwrap_or("127.0.0.1".to_owned())
.parse()
.expect("DD_AGENT_HOST could not be parsed into an ip address");
let dd_agent_port = env::var("DD_AGENT_PORT")
.unwrap_or("8126".to_owned())
let dd_agent_enabled: bool = env::var("DD_AGENT_ENABLED")
.unwrap_or("false".to_owned())
.parse()
.expect("DD_AGENT_PORT could not be parsed into a u16");
let dd_service_name = env::var("DD_SERVICE_NAME")
.unwrap_or("object-store".to_owned());
.expect("DD_AGENT_ENABLED could not be parsed into a bool");

let dd_config = if dd_agent_enabled {
let agent_host = env::var("DD_AGENT_HOST")
.unwrap_or("127.0.0.1".to_owned())
.parse()
.expect("DD_AGENT_HOST could not be parsed into an ip address");
let agent_port = env::var("DD_AGENT_PORT")
.unwrap_or("8126".to_owned())
.parse()
.expect("DD_AGENT_PORT could not be parsed into a u16");
let service_name = env::var("DD_SERVICE_NAME")
.unwrap_or("object-store".to_owned());

Some(DatadogConfig { agent_host, agent_port, service_name })
} else {
None
};

let backoff_min_wait = env::var("BACKOFF_MIN_WAIT")
.unwrap_or("30".to_owned()) // 30 seconds
.parse()
Expand All @@ -74,6 +92,10 @@ impl Config {
.unwrap_or("1920".to_owned()) // 32 minutes
.parse()
.expect("BACKOFF_MAX_WAIT could not be parsed into a i64");
let replication_enabled: bool = env::var("REPLICATION_ENABLED")
.unwrap_or("false".to_owned())
.parse()
.expect("REPLICATION_ENABLED could not be parsed into a bool");

Self {
url,
Expand All @@ -88,11 +110,10 @@ impl Config {
storage_base_path,
storage_threshold,
replication_batch_size,
dd_agent_host,
dd_agent_port,
dd_service_name,
dd_config,
backoff_min_wait,
backoff_max_wait,
replication_enabled,
}
}

Expand Down
12 changes: 10 additions & 2 deletions src/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,15 @@ UPDATE mailbox_public_key SET acked_at = $1 WHERE uuid = $2
}

#[trace_async("datastore::put_object")]
pub async fn put_object(db: &PgPool, dime: &Dime, dime_properties: &DimeProperties, properties: &LinkedHashMap<String, Vec<u8>>, replication_key_states: Vec<(String, PublicKeyState)>, raw_dime: Option<&Bytes>) -> Result<Object> {
pub async fn put_object(
db: &PgPool,
dime: &Dime,
dime_properties: &DimeProperties,
properties: &LinkedHashMap<String, Vec<u8>>,
replication_key_states: Vec<(String, PublicKeyState)>,
raw_dime: Option<&Bytes>,
replication_enabled: bool,
) -> Result<Object> {
let mut unique_hash = dime.unique_audience_base64()?;
unique_hash.sort_unstable();
unique_hash.insert(0, String::from(&dime_properties.hash));
Expand Down Expand Up @@ -468,7 +476,7 @@ ON CONFLICT DO NOTHING
maybe_put_mailbox_public_keys(&mut tx, uuid, dime).await?;

// objects that are saved via replication should not attempt to replicate again
if properties.get(SOURCE_KEY) != Some(&SOURCE_REPLICATION.as_bytes().to_owned()) {
if replication_enabled && properties.get(SOURCE_KEY) != Some(&SOURCE_REPLICATION.as_bytes().to_owned()) {
maybe_put_replication_objects(&mut tx, uuid, replication_key_states).await?;
}
},
Expand Down
46 changes: 34 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ static MIGRATOR: Migrator = sqlx::migrate!();
// TODO implement checksum in filestore

async fn health_status(mut reporter: tonic_health::server::HealthReporter, db: Arc<PgPool>) {
log::info!("Starting health status check");

loop {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

Expand Down Expand Up @@ -120,24 +122,44 @@ async fn main() -> Result<()> {
tokio::spawn(health_status(health_reporter, Arc::clone(&pool)));

// start replication
tokio::spawn(replicate(replication_state));
// start unknown reaper - removes replication objects for public_keys that moved from Unkown -> Local
tokio::spawn(reap_unknown_keys(Arc::clone(&pool), Arc::clone(&cache)));
if config.replication_enabled {
tokio::spawn(replicate(replication_state));
// start unknown reaper - removes replication objects for public_keys that moved from Unknown -> Local
tokio::spawn(reap_unknown_keys(Arc::clone(&pool), Arc::clone(&cache)));
}

// start datadog reporter
let (datadog_sender, datadog_receiver) = channel::<MinitraceSpans>(10);
tokio::spawn(report_datadog_traces(datadog_receiver, config.dd_agent_host.clone(), config.dd_agent_port, config.dd_service_name.clone()));
if let Some(ref dd_config) = config.dd_config {
tokio::spawn(report_datadog_traces(
datadog_receiver,
dd_config.agent_host,
dd_config.agent_port,
dd_config.service_name.clone(),
));
}

log::info!("Starting server on {:?}", &config.url);

// TODO add server fields that make sense
Server::builder()
.layer(MinitraceGrpcMiddlewareLayer::new(datadog_sender))
.add_service(health_service)
.add_service(PublicKeyServiceServer::new(public_key_service))
.add_service(MailboxServiceServer::new(mailbox_service))
.add_service(ObjectServiceServer::new(object_service))
.serve(config.url)
.await?;
if config.dd_config.is_some() {
Server::builder()
.layer(MinitraceGrpcMiddlewareLayer::new(datadog_sender))
.add_service(health_service)
.add_service(PublicKeyServiceServer::new(public_key_service))
.add_service(MailboxServiceServer::new(mailbox_service))
.add_service(ObjectServiceServer::new(object_service))
.serve(config.url)
.await?;
} else {
Server::builder()
.add_service(health_service)
.add_service(PublicKeyServiceServer::new(public_key_service))
.add_service(MailboxServiceServer::new(mailbox_service))
.add_service(ObjectServiceServer::new(object_service))
.serve(config.url)
.await?;
}

Ok(())
}
21 changes: 14 additions & 7 deletions src/minitrace_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{net::{IpAddr, SocketAddr}, task::{Context, Poll}};
use minitrace::{FutureExt, Span};
use minitrace_datadog::Reporter;
use reqwest::header::HeaderMap;
use tokio::{sync::mpsc::{Receiver, Sender}};
use tokio::sync::mpsc::{Receiver, Sender};
use tonic::{body::BoxBody, codegen::http::HeaderValue, transport::Body};
use tower::{Layer, Service};

Expand Down Expand Up @@ -96,7 +96,14 @@ pub struct MinitraceSpans {
spans: Box<Vec<minitrace::span::Span>>
}

pub async fn report_datadog_traces(mut receiver: Receiver<MinitraceSpans>, host: IpAddr, port: u16, service_name: String) {
pub async fn report_datadog_traces(
mut receiver: Receiver<MinitraceSpans>,
host: IpAddr,
port: u16,
service_name: String,
) {
log::info!("Starting Datadog reporting");

let socket = SocketAddr::new(host, port);
let url = format!("http://{}/v0.4/traces", socket);
let mut headers = HeaderMap::new();
Expand All @@ -105,7 +112,7 @@ pub async fn report_datadog_traces(mut receiver: Receiver<MinitraceSpans>, host:
let client = reqwest::Client::builder()
.default_headers(headers)
.build();

match client {
Ok(client) => {
while let Some(spans) = receiver.recv().await {
Expand All @@ -116,13 +123,13 @@ pub async fn report_datadog_traces(mut receiver: Receiver<MinitraceSpans>, host:
spans.span_id_prefix,
&*spans.spans,
);

match bytes {
Ok(bytes) => {
let response = client.post(&url)
.body(bytes)
.send().await;

if let Err(error) = response {
log::warn!("error sending dd trace {:#?}", error);
}
Expand All @@ -137,6 +144,6 @@ pub async fn report_datadog_traces(mut receiver: Receiver<MinitraceSpans>, host:
log::warn!("Error creating client for sending datadog traces {:#?}", error);
}
}

log::info!("Datadog reporting loop is shutting down");
}
}
18 changes: 12 additions & 6 deletions src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tonic::{Request, Response, Status, Streaming};

// TODO add flag for whether object-replication can be ignored for a PUT
// TODO move test packet generation to helper functions
// TODO implement mailbox only for local and unknown keys - reaper for unknown to remote like replication?

pub struct ObjectGrpc {
pub cache: Arc<Mutex<Cache>>,
Expand Down Expand Up @@ -155,7 +156,7 @@ impl ObjectService for ObjectGrpc {
};
let mut replication_key_states = Vec::new();

{
if self.config.replication_enabled {
let audience = dime.unique_audience_without_owner_base64()
.map_err(|_| Status::invalid_argument("Invalid Dime proto - audience list"))?;
let cache = self.cache.lock().unwrap();
Expand All @@ -170,7 +171,7 @@ impl ObjectService for ObjectGrpc {
let response = if !dime.metadata.contains_key(consts::MAILBOX_KEY) &&
dime_properties.dime_length > self.config.storage_threshold.into()
{
let response = datastore::put_object(&self.db_pool, &dime, &dime_properties, &properties, replication_key_states, None)
let response = datastore::put_object(&self.db_pool, &dime, &dime_properties, &properties, replication_key_states, None, self.config.replication_enabled)
.await?;
let storage_path = StoragePath {
dir: response.directory.clone(),
Expand All @@ -181,7 +182,7 @@ impl ObjectService for ObjectGrpc {
.map_err(Into::<OsError>::into)?;
response.to_response(&self.config)?
} else {
datastore::put_object(&self.db_pool, &dime, &dime_properties, &properties, replication_key_states, Some(&raw_dime))
datastore::put_object(&self.db_pool, &dime, &dime_properties, &properties, replication_key_states, Some(&raw_dime), self.config.replication_enabled)
.await?
.to_response(&self.config)?
};
Expand Down Expand Up @@ -263,6 +264,7 @@ pub mod tests {
use std::hash::Hasher;

use crate::MIGRATOR;
use crate::config::DatadogConfig;
use crate::consts::*;
use crate::datastore::{replication_object_uuids, MailboxPublicKey, ObjectPublicKey};
use crate::dime::Signature;
Expand All @@ -280,6 +282,11 @@ pub mod tests {
use testcontainers::clients::Cli;

pub fn test_config() -> Config {
let dd_config = DatadogConfig {
agent_host: "127.0.0.1".parse().unwrap(),
agent_port: 8126,
service_name: "object-store".to_owned(),
};
Config {
url: "0.0.0.0:6789".parse().unwrap(),
uri_host: String::default(),
Expand All @@ -293,10 +300,9 @@ pub mod tests {
storage_type: "file_system".to_owned(),
storage_base_path: "/tmp".to_owned(),
storage_threshold: 5000,
replication_enabled: true,
replication_batch_size: 2,
dd_agent_host: "127.0.0.1".parse().unwrap(),
dd_agent_port: 8126,
dd_service_name: "object-store".to_owned(),
dd_config: Some(dd_config),
backoff_min_wait: 1,
backoff_max_wait: 1,
}
Expand Down
Loading

0 comments on commit cdd6eae

Please sign in to comment.