Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce pelikan_bloomcache #468

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ members = [
"src/proxy/ping",
"src/proxy/thrift",
"src/queues",
"src/server/bloomcache",
"src/server/pingserver",
"src/server/segcache",
"src/session",
Expand Down
42 changes: 42 additions & 0 deletions src/config/src/bloom.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2022 Twitter, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2022

// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

use crate::units::MB;

use serde::{Deserialize, Serialize};

/// Default bloom filter size, in bytes (16 MB).
const BLOOM_DEFAULT_SIZE: usize = 16 * MB;

/// Default number of hash functions evaluated per inserted value.
const BLOOM_DEFAULT_HASHES: usize = 64;

/// Supplies the default for `Bloom::size`; referenced by name from the
/// `#[serde(default = "size")]` attribute.
fn size() -> usize {
    BLOOM_DEFAULT_SIZE
}

/// Supplies the default for `Bloom::hashes`; referenced by name from the
/// `#[serde(default = "hashes")]` attribute.
fn hashes() -> usize {
    BLOOM_DEFAULT_HASHES
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Bloom {
/// The size of the bloom filter in bytes.
#[serde(default = "size")]
pub size: usize,

/// The number of hash functions that are evaluated for each value inserted.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo?

#[serde(default = "hashes")]
pub hashes: usize,

This comment was marked as spam.

}

impl Default for Bloom {
fn default() -> Self {
Self {
size: BLOOM_DEFAULT_SIZE,
hashes: BLOOM_DEFAULT_HASHES,
}
}
}

/// Implemented by configuration types that carry a [`Bloom`] section.
pub trait BloomConfig {
    /// Returns a shared reference to the bloom filter configuration.
    fn bloom(&self) -> &Bloom;
}
190 changes: 190 additions & 0 deletions src/config/src/bloomcache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright 2022 Twitter, Inc.
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

use crate::*;

use serde::{Deserialize, Serialize};

use std::io::Read;

// Default values for the top-level settings.

/// Default for the `daemonize` setting.
const DAEMONIZE: bool = false;

/// Default for the `pid_filename` setting (none configured).
const PID_FILENAME: Option<String> = None;

/// Default for the `dlog_interval` setting.
const DLOG_INTERVAL: usize = 500;

// Zero-argument helpers that serde's `default = "..."` attribute refers to by
// name.

/// Returns the default `daemonize` value.
fn daemonize() -> bool {
    DAEMONIZE
}

/// Returns the default `pid_filename` value.
fn pid_filename() -> Option<String> {
    PID_FILENAME
}

/// Returns the default `dlog_interval` value.
fn dlog_interval() -> usize {
    DLOG_INTERVAL
}

// struct definitions
/// Configuration for bloomcache.
///
/// Every field carries a `#[serde(default ...)]` attribute, so any field that
/// is missing from the deserialized input falls back to its default value.
#[derive(Serialize, Deserialize, Debug)]
pub struct BloomcacheConfig {
    // top-level
    /// Falls back to `daemonize()` when absent.
    #[serde(default = "daemonize")]
    daemonize: bool,
    /// Falls back to `pid_filename()` when absent.
    #[serde(default = "pid_filename")]
    pid_filename: Option<String>,
    /// Falls back to `dlog_interval()` when absent.
    #[serde(default = "dlog_interval")]
    dlog_interval: usize,

    // application modules
    /// Admin section; uses `Admin::default()` when absent.
    #[serde(default)]
    admin: Admin,
    /// Server section; uses `Server::default()` when absent.
    #[serde(default)]
    server: Server,
    /// Worker section; uses `Worker::default()` when absent.
    #[serde(default)]
    worker: Worker,
    /// Time section; uses `Time::default()` when absent.
    #[serde(default)]
    time: Time,
    /// TLS section; uses `Tls::default()` when absent.
    #[serde(default)]
    tls: Tls,
    /// Bloom filter section; uses `Bloom::default()` when absent.
    #[serde(default)]
    bloom: Bloom,

    // ccommon
    /// Buffer section; uses `Buf::default()` when absent.
    #[serde(default)]
    buf: Buf,
    /// Debug section; uses `Debug::default()` when absent.
    #[serde(default)]
    debug: Debug,
    /// Klog section; uses `Klog::default()` when absent.
    #[serde(default)]
    klog: Klog,
    /// Sockio section; uses `Sockio::default()` when absent.
    #[serde(default)]
    sockio: Sockio,
    /// TCP section; uses `Tcp::default()` when absent.
    #[serde(default)]
    tcp: Tcp,
}

// Section accessors: each trait below exposes one section of the config by
// reference so that shared components can be written against the trait rather
// than against `BloomcacheConfig` itself.

impl AdminConfig for BloomcacheConfig {
    /// Borrows the admin section.
    fn admin(&self) -> &Admin {
        &self.admin
    }
}

impl BufConfig for BloomcacheConfig {
    /// Borrows the buffer section.
    fn buf(&self) -> &Buf {
        &self.buf
    }
}

impl DebugConfig for BloomcacheConfig {
    /// Borrows the debug section.
    fn debug(&self) -> &Debug {
        &self.debug
    }
}

impl KlogConfig for BloomcacheConfig {
    /// Borrows the klog section.
    fn klog(&self) -> &Klog {
        &self.klog
    }
}

impl ServerConfig for BloomcacheConfig {
    /// Borrows the server section.
    fn server(&self) -> &Server {
        &self.server
    }
}

impl SockioConfig for BloomcacheConfig {
    /// Borrows the sockio section.
    fn sockio(&self) -> &Sockio {
        &self.sockio
    }
}

impl TcpConfig for BloomcacheConfig {
    /// Borrows the TCP section.
    fn tcp(&self) -> &Tcp {
        &self.tcp
    }
}

impl TimeConfig for BloomcacheConfig {
    /// Borrows the time section.
    fn time(&self) -> &Time {
        &self.time
    }
}

impl TlsConfig for BloomcacheConfig {
    /// Borrows the TLS section.
    fn tls(&self) -> &Tls {
        &self.tls
    }
}

impl WorkerConfig for BloomcacheConfig {
    /// Borrows the worker section.
    fn worker(&self) -> &Worker {
        &self.worker
    }

    /// Mutably borrows the worker section.
    fn worker_mut(&mut self) -> &mut Worker {
        &mut self.worker
    }
}

impl BloomConfig for BloomcacheConfig {
    /// Borrows the bloom filter section.
    fn bloom(&self) -> &Bloom {
        &self.bloom
    }
}

// implementation
impl BloomcacheConfig {
pub fn load(file: &str) -> Result<Self, std::io::Error> {
let mut file = std::fs::File::open(file)?;
let mut content = String::new();
file.read_to_string(&mut content)?;
match toml::from_str(&content) {
Ok(t) => Ok(t),
Err(e) => {
error!("{}", e);
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Error parsing config",
))
}
}
}

pub fn daemonize(&self) -> bool {
self.daemonize
}

pub fn pid_filename(&self) -> Option<String> {
self.pid_filename.clone()
}

pub fn dlog_interval(&self) -> usize {
self.dlog_interval
}
}

