Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared memory hash table #1379

Open
wants to merge 15 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pgrx-examples/shmem/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use pgrx::atomics::*;
use pgrx::lwlock::PgLwLock;
use pgrx::prelude::*;
use pgrx::shmem::*;
use pgrx::shmem_hash::*;
use pgrx::{pg_shmem_init, warning};
use serde::*;
use std::iter::Iterator;
Expand Down Expand Up @@ -39,6 +40,7 @@ static HASH: PgLwLock<heapless::FnvIndexMap<i32, i32, 4>> = PgLwLock::new();
static STRUCT: PgLwLock<Pgtest> = PgLwLock::new();
static PRIMITIVE: PgLwLock<i32> = PgLwLock::new();
static ATOMIC: PgAtomic<std::sync::atomic::AtomicBool> = PgAtomic::new();
static HASH_TABLE: PgHashMap<i64, f32> = PgHashMap::new(2);

#[pg_guard]
pub extern "C" fn _PG_init() {
Expand All @@ -48,6 +50,7 @@ pub extern "C" fn _PG_init() {
pg_shmem_init!(STRUCT);
pg_shmem_init!(PRIMITIVE);
pg_shmem_init!(ATOMIC);
pg_shmem_init!(HASH_TABLE);
}

#[pg_extern]
Expand All @@ -60,6 +63,26 @@ fn vec_count() -> i32 {
VEC.share().len() as i32
}

#[pg_extern]
fn hash_table_insert(key: i64, value: f32) {
HASH_TABLE.insert(key, value).unwrap();
}

#[pg_extern]
fn hash_table_get(key: i64) -> Option<f32> {
HASH_TABLE.get(key)
}

#[pg_extern]
fn hash_table_remove(key: i64) -> Option<f32> {
HASH_TABLE.remove(key)
}

#[pg_extern]
fn hash_table_len() -> i64 {
HASH_TABLE.len() as i64
}

#[pg_extern]
fn vec_drain() -> SetOfIterator<'static, Pgtest> {
let mut vec = VEC.exclusive();
Expand Down
1 change: 1 addition & 0 deletions pgrx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub mod pg_catalog;
pub mod pgbox;
pub mod rel;
pub mod shmem;
pub mod shmem_hash;
pub mod spi;
#[cfg(feature = "cshim")]
pub mod spinlock;
Expand Down
220 changes: 220 additions & 0 deletions pgrx/src/shmem_hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
use crate::lwlock::*;
use crate::shmem::PgSharedMemoryInitialization;
use crate::PgSharedMem;
use crate::{pg_sys, PGRXSharedMemory};
use std::ffi::c_void;
use uuid::Uuid;

#[derive(Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum Error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A public error type shall be given a name that does not collide with the name of the Error trait.

HashTableFull,
}

#[derive(Copy, Clone)]
pub struct PgHashMapInner {
htab: *mut pg_sys::HTAB,
elements: u64,
}

unsafe impl PGRXSharedMemory for PgHashMapInner {}
unsafe impl Send for PgHashMapInner {}
unsafe impl Sync for PgHashMapInner {}

#[repr(align(8))]
#[derive(Copy, Clone, Debug)]
struct Key<K> {
// We copy it with std::ptr::copy, but we don't actually use the field
// in Rust, hence the warning.
#[allow(dead_code)]
key: K,
}

#[repr(align(8))]
#[derive(Copy, Clone, Debug)]
struct Value<V> {
value: V,
}

impl Default for PgHashMapInner {
fn default() -> Self {
Self { htab: std::ptr::null_mut(), elements: 0 }
}
}

pub struct PgHashMap<K: Copy + Clone, V: Copy + Clone> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type for new APIs shall not be prefixed with "Pg". It's Postgres, everything's Pg everywhere, all these types are namespaced via pgrx::, and this style is inconsistently used at best (Array vs. PgHeapTuple, for instance).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ShmemHashMap would be fine.

htab: PgLwLock<PgHashMapInner>,
size: u64,
phantom_key: std::marker::PhantomData<K>,
phantom_value: std::marker::PhantomData<V>,
}

/// Compute the hash for the key and it's pointer
/// to pass to the hash_search. Lock on HTAB should be taken,
/// although not strictly required I think.
macro_rules! key {
($key:expr, $htab:expr) => {{
let key = Key { key: $key };
let key_ptr: *const c_void = std::ptr::addr_of!(key) as *const Key<K> as *const c_void;
let hash_value = unsafe { pg_sys::get_hash_value($htab.htab, key_ptr) };
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment re: "this has to document the preconditions of this function so we know why it's safe to call". The answer of "there are virtually none and this is an in-principle safe function as long as we call it when loaded in the Postgres context" is fine.


(key_ptr, hash_value)
}};
}

/// Get the value pointer. It's stored next to the key.
/// https://github.com/postgres/postgres/blob/1f998863b0bc6fc8ef3d971d9c6d2c29b52d8ba2/src/backend/utils/hash/dynahash.c#L246-L250
macro_rules! value_ptr {
($entry:expr) => {{
let value_ptr: *mut Value<V> =
unsafe { $entry.offset(std::mem::size_of::<Key<K>>().try_into().unwrap()) }
as *mut Value<V>;

value_ptr
}};
}

