Skip to content

Commit

Permalink
fix shmem API on Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
usamoi committed Nov 1, 2024
1 parent 9b28a4e commit da9862b
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 54 deletions.
40 changes: 35 additions & 5 deletions pgrx-examples/shmem/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,45 @@ pub struct Pgtest {

unsafe impl PGRXSharedMemory for Pgtest {}

static DEQUE: PgLwLock<heapless::Deque<Pgtest, 400>> = PgLwLock::new();
static VEC: PgLwLock<heapless::Vec<Pgtest, 400>> = PgLwLock::new();
static HASH: PgLwLock<heapless::FnvIndexMap<i32, i32, 4>> = PgLwLock::new();
static STRUCT: PgLwLock<Pgtest> = PgLwLock::new();
static PRIMITIVE: PgLwLock<i32> = PgLwLock::new();
static DEQUE: PgLwLock<heapless::Deque<Pgtest, 400>> = PgLwLock::new("shmem_deque");
static VEC: PgLwLock<heapless::Vec<Pgtest, 400>> = PgLwLock::new("shmem_vec");
static HASH: PgLwLock<heapless::FnvIndexMap<i32, i32, 4>> = PgLwLock::new("shmem_hash");
static STRUCT: PgLwLock<Pgtest> = PgLwLock::new("shmem_struct");
static PRIMITIVE: PgLwLock<i32> = PgLwLock::new("shmem_primtive");
static ATOMIC: PgAtomic<std::sync::atomic::AtomicBool> = PgAtomic::new();

#[pg_guard]
pub extern "C" fn _PG_init() {
unsafe {
static mut PREV_SHMEM_REQUEST_HOOK: Option<unsafe extern "C" fn()> = None;
PREV_SHMEM_REQUEST_HOOK = pg_sys::shmem_request_hook;
pg_sys::shmem_request_hook = Some(__pgrx_private_shmem_request_hook);

#[pg_guard]
extern "C" fn __pgrx_private_shmem_request_hook() {
unsafe {
eprintln!("shmem request hook is called from process {}", std::process::id());
if let Some(i) = PREV_SHMEM_REQUEST_HOOK {
i();
}
}
}
}
unsafe {
static mut PREV_SHMEM_STARTUP_HOOK: Option<unsafe extern "C" fn()> = None;
PREV_SHMEM_STARTUP_HOOK = pg_sys::shmem_startup_hook;
pg_sys::shmem_startup_hook = Some(__pgrx_private_shmem_hook);

#[pg_guard]
extern "C" fn __pgrx_private_shmem_hook() {
unsafe {
eprintln!("shmem startup hook is called from process {}", std::process::id());
if let Some(i) = PREV_SHMEM_STARTUP_HOOK {
i();
}
}
}
}
pg_shmem_init!(DEQUE);
pg_shmem_init!(VEC);
pg_shmem_init!(HASH);
Expand Down
1 change: 0 additions & 1 deletion pgrx-tests/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ mod rel_tests;
mod result_tests;
mod roundtrip_tests;
mod schema_tests;
#[cfg(target_family = "unix")]
mod shmem_tests;
mod spi_tests;
mod srf_tests;
Expand Down
2 changes: 1 addition & 1 deletion pgrx-tests/src/tests/shmem_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use pgrx::{pg_shmem_init, PgAtomic, PgLwLock, PgSharedMemoryInitialization};
use std::sync::atomic::AtomicBool;

static ATOMIC: PgAtomic<AtomicBool> = PgAtomic::new();
static LWLOCK: PgLwLock<bool> = PgLwLock::new();
static LWLOCK: PgLwLock<bool> = PgLwLock::new("pgrx_tests_lwlock");

#[pg_guard]
pub extern "C" fn _PG_init() {
Expand Down
62 changes: 22 additions & 40 deletions pgrx/src/lwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
#![allow(clippy::needless_borrow)]
use crate::pg_sys;
use core::ops::{Deref, DerefMut};
use once_cell::sync::OnceCell;
use std::cell::UnsafeCell;
use std::fmt;
use uuid::Uuid;

/// A Rust locking mechanism which uses a PostgreSQL LWLock to lock the data
///
Expand All @@ -34,7 +32,7 @@ use uuid::Uuid;
/// PostgreSQL cleanly.
pub struct PgLwLock<T> {
inner: UnsafeCell<PgLwLockInner<T>>,
name: OnceCell<&'static str>,
name: &'static str,
}

unsafe impl<T: Send> Send for PgLwLock<T> {}
Expand All @@ -43,28 +41,13 @@ unsafe impl<T: Send + Sync> Sync for PgLwLock<T> {}
impl<T> PgLwLock<T> {
/// Create an empty lock which can be created as a global with None as a
/// sentinel value
pub const fn new() -> Self {
PgLwLock { inner: UnsafeCell::new(PgLwLockInner::empty()), name: OnceCell::new() }
}

/// Create a new lock for T by attaching a LWLock, which is looked up by name
pub fn from_named(input_name: &'static str, value: *mut T) -> Self {
let name = OnceCell::new();
let inner = UnsafeCell::new(PgLwLockInner::<T>::new(input_name, value));
name.set(input_name).unwrap();
PgLwLock { inner, name }
pub const fn new(name: &'static str) -> Self {
PgLwLock { inner: UnsafeCell::new(PgLwLockInner::empty()), name }
}

/// Get the name of the PgLwLock
pub fn get_name(&self) -> &'static str {
match self.name.get() {
None => {
let name = Box::leak(Uuid::new_v4().to_string().into_boxed_str());
self.name.set(name).unwrap();
name
}
Some(name) => name,
}
self.name
}

/// Obtain a shared lock (which comes with `&T` access)
Expand All @@ -79,14 +62,19 @@ impl<T> PgLwLock<T> {

/// Attach an empty PgLwLock lock to a LWLock, and wrap T
/// SAFETY: Must only be called from inside the Postgres shared memory init hook
pub unsafe fn attach(&self, value: *mut T) {
*self.inner.get() = PgLwLockInner::<T>::new(self.get_name(), value);
pub unsafe fn attach(&self, value: *mut PgLwLockShared<T>) {
*self.inner.get() = PgLwLockInner::<T>::new(value);
}
}

#[repr(C)]
pub struct PgLwLockShared<T> {
pub data: T,
pub lock_ptr: *mut pg_sys::LWLock,
}

pub struct PgLwLockInner<T> {
lock_ptr: *mut pg_sys::LWLock,
data: *mut T,
shared: *mut PgLwLockShared<T>,
}

impl<T> fmt::Debug for PgLwLockInner<T> {
Expand All @@ -96,39 +84,33 @@ impl<T> fmt::Debug for PgLwLockInner<T> {
}

impl<'a, T> PgLwLockInner<T> {
fn new(name: &'static str, data: *mut T) -> Self {
unsafe {
let lock = alloc::ffi::CString::new(name).expect("CString::new failed");
PgLwLockInner {
lock_ptr: &mut (*pg_sys::GetNamedLWLockTranche(lock.as_ptr())).lock,
data,
}
}
unsafe fn new(shared: *mut PgLwLockShared<T>) -> Self {
PgLwLockInner { shared }
}

const fn empty() -> Self {
PgLwLockInner { lock_ptr: std::ptr::null_mut(), data: std::ptr::null_mut() }
PgLwLockInner { shared: std::ptr::null_mut() }
}

fn share(&self) -> PgLwLockShareGuard<T> {
if self.lock_ptr.is_null() {
if self.shared.is_null() {
panic!("PgLwLock is not initialized");
}
unsafe {
pg_sys::LWLockAcquire(self.lock_ptr, pg_sys::LWLockMode::LW_SHARED);
pg_sys::LWLockAcquire((*self.shared).lock_ptr, pg_sys::LWLockMode::LW_SHARED);

PgLwLockShareGuard { data: self.data.as_ref().unwrap(), lock: self.lock_ptr }
PgLwLockShareGuard { data: &(*self.shared).data, lock: (*self.shared).lock_ptr }
}
}

fn exclusive(&self) -> PgLwLockExclusiveGuard<T> {
if self.lock_ptr.is_null() {
if self.shared.is_null() {
panic!("PgLwLock is not initialized");
}
unsafe {
pg_sys::LWLockAcquire(self.lock_ptr, pg_sys::LWLockMode::LW_EXCLUSIVE);
pg_sys::LWLockAcquire((*self.shared).lock_ptr, pg_sys::LWLockMode::LW_EXCLUSIVE);

PgLwLockExclusiveGuard { data: self.data.as_mut().unwrap(), lock: self.lock_ptr }
PgLwLockExclusiveGuard { data: &mut (*self.shared).data, lock: (*self.shared).lock_ptr }
}
}
}
Expand Down
23 changes: 16 additions & 7 deletions pgrx/src/shmem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,20 @@ impl PgSharedMem {
&mut (*pg_sys::MainLWLockArray.add(21)).lock;
pg_sys::LWLockAcquire(addin_shmem_init_lock, pg_sys::LWLockMode::LW_EXCLUSIVE);

let fv_shmem =
pg_sys::ShmemInitStruct(shm_name.into_raw(), std::mem::size_of::<T>(), &mut found)
as *mut T;

std::ptr::write(fv_shmem, <T>::default());
let fv_shmem = pg_sys::ShmemInitStruct(
shm_name.clone().into_raw(),
std::mem::size_of::<T>(),
&mut found,
) as *mut PgLwLockShared<T>;

if !found {
fv_shmem.write(PgLwLockShared {
data: T::default(),
lock_ptr: &raw mut (*pg_sys::GetNamedLWLockTranche(shm_name.as_ptr())).lock,
});
}
lock.attach(fv_shmem);

pg_sys::LWLockRelease(addin_shmem_init_lock);
}
}
Expand All @@ -212,9 +219,11 @@ impl PgSharedMem {
pg_sys::ShmemInitStruct(shm_name.into_raw(), std::mem::size_of::<T>(), &mut found)
as *mut T;

if !found {
fv_shmem.write(T::default());
}
atomic.attach(fv_shmem);
let atomic = T::default();
std::ptr::copy(&atomic, fv_shmem, 1);

pg_sys::LWLockRelease(addin_shmem_init_lock);
}
}
Expand Down

0 comments on commit da9862b

Please sign in to comment.