Stabilization of part-3 of shared-memory support, based on review comments. #616

Draft · wants to merge 12 commits into base: main
6 changes: 4 additions & 2 deletions Makefile
@@ -476,6 +476,10 @@ $(BINDIR)/$(UNITDIR)/splinter_shmem_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ) \
$(LIBDIR)/libsplinterdb.so

$(BINDIR)/$(UNITDIR)/splinter_shmem_oom_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ) \
$(LIBDIR)/libsplinterdb.so

$(BINDIR)/$(UNITDIR)/splinter_ipc_test: $(UTIL_SYS) \
$(COMMON_UNIT_TESTOBJ)

@@ -495,8 +499,6 @@ $(BINDIR)/$(UNITDIR)/splinterdb_heap_id_mgmt_test: $(COMMON_TESTOBJ) \
$(OBJDIR)/$(FUNCTIONAL_TESTSDIR)/test_async.o \
$(LIBDIR)/libsplinterdb.so



########################################
# Convenience mini unit-test targets
unit/util_test: $(BINDIR)/$(UNITDIR)/util_test
2 changes: 1 addition & 1 deletion include/splinterdb/splinterdb.h
@@ -175,7 +175,7 @@ splinterdb_open(const splinterdb_config *cfg, splinterdb **kvs);
// Close a splinterdb
//
// This will flush all data to disk and release all resources
void
int
splinterdb_close(splinterdb **kvs);

// Register the current thread so that it can be used with splinterdb.
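With this signature change, callers can detect errors during shutdown instead of closing blind. A minimal caller-side sketch; the zero-on-success convention and the helper function are assumptions for illustration, not part of this diff:

#include <stdio.h>
#include "splinterdb/splinterdb.h"

// Sketch: splinterdb_close() now returns an int, so a failed flush or
// resource release can be surfaced. Zero-on-success is assumed here.
static int
shutdown_db(splinterdb **kvs)
{
   int rc = splinterdb_close(kvs); // flushes all data to disk, releases resources
   if (rc != 0) {
      fprintf(stderr, "splinterdb_close() failed, rc=%d\n", rc);
   }
   return rc;
}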
1 change: 0 additions & 1 deletion src/PackedArray.c
@@ -380,7 +380,6 @@ void PACKEDARRAY_JOIN(__PackedArray_unpack_, PACKEDARRAY_IMPL_BITS_PER_ITEM)(con
#include "poison.h"

#define PACKEDARRAY_MALLOC(size) platform_malloc(size)
#define PACKEDARRAY_FREE(p) platform_free(p)

void PackedArray_pack(uint32* a, const uint32 offset, const uint32* in, uint32 count, size_t bitsPerItem)
{
18 changes: 9 additions & 9 deletions src/btree.c
@@ -3103,6 +3103,12 @@ btree_pack_link_extent(btree_pack_req *req,
req->num_edges[height] = 0;
}

static inline bool
btree_pack_can_fit_tuple(btree_pack_req *req)
{
return req->num_tuples < req->max_tuples;
}

static inline btree_node *
btree_pack_create_next_node(btree_pack_req *req, uint64 height, key pivot)
{
@@ -3167,8 +3173,8 @@ btree_pack_loop(btree_pack_req *req, // IN/OUT
log_trace_key(tuple_key, "btree_pack_loop (bottom)");

if (req->hash) {
platform_assert(req->num_tuples < req->max_tuples);
req->fingerprint_arr[req->num_tuples] =
platform_assert(btree_pack_can_fit_tuple(req));
fingerprint_start(&req->fingerprint)[req->num_tuples] =
req->hash(key_data(tuple_key), key_length(tuple_key), req->seed);
}

@@ -3216,12 +3222,6 @@ btree_pack_post_loop(btree_pack_req *req, key last_key)
mini_release(&req->mini, last_key);
}

static bool32
btree_pack_can_fit_tuple(btree_pack_req *req, key tuple_key, message data)
{
return req->num_tuples < req->max_tuples;
}

static void
btree_pack_abort(btree_pack_req *req)
{
@@ -3259,7 +3259,7 @@ btree_pack(btree_pack_req *req)

while (iterator_can_next(req->itor)) {
iterator_curr(req->itor, &tuple_key, &data);
if (!btree_pack_can_fit_tuple(req, tuple_key, data)) {
if (!btree_pack_can_fit_tuple(req)) {
platform_error_log("%s(): req->num_tuples=%lu exceeded output size "
"limit, req->max_tuples=%lu\n",
__func__,
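For review context, the hunks above boil down to one pattern inside btree_pack()'s loop: guard on the simplified btree_pack_can_fit_tuple() (the unused key/message parameters are gone), then write the hash through the fingerprint accessor rather than a raw array. A condensed sketch of that loop, assembled from the lines in this diff with the tuple-append step elided:

// Condensed sketch of btree_pack()'s main loop after this change.
while (iterator_can_next(req->itor)) {
   iterator_curr(req->itor, &tuple_key, &data);
   if (!btree_pack_can_fit_tuple(req)) {
      break; // output limit reached; btree_pack() logs and aborts here
   }
   if (req->hash) {
      // Hashes now land in the fp_hdr-managed array, not fingerprint_arr.
      fingerprint_start(&req->fingerprint)[req->num_tuples] =
         req->hash(key_data(tuple_key), key_length(tuple_key), req->seed);
   }
   // ... append tuple_key/data to the output tree and advance the iterator ...
}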
24 changes: 15 additions & 9 deletions src/btree.h
@@ -151,9 +151,9 @@ typedef struct btree_pack_req {
btree_config *cfg;
iterator *itor; // the itor which is being packed
uint64 max_tuples;
hash_fn hash; // hash function used for calculating filter_hash
unsigned int seed; // seed used for calculating filter_hash
uint32 *fingerprint_arr; // IN/OUT: hashes of the keys in the tree
hash_fn hash; // hash function used for calculating filter_hash
unsigned int seed; // seed used for calculating filter_hash
fp_hdr fingerprint; // IN/OUT: hashes of the keys in the tree

// internal data
uint16 height;
@@ -168,6 +168,7 @@ typedef struct btree_pack_req {
uint64 num_tuples; // no. of tuples in the output tree
uint64 key_bytes; // total size of keys in tuples of the output tree
uint64 message_bytes; // total size of msgs in tuples of the output tree
uint64 line; // Caller's line #
} btree_pack_req;

struct btree_async_ctxt;
Expand Down Expand Up @@ -325,6 +326,10 @@ btree_iterator_init(cache *cc,
void
btree_iterator_deinit(btree_iterator *itor);

/*
* Initialize BTree Pack request structure. May allocate memory for fingerprint
* array.
*/
static inline platform_status
btree_pack_req_init(btree_pack_req *req,
cache *cc,
@@ -343,26 +348,27 @@ btree_pack_req_init(btree_pack_req *req,
req->hash = hash;
req->seed = seed;
if (hash != NULL && max_tuples > 0) {
req->fingerprint_arr =
TYPED_ARRAY_ZALLOC(hid, req->fingerprint_arr, max_tuples);

fingerprint_init(&req->fingerprint, hid, max_tuples); // Allocates memory

// When we run with shared-memory configured, we expect that it is sized
// big-enough to not get OOMs from here. Hence, only a debug_assert().
debug_assert(req->fingerprint_arr,
debug_assert(!fingerprint_is_empty(&req->fingerprint),
"Unable to allocate memory for %lu tuples",
max_tuples);
if (!req->fingerprint_arr) {
if (fingerprint_is_empty(&req->fingerprint)) {
return STATUS_NO_MEMORY;
}
}
return STATUS_OK;
}

// Free memory if any was allocated for fingerprint array.
static inline void
btree_pack_req_deinit(btree_pack_req *req, platform_heap_id hid)
{
if (req->fingerprint_arr) {
platform_free(hid, req->fingerprint_arr);
if (!fingerprint_is_empty(&req->fingerprint)) {
fingerprint_deinit(hid, &req->fingerprint);
}
}

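Net effect of the btree.h changes: the raw uint32 *fingerprint_arr is replaced by an fp_hdr object with an explicit lifecycle. A minimal sketch of that lifecycle, using only the fingerprint_* calls visible in this diff; hid and max_tuples stand in for the caller's arguments:

// Sketch: fingerprint-object lifecycle replacing the raw fingerprint array.
fp_hdr fingerprint;                                // embedded in btree_pack_req
fingerprint_init(&fingerprint, hid, max_tuples);   // may allocate memory
if (fingerprint_is_empty(&fingerprint)) {
   return STATUS_NO_MEMORY;                        // allocation failed
}
uint32 *fp_arr = fingerprint_start(&fingerprint);  // btree_pack() writes hashes here
// ... fp_arr[0 .. num_tuples-1] filled during the pack ...
fingerprint_deinit(hid, &fingerprint);             // frees memory, if any was allocated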
42 changes: 29 additions & 13 deletions src/clockcache.c
@@ -1818,20 +1818,25 @@ clockcache_init(clockcache *cc, // OUT
cc->heap_id = hid;

/* lookup maps addrs to entries, entry contains the entries themselves */
cc->lookup =
TYPED_ARRAY_MALLOC(cc->heap_id, cc->lookup, allocator_page_capacity);
platform_memfrag memfrag_cc_lookup;
cc->lookup = TYPED_ARRAY_MALLOC_MF(
&memfrag_cc_lookup, cc->heap_id, cc->lookup, allocator_page_capacity);
if (!cc->lookup) {
goto alloc_error;
}
cc->lookup_size = memfrag_size(&memfrag_cc_lookup);

for (i = 0; i < allocator_page_capacity; i++) {
cc->lookup[i] = CC_UNMAPPED_ENTRY;
}

cc->entry =
TYPED_ARRAY_ZALLOC(cc->heap_id, cc->entry, cc->cfg->page_capacity);
platform_memfrag memfrag_cc_entry;
cc->entry = TYPED_ARRAY_ZALLOC_MF(
&memfrag_cc_entry, cc->heap_id, cc->entry, cc->cfg->page_capacity);
if (!cc->entry) {
goto alloc_error;
}
cc->entry_size = memfrag_size(&memfrag_cc_entry);

platform_status rc = STATUS_NO_MEMORY;

@@ -1860,11 +1865,13 @@ clockcache_init(clockcache *cc, // OUT
cc->refcount = platform_buffer_getaddr(&cc->rc_bh);

/* Separate ref counts for pins */
cc->pincount =
TYPED_ARRAY_ZALLOC(cc->heap_id, cc->pincount, cc->cfg->page_capacity);
platform_memfrag memfrag_cc_pincount;
cc->pincount = TYPED_ARRAY_ZALLOC_MF(
&memfrag_cc_pincount, cc->heap_id, cc->pincount, cc->cfg->page_capacity);
if (!cc->pincount) {
goto alloc_error;
}
cc->pincount_size = memfrag_size(&memfrag_cc_pincount);

/* The hands and associated page */
cc->free_hand = 0;
@@ -1873,13 +1880,16 @@ clockcache_init(clockcache *cc, // OUT
cc->per_thread[thr_i].free_hand = CC_UNMAPPED_ENTRY;
cc->per_thread[thr_i].enable_sync_get = TRUE;
}
platform_memfrag memfrag_cc_batch_busy;
cc->batch_busy =
TYPED_ARRAY_ZALLOC(cc->heap_id,
cc->batch_busy,
cc->cfg->page_capacity / CC_ENTRIES_PER_BATCH);
TYPED_ARRAY_ZALLOC_MF(&memfrag_cc_batch_busy,
cc->heap_id,
cc->batch_busy,
(cc->cfg->page_capacity / CC_ENTRIES_PER_BATCH));
if (!cc->batch_busy) {
goto alloc_error;
}
cc->batch_busy_size = memfrag_size(&memfrag_cc_batch_busy);

return STATUS_OK;

@@ -1907,10 +1917,12 @@ clockcache_deinit(clockcache *cc) // IN/OUT
}

if (cc->lookup) {
platform_free(cc->heap_id, cc->lookup);
platform_free_mem(cc->heap_id, cc->lookup, cc->lookup_size);
cc->lookup = NULL;
}
if (cc->entry) {
platform_free(cc->heap_id, cc->entry);
platform_free_mem(cc->heap_id, cc->entry, cc->entry_size);
cc->entry = NULL;
}

debug_only platform_status rc = STATUS_TEST_FAILED;
@@ -1929,11 +1941,15 @@ clockcache_deinit(clockcache *cc) // IN/OUT
cc->refcount = NULL;
}

platform_memfrag mf = {0};
if (cc->pincount) {
platform_free_volatile(cc->heap_id, cc->pincount);
memfrag_init(&mf, cc->heap_id, (void *)cc->pincount, cc->pincount_size);
platform_free_volatile(cc->heap_id, &mf);
}
if (cc->batch_busy) {
platform_free_volatile(cc->heap_id, cc->batch_busy);
memfrag_init(
&mf, cc->heap_id, (void *)cc->batch_busy, cc->batch_busy_size);
platform_free_volatile(cc->heap_id, &mf);
}
}

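All of the clockcache changes follow one allocation pattern for shared memory: allocate through a platform_memfrag, record the fragment size next to the pointer, and pass that size back when freeing. A minimal sketch of the pattern using the macros from this diff; my_type, heap_id, and nitems are placeholders:

// Sketch: memfrag-based allocate / record-size / sized-free pattern.
my_type *arr = NULL;
platform_memfrag memfrag_arr;
arr = TYPED_ARRAY_ZALLOC_MF(&memfrag_arr, heap_id, arr, nitems);
if (!arr) {
   return STATUS_NO_MEMORY;
}
size_t arr_size = memfrag_size(&memfrag_arr); // remember for the free

// ... later, on teardown ...
if (arr) {
   platform_free_mem(heap_id, arr, arr_size); // size lets the shared-memory
   arr = NULL;                                // allocator reclaim the exact fragment
}

The platform_free_volatile() path carries the same information via memfrag_init(), as shown for pincount and batch_busy in clockcache_deinit() above.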
6 changes: 4 additions & 2 deletions src/clockcache.h
@@ -139,15 +139,17 @@ struct clockcache {

// Stats
cache_stats stats[MAX_THREADS];
size_t lookup_size;
size_t entry_size;
size_t pincount_size;
size_t batch_busy_size;
};


/*
*-----------------------------------------------------------------------------
* Function declarations
*-----------------------------------------------------------------------------
*/

void
clockcache_config_init(clockcache_config *cache_config,
io_config *io_cfg,
13 changes: 8 additions & 5 deletions src/memtable.c
@@ -307,15 +307,19 @@ memtable_context_create(platform_heap_id hid,
process_fn process,
void *process_ctxt)
{
platform_memfrag memfrag_ctxt = {0};
memtable_context *ctxt =
TYPED_FLEXIBLE_STRUCT_ZALLOC(hid, ctxt, mt, cfg->max_memtables);
ctxt->cc = cc;
ctxt->mf_size = memfrag_size(&memfrag_ctxt);
ctxt->cc = cc;
memmove(&ctxt->cfg, cfg, sizeof(ctxt->cfg));

platform_mutex_init(
&ctxt->incorporation_mutex, platform_get_module_id(), hid);
ctxt->rwlock = TYPED_MALLOC(hid, ctxt->rwlock);
platform_memfrag memfrag_rwlock = {0};
ctxt->rwlock = TYPED_MALLOC_MF(&memfrag_rwlock, hid, ctxt->rwlock);
platform_batch_rwlock_init(ctxt->rwlock);
ctxt->rwlock_mf_size = memfrag_size(&memfrag_rwlock);

for (uint64 mt_no = 0; mt_no < cfg->max_memtables; mt_no++) {
uint64 generation = mt_no;
@@ -343,9 +347,8 @@ memtable_context_destroy(platform_heap_id hid, memtable_context *ctxt)
}

platform_mutex_destroy(&ctxt->incorporation_mutex);
platform_free(hid, ctxt->rwlock);

platform_free(hid, ctxt);
platform_free_mem(hid, ctxt->rwlock, ctxt->rwlock_mf_size);
platform_free_mem(hid, ctxt, ctxt->mf_size);
}

void
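Same idea in memtable_context: the fragment sizes are stashed in the struct itself (mf_size, rwlock_mf_size) so that destroy can issue sized frees. A compressed pairing of the create/destroy sides from this diff, with unrelated setup elided:

// Sketch: create records the allocation sizes, destroy hands them back.
platform_memfrag memfrag_ctxt = {0};
memtable_context *ctxt =
   TYPED_FLEXIBLE_STRUCT_ZALLOC(hid, ctxt, mt, cfg->max_memtables);
ctxt->mf_size = memfrag_size(&memfrag_ctxt);          // flexible-struct size

platform_memfrag memfrag_rwlock = {0};
ctxt->rwlock = TYPED_MALLOC_MF(&memfrag_rwlock, hid, ctxt->rwlock);
ctxt->rwlock_mf_size = memfrag_size(&memfrag_rwlock);

// ... in memtable_context_destroy() ...
platform_free_mem(hid, ctxt->rwlock, ctxt->rwlock_mf_size);
platform_free_mem(hid, ctxt, ctxt->mf_size);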
4 changes: 3 additions & 1 deletion src/memtable.h
@@ -131,7 +131,9 @@ typedef struct memtable_context {
// read lock to read and write lock to modify.
volatile uint64 generation_retired;

bool32 is_empty;
bool is_empty;
size_t mf_size; // # of bytes of memory allocated to this struct
size_t rwlock_mf_size; // # of bytes of memory allocated to rwlock

// Effectively thread local, no locking at all:
btree_scratch scratch[MAX_THREADS];
3 changes: 2 additions & 1 deletion src/merge.c
@@ -545,6 +545,7 @@ merge_iterator_create(platform_heap_id hid,
== ARRAY_SIZE(merge_itor->ordered_iterators),
"size mismatch");

platform_memfrag memfrag_merge_itor;
merge_itor = TYPED_ZALLOC(PROCESS_PRIVATE_HEAP_ID, merge_itor);
if (merge_itor == NULL) {
return STATUS_NO_MEMORY;
@@ -598,7 +599,7 @@ platform_status
merge_iterator_destroy(platform_heap_id hid, merge_iterator **merge_itor)
{
merge_accumulator_deinit(&(*merge_itor)->merge_buffer);
platform_free(PROCESS_PRIVATE_HEAP_ID, *merge_itor);
platform_free_heap(PROCESS_PRIVATE_HEAP_ID, *merge_itor);
*merge_itor = NULL;

return STATUS_OK;
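Worth noting for review: the merge iterator is allocated from PROCESS_PRIVATE_HEAP_ID (the hid argument is not used for this allocation), so the matching free now names that heap explicitly via platform_free_heap(). A minimal sketch of the pairing, with error handling trimmed:

// Sketch: process-private allocation paired with platform_free_heap().
merge_iterator *merge_itor = NULL;
merge_itor = TYPED_ZALLOC(PROCESS_PRIVATE_HEAP_ID, merge_itor);
if (merge_itor == NULL) {
   return STATUS_NO_MEMORY;
}
// ... iterate / merge ...
platform_free_heap(PROCESS_PRIVATE_HEAP_ID, merge_itor);
merge_itor = NULL;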
14 changes: 9 additions & 5 deletions src/pcq.h
@@ -14,8 +14,9 @@

typedef struct {
uint32 num_elems;
cache_aligned_uint32 tail; // Producers enqueue to here
cache_aligned_uint32 head; // Consumer dequeues from here
cache_aligned_uint32 tail; // Producers enqueue to here
cache_aligned_uint32 head; // Consumer dequeues from here
size_t mf_size; // of memory fragment allocated for this struct
void *elems[];
} pcq;

@@ -28,9 +29,11 @@ pcq_alloc(platform_heap_id hid, size_t num_elems)
{
pcq *q;

platform_memfrag memfrag_q;
q = TYPED_FLEXIBLE_STRUCT_ZALLOC(hid, q, elems, num_elems);
if (q != NULL) {
q->num_elems = num_elems;
q->mf_size = memfrag_size(&memfrag_q);
}

return q;
@@ -61,11 +64,12 @@ pcq_is_full(const pcq *q)
return pcq_count(q) == q->num_elems;
}

// Deallocate a PCQ
// Deallocate a PCQ, and NULL out input handle
static inline void
pcq_free(platform_heap_id hid, pcq *q)
pcq_free(platform_heap_id hid, pcq **q)
{
platform_free(hid, q);
platform_free_mem(hid, *q, (*q)->mf_size);
*q = NULL;
}

// Enqueue an elem to a PCQ. Element must not be NULL
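Because pcq_free() now takes a pcq ** and NULLs the caller's handle, call sites change slightly. A usage sketch based only on the declarations in this header; the heap id and element count are placeholders:

// Sketch: allocating and freeing a PCQ with the new pcq_free() signature.
pcq *q = pcq_alloc(hid, 128);  // q->mf_size records the fragment size
if (q == NULL) {
   return STATUS_NO_MEMORY;
}
// ... producers enqueue at tail, the consumer dequeues from head ...
pcq_free(hid, &q);             // frees q->mf_size bytes and sets q to NULL
platform_assert(q == NULL);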