impl<K: Copy + Clone, V: Copy + Clone> PgHashMap<K, V> {
/// Create new PgHashMap. This still needs to be allocated with
/// `pg_shmem_init!` just like any other shared memory structure.
pub const fn new(size: u64) -> PgHashMap<K, V> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's common for ::new() to be semantically equivalent to ::default(), so the size parameter's purpose should be documented rather than left implicit, if we require it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should also be made clear why it's a u64 instead of the more common usize. Yes, the difference "doesn't matter", but the difference between those two types informs what purpose it serves.

PgHashMap {
htab: PgLwLock::new(),
size,
phantom_key: std::marker::PhantomData,
phantom_value: std::marker::PhantomData,
}
}

/// Insert a key and value into the HashMap. If the key is already
/// present, it will be replaced. If the HashMap is full, return an error.
pub fn insert(&self, key: K, value: V) -> Result<(), Error> {
levkk marked this conversation as resolved.
Show resolved Hide resolved
let mut found = false;
let mut htab = self.htab.exclusive();
let (key_ptr, hash_value) = key!(key, htab);

let entry = unsafe {
pg_sys::hash_search_with_hash_value(
htab.htab,
key_ptr,
hash_value,
pg_sys::HASHACTION_HASH_FIND,
&mut found,
)
};

// If we don't do this check, pg will overwrite
// some random entry with our key/value...
if entry.is_null() && htab.elements == self.size {
return Err(Error::HashTableFull);
}

let entry = unsafe {
pg_sys::hash_search_with_hash_value(
htab.htab,
key_ptr,
hash_value,
pg_sys::HASHACTION_HASH_ENTER_NULL,
&mut found,
)
};

if !entry.is_null() {
let value_ptr = value_ptr!(entry);
let value = Value { value };
unsafe {
std::ptr::copy(std::ptr::addr_of!(value), value_ptr, 1);
}
htab.elements += 1;
Ok(())
} else {
// OOM.
return Err(Error::HashTableFull);
}
}

/// Get a value from the HashMap using the key.
/// If the key doesn't exist, return None.
pub fn get(&self, key: K) -> Option<V> {
let htab = self.htab.exclusive();
let (key_ptr, hash_value) = key!(key, htab);

let entry = unsafe {
pg_sys::hash_search_with_hash_value(
Comment on lines +170 to +171
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document the preconditions for calling this function somewhere in this file, you can reference them in later unsafe calls to the same function.

htab.htab,
key_ptr,
hash_value,
pg_sys::HASHACTION_HASH_FIND,
std::ptr::null_mut(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In particular, it is good to know why passing nullptr is okay here.

)
};

if entry.is_null() {
return None;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return None;
None

} else {
let value_ptr = value_ptr!(entry);
let value = unsafe { std::ptr::read(value_ptr) };
return Some(value.value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return Some(value.value);
Some(value.value)

Comment on lines +183 to +185
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this returns V and not &V? Because we cannot rely on concurrent changes not altering the data in the K/V pair? It would be good to understand the soundness conditions better here and why things look the way they do.

}
}

/// Remove the value from the HashMap and return it.
pub fn remove(&self, key: K) -> Option<V> {
if let Some(value) = self.get(key) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably use let-else, like so:

Suggested change
if let Some(value) = self.get(key) {
let Some(value) = self.get(key) else { return None };

let mut htab = self.htab.exclusive();
let (key_ptr, hash_value) = key!(key, htab);

// Dangling pointer, don't touch it.
let _ = unsafe {
pg_sys::hash_search_with_hash_value(
htab.htab,
key_ptr,
hash_value,
pg_sys::HASHACTION_HASH_REMOVE,
std::ptr::null_mut(),
);
};

htab.elements -= 1;
return Some(value);
} else {
return None;
}
Comment on lines +208 to +211
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if you do use let-else this should probably be updated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like this, but then it will want you to hit it with rustfmt.

Suggested change
return Some(value);
} else {
return None;
}
Some(value)

}

/// Get the number of elements in the HashMap.
pub fn len(&self) -> usize {
let htab = self.htab.exclusive();
htab.elements.try_into().unwrap()
}
}

impl<K: Copy + Clone, V: Copy + Clone> PgSharedMemoryInitialization for PgHashMap<K, V> {
fn pg_init(&'static self) {
PgSharedMem::pg_init_locked(&self.htab);
}

fn shmem_init(&'static self) {
PgSharedMem::shmem_init_locked(&self.htab);
let mut htab = self.htab.exclusive();

let mut hash_ctl = pg_sys::HASHCTL::default();
hash_ctl.keysize = std::mem::size_of::<Key<K>>();
hash_ctl.entrysize = std::mem::size_of::<Value<V>>();

let shm_name =
alloc::ffi::CString::new(Uuid::new_v4().to_string()).expect("CString::new() failed");

let htab_ptr = unsafe {
pg_sys::ShmemInitHash(
Comment on lines +238 to +239
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preconditions, postconditions, "why is this sound?", etc.

shm_name.into_raw(),
self.size.try_into().unwrap(),
self.size.try_into().unwrap(),
&mut hash_ctl,
(pg_sys::HASH_ELEM | pg_sys::HASH_BLOBS).try_into().unwrap(),
)
};

htab.htab = htab_ptr;
}
}