diff --git a/Cargo.lock b/Cargo.lock index f82ba50de..5f7646b7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -714,9 +714,11 @@ checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" name = "entrystore" version = "0.3.0" dependencies = [ + "bloom", "common", "config", "protocol-common", + "protocol-http", "protocol-memcache", "protocol-ping", "seg", @@ -1530,6 +1532,21 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pelikan_bloomcache" +version = "0.3.0" +dependencies = [ + "backtrace", + "clap 2.34.0", + "common", + "config", + "entrystore", + "logger", + "protocol-http", + "rustcommon-metrics", + "server", +] + [[package]] name = "pem" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 689c52c0f..dac2ee5f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ members = [ "src/proxy/ping", "src/proxy/thrift", "src/queues", + "src/server/bloomcache", "src/server/pingserver", "src/server/segcache", "src/session", diff --git a/src/config/src/bloom.rs b/src/config/src/bloom.rs new file mode 100644 index 000000000..a9a938401 --- /dev/null +++ b/src/config/src/bloom.rs @@ -0,0 +1,42 @@ +// Copyright 2021 Twitter, Inc. +// Licensed under the Apache License, Version 2.0 +// http://www.apache.org/licenses/LICENSE-2.0 + +use crate::units::MB; + +use serde::{Deserialize, Serialize}; + +const BLOOM_DEFAULT_SIZE: usize = 16 * MB; +const BLOOM_DEFAULT_HASHES: usize = 64; + +fn size() -> usize { + BLOOM_DEFAULT_SIZE +} + +fn hashes() -> usize { + BLOOM_DEFAULT_HASHES +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Bloom { + /// The size of the bloom filter in bytes. + #[serde(default = "size")] + pub size: usize, + + /// The number of hash functions that are evaluated for each value inserted.F + #[serde(default = "hashes")] + pub hashes: usize, +} + +impl Default for Bloom { + fn default() -> Self { + Self { + size: BLOOM_DEFAULT_SIZE, + hashes: BLOOM_DEFAULT_HASHES, + } + } +} + +pub trait BloomConfig { + fn bloom(&self) -> &Bloom; +} diff --git a/src/config/src/bloomcache.rs b/src/config/src/bloomcache.rs new file mode 100644 index 000000000..8bcb61511 --- /dev/null +++ b/src/config/src/bloomcache.rs @@ -0,0 +1,190 @@ +// Copyright 2022 Twitter, Inc. +// Licensed under the Apache License, Version 2.0 +// http://www.apache.org/licenses/LICENSE-2.0 + +use crate::*; + +use serde::{Deserialize, Serialize}; + +use std::io::Read; + +// constants to define default values +const DAEMONIZE: bool = false; +const PID_FILENAME: Option = None; +const DLOG_INTERVAL: usize = 500; + +// helper functions +fn daemonize() -> bool { + DAEMONIZE +} + +fn pid_filename() -> Option { + PID_FILENAME +} + +fn dlog_interval() -> usize { + DLOG_INTERVAL +} + +// struct definitions +#[derive(Serialize, Deserialize, Debug)] +pub struct BloomcacheConfig { + // top-level + #[serde(default = "daemonize")] + daemonize: bool, + #[serde(default = "pid_filename")] + pid_filename: Option, + #[serde(default = "dlog_interval")] + dlog_interval: usize, + + // application modules + #[serde(default)] + admin: Admin, + #[serde(default)] + server: Server, + #[serde(default)] + worker: Worker, + #[serde(default)] + time: Time, + #[serde(default)] + tls: Tls, + #[serde(default)] + bloom: Bloom, + + // ccommon + #[serde(default)] + buf: Buf, + #[serde(default)] + debug: Debug, + #[serde(default)] + klog: Klog, + #[serde(default)] + sockio: Sockio, + #[serde(default)] + tcp: Tcp, +} + +impl AdminConfig for BloomcacheConfig { + fn admin(&self) -> &Admin { + &self.admin + } +} + +impl BufConfig for BloomcacheConfig { + fn buf(&self) -> &Buf { + &self.buf + } +} + +impl DebugConfig for BloomcacheConfig { + fn debug(&self) -> &Debug { + &self.debug + } +} + +impl KlogConfig for BloomcacheConfig { + fn klog(&self) -> &Klog { + &self.klog + } +} + +impl ServerConfig for BloomcacheConfig { + fn server(&self) -> &Server { + &self.server + } +} + +impl SockioConfig for BloomcacheConfig { + fn sockio(&self) -> &Sockio { + &self.sockio + } +} + +impl TcpConfig for BloomcacheConfig { + fn tcp(&self) -> &Tcp { + &self.tcp + } +} + +impl TimeConfig for BloomcacheConfig { + fn time(&self) -> &Time { + &self.time + } +} + +impl TlsConfig for BloomcacheConfig { + fn tls(&self) -> &Tls { + &self.tls + } +} + +impl WorkerConfig for BloomcacheConfig { + fn worker(&self) -> &Worker { + &self.worker + } + + fn worker_mut(&mut self) -> &mut Worker { + &mut self.worker + } +} + +impl BloomConfig for BloomcacheConfig { + fn bloom(&self) -> &Bloom { + &self.bloom + } +} + +// implementation +impl BloomcacheConfig { + pub fn load(file: &str) -> Result { + let mut file = std::fs::File::open(file)?; + let mut content = String::new(); + file.read_to_string(&mut content)?; + match toml::from_str(&content) { + Ok(t) => Ok(t), + Err(e) => { + error!("{}", e); + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Error parsing config", + )) + } + } + } + + pub fn daemonize(&self) -> bool { + self.daemonize + } + + pub fn pid_filename(&self) -> Option { + self.pid_filename.clone() + } + + pub fn dlog_interval(&self) -> usize { + self.dlog_interval + } +} + +// trait implementations +impl Default for BloomcacheConfig { + fn default() -> Self { + Self { + daemonize: daemonize(), + pid_filename: pid_filename(), + dlog_interval: dlog_interval(), + + admin: Default::default(), + server: Default::default(), + worker: Default::default(), + time: Default::default(), + bloom: Default::default(), + + buf: Default::default(), + debug: Default::default(), + klog: Default::default(), + sockio: Default::default(), + tcp: Default::default(), + tls: Default::default(), + } + } +} diff --git a/src/config/src/lib.rs b/src/config/src/lib.rs index 3e143c700..de0f9c17b 100644 --- a/src/config/src/lib.rs +++ b/src/config/src/lib.rs @@ -7,6 +7,8 @@ extern crate log; mod admin; mod array; +mod bloom; +mod bloomcache; mod buf; mod dbuf; mod debug; @@ -28,6 +30,8 @@ mod worker; pub use admin::{Admin, AdminConfig}; pub use array::ArrayConfig; +pub use bloom::{Bloom, BloomConfig}; +pub use bloomcache::BloomcacheConfig; pub use buf::{Buf, BufConfig}; pub use dbuf::DbufConfig; pub use debug::{Debug, DebugConfig}; diff --git a/src/entrystore/Cargo.toml b/src/entrystore/Cargo.toml index 2ba8c4c1e..875f2a69b 100644 --- a/src/entrystore/Cargo.toml +++ b/src/entrystore/Cargo.toml @@ -18,4 +18,6 @@ config = { path = "../config" } protocol-common = { path = "../protocol/common" } protocol-memcache = { path = "../protocol/memcache" } protocol-ping = { path = "../protocol/ping" } -seg = { path = "../storage/seg" } \ No newline at end of file +protocol-http = { path = "../protocol/http" } +seg = { path = "../storage/seg" } +bloom = { path = "../storage/bloom" } \ No newline at end of file diff --git a/src/entrystore/src/bloom/http.rs b/src/entrystore/src/bloom/http.rs new file mode 100644 index 000000000..70d3a2953 --- /dev/null +++ b/src/entrystore/src/bloom/http.rs @@ -0,0 +1,54 @@ +// Copyright 2021 Twitter, Inc. +// Licensed under the Apache License, Version 2.0 +// http://www.apache.org/licenses/LICENSE-2.0 + +use protocol_common::Execute; +use protocol_http::{ + request::{ParseData, Request, RequestData}, + Headers, Response, Storage, +}; + +use crate::Bloom; + +impl Execute for Bloom { + fn execute(&mut self, data: &ParseData) -> Response { + let request = match &data.0 { + Ok(request) => request, + Err(e) => return e.to_response(), + }; + + let Request { headers, data } = request; + match data { + RequestData::Get(key) => self.get(key, headers), + RequestData::Put(key, value) => self.put(key, value, headers), + RequestData::Delete(key) => self.delete(key, headers), + } + } +} + +impl Storage for Bloom { + fn get(&mut self, key: &[u8], _headers: &Headers) -> Response { + if self.data.contains(key) { + Response::builder(200).body(b"") + } else { + Response::builder(404).body(b"") + } + } + + fn put(&mut self, key: &[u8], _value: &[u8], _headers: &Headers) -> Response { + let exists = self.data.contains(key); + self.data.insert(key); + + if exists { + Response::builder(200).body(b"") + } else { + Response::builder(201).body(b"") + } + } + + fn delete(&mut self, _key: &[u8], _headers: &Headers) -> Response { + let mut builder = Response::builder(405); + builder.header("Content-Type", b"text/plain"); + builder.body(b"DELETE method not supported") + } +} diff --git a/src/entrystore/src/bloom/mod.rs b/src/entrystore/src/bloom/mod.rs new file mode 100644 index 000000000..65fa3e32a --- /dev/null +++ b/src/entrystore/src/bloom/mod.rs @@ -0,0 +1,34 @@ +// Copyright 2021 Twitter, Inc. +// Licensed under the Apache License, Version 2.0 +// http://www.apache.org/licenses/LICENSE-2.0 + +use bloom::BloomFilter; +use config::BloomConfig; + +use crate::EntryStore; + +mod http; + +/// A wrapper around [`bloom::BloomFilter`] which implements `EntryStore` +/// protocol traits. +pub struct Bloom { + data: BloomFilter<[u8]>, +} + +impl Bloom { + /// Create a bloom filter storage based on the config. + pub fn new(config: &T) -> Result { + // TODO: Validate the config here and return an error. + + let config = config.bloom(); + Ok(Self { + data: BloomFilter::new(config.size * 8, config.hashes), + }) + } +} + +impl EntryStore for Bloom { + fn clear(&mut self) { + self.data.clear(); + } +} diff --git a/src/entrystore/src/lib.rs b/src/entrystore/src/lib.rs index ae3756838..569830fb2 100644 --- a/src/entrystore/src/lib.rs +++ b/src/entrystore/src/lib.rs @@ -7,9 +7,11 @@ //! addition to the base `EntryStore` trait. For example [`Seg`] implements both //! [`EntryStore`] and [`protocol::memcache::MemcacheStorage`]. +mod bloom; mod noop; mod seg; +pub use self::bloom::Bloom; pub use self::noop::*; pub use self::seg::*; diff --git a/src/server/bloomcache/Cargo.toml b/src/server/bloomcache/Cargo.toml new file mode 100644 index 000000000..3a9e1c33a --- /dev/null +++ b/src/server/bloomcache/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "pelikan_bloomcache" + +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } + +[dependencies] +backtrace = { workspace = true } +clap = { workspace = true } +common = { path = "../../common" } +config = { path = "../../config" } +entrystore = { path = "../../entrystore" } +logger = { path = "../../logger" } +protocol-http = { path = "../../protocol/http" } +rustcommon-metrics = { workspace = true } +server = { path = "../../core/server" } diff --git a/src/server/bloomcache/src/lib.rs b/src/server/bloomcache/src/lib.rs new file mode 100644 index 000000000..e63ec15aa --- /dev/null +++ b/src/server/bloomcache/src/lib.rs @@ -0,0 +1,51 @@ +// Copyright 2021 Twitter, Inc. +// Licensed under the Apache License, Version 2.0 +// http://www.apache.org/licenses/LICENSE-2.0 + +use config::*; +use entrystore::Bloom; +use logger::*; +use protocol_http::{request::ParseData, RequestParser, Response}; +use server::{Process, ProcessBuilder}; + +type Parser = RequestParser; +type Storage = Bloom; + +pub struct BloomCache { + process: Process, +} + +impl BloomCache { + /// Create a new bloom cache from the provided config. + pub fn new(config: BloomcacheConfig) -> std::io::Result { + common::metrics::init(); + + let log = configure_logging(&config); + let storage = Storage::new(&config)?; + let parser = Parser::new(); + + let builder = ProcessBuilder::::new( + &config, log, parser, storage, + )? + .version(env!("CARGO_PKG_VERSION")); + + Ok(Self { + process: builder.spawn(), + }) + } + + /// Wait for all threads to complete. Blocks until the process has fully + /// terminated. Under normal conditions, this will block indefinitely. + pub fn wait(self) { + self.process.wait() + } + + /// Triggers a shutdown of the process and blocks until the process has + /// fully terminated. This is more likely to be used for running integration + /// tests or other automated testing. + pub fn shutdown(self) { + self.process.shutdown() + } +} + +common::metrics::test_no_duplicates!(); diff --git a/src/server/bloomcache/src/main.rs b/src/server/bloomcache/src/main.rs new file mode 100644 index 000000000..1320f83e8 --- /dev/null +++ b/src/server/bloomcache/src/main.rs @@ -0,0 +1,96 @@ +// Copyright 2021 Twitter, Inc. +// Licensed under the Apache License, Version 2.0 +// http://www.apache.org/licenses/LICENSE-2.0 + +#[macro_use] +extern crate logger; + +use backtrace::Backtrace; +use clap::{App, Arg}; +use config::BloomcacheConfig; +use pelikan_bloomcache::BloomCache; +use rustcommon_metrics::*; +use server::PERCENTILES; + +fn main() { + // custom panic hook to terminate whole process after unwinding + std::panic::set_hook(Box::new(|s| { + error!("{}", s); + eprintln!("{:?}", Backtrace::new()); + std::process::exit(101); + })); + + // parse command line options + let matches = App::new(env!("CARGO_BIN_NAME")) + .version(env!("CARGO_PKG_VERSION")) + .version_short("v") + .arg( + Arg::with_name("stats") + .short("s") + .long("stats") + .help("List all metrics in stats") + .takes_value(false), + ) + .arg( + Arg::with_name("CONFIG") + .help("Server configuration file") + .index(1), + ) + .get_matches(); + + if matches.is_present("stats") { + println!("{:<31} {:<15} DESCRIPTION", "NAME", "TYPE"); + + let mut metrics = Vec::new(); + + for metric in &rustcommon_metrics::metrics() { + let any = match metric.as_any() { + Some(any) => any, + None => { + continue; + } + }; + + if any.downcast_ref::().is_some() { + metrics.push(format!("{:<31} counter", metric.name())); + } else if any.downcast_ref::().is_some() { + metrics.push(format!("{:<31} gauge", metric.name())); + } else if any.downcast_ref::().is_some() { + for (label, _) in PERCENTILES { + let name = format!("{}_{}", metric.name(), label); + metrics.push(format!("{:<31} percentile", name)); + } + } else { + continue; + } + } + + metrics.sort(); + for metric in metrics { + println!("{}", metric); + } + std::process::exit(0); + } + + // load config from file + let config = if let Some(file) = matches.value_of("CONFIG") { + debug!("loading config: {}", file); + match BloomcacheConfig::load(file) { + Ok(c) => c, + Err(e) => { + eprintln!("error launching bloomcache: {}", e); + std::process::exit(1); + } + } + } else { + Default::default() + }; + + match BloomCache::new(config) { + Ok(s) => s.wait(), + Err(e) => { + eprintln!("error launching bloomcache: {}", e); + std::process::exit(1); + } + } +} diff --git a/src/storage/bloom/src/lib.rs b/src/storage/bloom/src/lib.rs index be69b75f1..acbec0c49 100644 --- a/src/storage/bloom/src/lib.rs +++ b/src/storage/bloom/src/lib.rs @@ -130,7 +130,7 @@ impl RawBloomFilter { pub struct BloomFilter { raw: RawBloomFilter, seed: u64, - _dummy: PhantomData<*const T>, + _dummy: PhantomData, } impl BloomFilter {