// trait implementations
impl Default for BloomcacheConfig {
fn default() -> Self {
Self {
daemonize: daemonize(),
pid_filename: pid_filename(),
dlog_interval: dlog_interval(),

admin: Default::default(),
server: Default::default(),
worker: Default::default(),
time: Default::default(),
bloom: Default::default(),

buf: Default::default(),
debug: Default::default(),
klog: Default::default(),
sockio: Default::default(),
tcp: Default::default(),
tls: Default::default(),
}
}
}
4 changes: 4 additions & 0 deletions src/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ extern crate log;

mod admin;
mod array;
mod bloom;
mod bloomcache;
mod buf;
mod dbuf;
mod debug;
Expand All @@ -28,6 +30,8 @@ mod worker;

pub use admin::{Admin, AdminConfig};
pub use array::ArrayConfig;
pub use bloom::{Bloom, BloomConfig};
pub use bloomcache::BloomcacheConfig;
pub use buf::{Buf, BufConfig};
pub use dbuf::DbufConfig;
pub use debug::{Debug, DebugConfig};
Expand Down
4 changes: 3 additions & 1 deletion src/entrystore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ config = { path = "../config" }
protocol-common = { path = "../protocol/common" }
protocol-memcache = { path = "../protocol/memcache" }
protocol-ping = { path = "../protocol/ping" }
seg = { path = "../storage/seg" }
protocol-http = { path = "../protocol/http" }
seg = { path = "../storage/seg" }
bloom = { path = "../storage/bloom" }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer alphabetical ordering

54 changes: 54 additions & 0 deletions src/entrystore/src/bloom/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2022 Twitter, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2022

// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

use protocol_common::Execute;
use protocol_http::{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not use this style here and would prefer two use statements.

request::{ParseData, Request, RequestData},
Headers, Response, Storage,
};

use crate::Bloom;

impl Execute<ParseData, Response> for Bloom {
    /// Executes a parsed HTTP request against the bloom filter.
    ///
    /// A request that failed to parse is answered with the parse error's own
    /// response. Otherwise the request is dispatched to the matching
    /// `Storage` method (`get`, `put` or `delete`).
    fn execute(&mut self, data: &ParseData) -> Response {
        match &data.0 {
            Err(parse_error) => parse_error.to_response(),
            Ok(Request {
                headers,
                data: payload,
            }) => match payload {
                RequestData::Get(key) => self.get(key, headers),
                RequestData::Put(key, value) => self.put(key, value, headers),
                RequestData::Delete(key) => self.delete(key, headers),
            },
        }
    }
}

impl Storage for Bloom {
/// Reports whether the filter contains `key`: status 200 with an empty
/// body if it does, 404 with an empty body otherwise. Headers are ignored.
fn get(&mut self, key: &[u8], _headers: &Headers) -> Response {
    let status = if self.data.contains(key) { 200 } else { 404 };
    Response::builder(status).body(b"")
}

fn put(&mut self, key: &[u8], _value: &[u8], _headers: &Headers) -> Response {
let exists = self.data.contains(key);
self.data.insert(key);

if exists {
Response::builder(200).body(b"")
} else {
Response::builder(201).body(b"")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use named status codes? Had to look this one up.

}
}

/// Rejects every DELETE request with status 405 and a plain-text
/// explanation; the key and headers are ignored.
fn delete(&mut self, _key: &[u8], _headers: &Headers) -> Response {
    let mut rejection = Response::builder(405);
    rejection.header("Content-Type", b"text/plain");
    rejection.body(b"DELETE method not supported")
}
}
34 changes: 34 additions & 0 deletions src/entrystore/src/bloom/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2022 Twitter, Inc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2022

// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

use bloom::BloomFilter;
use config::BloomConfig;

use crate::EntryStore;

mod http;

/// Storage backed by a [`bloom::BloomFilter`] over byte-slice keys.
///
/// This wrapper exists so the `EntryStore` and protocol traits can be
/// implemented for the filter.
pub struct Bloom {
    /// The underlying filter that the protocol implementations query and
    /// update.
    data: BloomFilter<[u8]>,
}

impl Bloom {
/// Create a bloom filter storage based on the config.
pub fn new<T: BloomConfig>(config: &T) -> Result<Self, std::io::Error> {
// TODO: Validate the config here and return an error.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally we've been putting names on the TODOs - eg: TODO(bmartin):


let config = config.bloom();
Ok(Self {
data: BloomFilter::new(config.size * 8, config.hashes),
})
}
}

impl EntryStore for Bloom {
    /// Clears the store by delegating to the underlying filter's `clear`.
    fn clear(&mut self) {
        self.data.clear();
    }
}
Loading