Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hashset feature] Replace dict with hashset for keys and expire #1178

Draft
wants to merge 77 commits into
base: hashset
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
e44f048
WIP new hashtable
zuiderkwast Jun 2, 2024
a9178ad
More implementation
zuiderkwast Aug 23, 2024
f3af277
Lots of changes (resize, scan, etc)
zuiderkwast Aug 26, 2024
0ec3218
Add release, delete, pop + rename and reorder stuff
zuiderkwast Aug 27, 2024
29a9422
Spelling
zuiderkwast Aug 27, 2024
b34094a
clang-format (with manual rewrites to avoid superlong lines)
zuiderkwast Aug 27, 2024
c89cd6b
Implement iterators, fix scan end condition
zuiderkwast Sep 1, 2024
5db23c9
update todo in .h file
zuiderkwast Sep 1, 2024
5a6c775
Some updates
zuiderkwast Sep 2, 2024
cc7a25e
Add two-phase insert
zuiderkwast Sep 2, 2024
bafce7d
Add random element, some refac
zuiderkwast Sep 5, 2024
e59548c
Tuning, cleanup
zuiderkwast Sep 6, 2024
e67b148
empty, with progress callback
zuiderkwast Sep 6, 2024
3580b38
Add stats collection functions, untested
zuiderkwast Sep 6, 2024
2d997f6
Add two-phase pop
zuiderkwast Sep 8, 2024
7b00ece
Add 'instant_rehashing' type flag (non-incremental)
zuiderkwast Sep 8, 2024
dc0ae98
Spellcheck
zuiderkwast Sep 11, 2024
638d222
Clang-format
zuiderkwast Sep 11, 2024
0cc2b99
clang-format .h file
zuiderkwast Sep 11, 2024
a62a16f
Build errors
zuiderkwast Sep 11, 2024
e82b69b
add hashtabFindRef
zuiderkwast Sep 21, 2024
fbccb4b
Rename to hashset
zuiderkwast Sep 21, 2024
9c139bf
Add hashsetBuckets() and HASHET_BUCKET_SIZE
zuiderkwast Sep 23, 2024
75f054a
Try to fix 32-bit warnings
zuiderkwast Sep 24, 2024
52649ca
Merge remote-tracking branch 'valkey/unstable' into hashset
zuiderkwast Sep 25, 2024
765eec0
Replace dict with hashset in command tables (#1065)
SoftlyRaining Sep 27, 2024
36e7c8a
Add hashsetDefragInternals (defrag helper)
zuiderkwast Sep 28, 2024
9c34fe7
Some hashset functions
zuiderkwast Sep 28, 2024
2b77a02
Merge remote-tracking branch 'valkey/unstable' into hashset
zuiderkwast Oct 1, 2024
2c76792
Fix wrong indexes in hashtabScan while rehashing
zuiderkwast Oct 2, 2024
8a4875d
Implement resizeAllowed callback
zuiderkwast Oct 2, 2024
20fade7
Fix error in call to hashset callback resizeAllowed
zuiderkwast Oct 2, 2024
3c91ea5
Fix hashset 'htstats' implementation
zuiderkwast Oct 7, 2024
6cc89bc
Merge remote-tracking branch 'valkey/unstable' into hashset
zuiderkwast Oct 7, 2024
8bae46f
Avoid __builtin_clzl(0) which is undefined
zuiderkwast Oct 10, 2024
7e70d65
Add 'everfulls' counter and rehash if too many
zuiderkwast Oct 10, 2024
36ac9c6
Prevent infinite loop due to tombstones in find and scan
zuiderkwast Oct 10, 2024
ce75787
Add unit test for full-of-tombstones prevention
zuiderkwast Oct 10, 2024
8b5c624
Fix skip-already-rehashed error in scan
zuiderkwast Oct 10, 2024
ce68f5c
Don't rehash a step at scan
zuiderkwast Oct 11, 2024
1809564
Fix spelling and 32-bit build warning
zuiderkwast Oct 11, 2024
42d8d59
Hashset two-phase pop find ref
zuiderkwast Oct 14, 2024
555fe40
Merge remote-tracking branch 'valkey/unstable' into hashset
zuiderkwast Oct 14, 2024
f5f4380
Fix missing release memory in unit test
zuiderkwast Oct 14, 2024
c855cfb
Add valkeyCreate() and valkeyGetKey() functions
zuiderkwast Sep 11, 2024
b959b1b
Clang-format
zuiderkwast Sep 11, 2024
e218b8d
Replace dict in kvstore and more, WIP
zuiderkwast Sep 21, 2024
7599d85
Kvstore and defrag WIP
zuiderkwast Sep 23, 2024
8427bc5
Make it compile
zuiderkwast Sep 25, 2024
170e343
Handle dbAdd and dbReplaceValue return value
zuiderkwast Sep 25, 2024
559ab0e
Fix error in valkeyGetExpire
zuiderkwast Sep 26, 2024
8e045a7
Fix some things including MOVE
zuiderkwast Sep 27, 2024
7d47457
Fix (delete before add) in RENAME command
zuiderkwast Sep 28, 2024
b1a77cc
Fix memory leak when overwriting a key
zuiderkwast Sep 29, 2024
5907a1d
Optimizations in db.c
zuiderkwast Sep 29, 2024
1b1fb67
Optimization in dbGenericDelete for expires
zuiderkwast Sep 30, 2024
ba298c6
Fix leak when freeing non-string obj with key
zuiderkwast Oct 1, 2024
a8ef616
Minor stuff in kvstore and hashset
zuiderkwast Oct 2, 2024
b47df73
Fix test case "expire scan should skip dictionaries with lot's of emp…
zuiderkwast Oct 2, 2024
9c9bf21
Fix resize allowed maxmemory check
zuiderkwast Oct 4, 2024
0538d28
Fix kvstore unit tests
zuiderkwast Oct 4, 2024
6402519
Fix maxmemory test (increase size)
zuiderkwast Oct 4, 2024
02450c3
Delete test cases about shared integers
zuiderkwast Oct 7, 2024
50075ea
Scan single step in active expire
zuiderkwast Oct 7, 2024
6ce144b
Seed hashset hash function on startup
zuiderkwast Oct 7, 2024
3a9f35b
Set resize policy when a fork is active, and fix test case
zuiderkwast Oct 8, 2024
680887e
Fix rehashing test case in unit/info suite
zuiderkwast Oct 8, 2024
2d0023e
Fix more resize test cases in unit/other
zuiderkwast Oct 8, 2024
6a06e53
Fix scan testcases not expecting duplicates
zuiderkwast Oct 8, 2024
b848bf4
Another run at the unit/expire test suite
zuiderkwast Oct 8, 2024
4001406
Fix typo in defrag pubsub channels
zuiderkwast Oct 11, 2024
dc8f62d
Make dbAddRDBLoad return the (maybe reallocated) object
zuiderkwast Oct 12, 2024
8dd9658
Fix LRU issue when overwriting existing key
zuiderkwast Oct 14, 2024
5911b9c
Fix module string-DMA issue
zuiderkwast Oct 14, 2024
7a16526
Add missing unit test file test_object.c
zuiderkwast Oct 14, 2024
364e1f7
Account for scan duplicates in valkey-cli test
zuiderkwast Oct 14, 2024
cc16018
Fix refcount bits in robj
zuiderkwast Oct 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashset.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
62 changes: 31 additions & 31 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -652,14 +652,14 @@ void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int
unsigned long id = cmd->id;
ACLSetSelectorCommandBit(selector, id, allow);
ACLResetFirstArgsForCommand(selector, id);
if (cmd->subcommands_dict) {
dictEntry *de;
dictIterator *di = dictGetSafeIterator(cmd->subcommands_dict);
while ((de = dictNext(di)) != NULL) {
struct serverCommand *sub = (struct serverCommand *)dictGetVal(de);
if (cmd->subcommands_set) {
hashsetIterator iter;
hashsetInitSafeIterator(&iter, cmd->subcommands_set);
struct serverCommand *sub;
while (hashsetNext(&iter, (void **)&sub)) {
ACLSetSelectorCommandBit(selector, sub->id, allow);
}
dictReleaseIterator(di);
hashsetResetIterator(&iter);
}
}

Expand All @@ -669,19 +669,19 @@ void ACLChangeSelectorPerm(aclSelector *selector, struct serverCommand *cmd, int
* value. Since the category passed by the user may be non existing, the
* function returns C_ERR if the category was not found, or C_OK if it was
* found and the operation was performed. */
void ACLSetSelectorCommandBitsForCategory(dict *commands, aclSelector *selector, uint64_t cflag, int value) {
dictIterator *di = dictGetIterator(commands);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
struct serverCommand *cmd = dictGetVal(de);
void ACLSetSelectorCommandBitsForCategory(hashset *commands, aclSelector *selector, uint64_t cflag, int value) {
hashsetIterator iter;
hashsetInitIterator(&iter, commands);
struct serverCommand *cmd;
while (hashsetNext(&iter, (void **)&cmd)) {
if (cmd->acl_categories & cflag) {
ACLChangeSelectorPerm(selector, cmd, value);
}
if (cmd->subcommands_dict) {
ACLSetSelectorCommandBitsForCategory(cmd->subcommands_dict, selector, cflag, value);
if (cmd->subcommands_set) {
ACLSetSelectorCommandBitsForCategory(cmd->subcommands_set, selector, cflag, value);
}
}
dictReleaseIterator(di);
hashsetResetIterator(&iter);
}

/* This function is responsible for recomputing the command bits for all selectors of the existing users.
Expand Down Expand Up @@ -732,26 +732,26 @@ int ACLSetSelectorCategory(aclSelector *selector, const char *category, int allo
return C_OK;
}

void ACLCountCategoryBitsForCommands(dict *commands,
void ACLCountCategoryBitsForCommands(hashset *commands,
aclSelector *selector,
unsigned long *on,
unsigned long *off,
uint64_t cflag) {
dictIterator *di = dictGetIterator(commands);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
struct serverCommand *cmd = dictGetVal(de);
hashsetIterator iter;
hashsetInitIterator(&iter, commands);
struct serverCommand *cmd;
while (hashsetNext(&iter, (void **)&cmd)) {
if (cmd->acl_categories & cflag) {
if (ACLGetSelectorCommandBit(selector, cmd->id))
(*on)++;
else
(*off)++;
}
if (cmd->subcommands_dict) {
ACLCountCategoryBitsForCommands(cmd->subcommands_dict, selector, on, off, cflag);
if (cmd->subcommands_set) {
ACLCountCategoryBitsForCommands(cmd->subcommands_set, selector, on, off, cflag);
}
}
dictReleaseIterator(di);
hashsetResetIterator(&iter);
}

/* Return the number of commands allowed (on) and denied (off) for the user 'u'
Expand Down Expand Up @@ -1163,7 +1163,7 @@ int ACLSetSelector(aclSelector *selector, const char *op, size_t oplen) {
return C_ERR;
}

if (cmd->subcommands_dict) {
if (cmd->subcommands_set) {
/* If user is trying to allow a valid subcommand we can just add its unique ID */
cmd = ACLLookupCommand(op + 1);
if (cmd == NULL) {
Expand Down Expand Up @@ -2754,22 +2754,22 @@ sds getAclErrorMessage(int acl_res, user *user, struct serverCommand *cmd, sds e
* ==========================================================================*/

/* ACL CAT category */
void aclCatWithFlags(client *c, dict *commands, uint64_t cflag, int *arraylen) {
dictEntry *de;
dictIterator *di = dictGetIterator(commands);
void aclCatWithFlags(client *c, hashset *commands, uint64_t cflag, int *arraylen) {
hashsetIterator iter;
hashsetInitIterator(&iter, commands);

while ((de = dictNext(di)) != NULL) {
struct serverCommand *cmd = dictGetVal(de);
struct serverCommand *cmd;
while (hashsetNext(&iter, (void **)&cmd)) {
if (cmd->acl_categories & cflag) {
addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
(*arraylen)++;
}

if (cmd->subcommands_dict) {
aclCatWithFlags(c, cmd->subcommands_dict, cflag, arraylen);
if (cmd->subcommands_set) {
aclCatWithFlags(c, cmd->subcommands_set, cflag, arraylen);
}
}
dictReleaseIterator(di);
hashsetResetIterator(&iter);
}

/* Add the formatted response from a single selector to the ACL GETUSER
Expand Down
12 changes: 6 additions & 6 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2190,7 +2190,7 @@ static int rewriteFunctions(rio *aof) {
}

int rewriteAppendOnlyFileRio(rio *aof) {
dictEntry *de;
valkey *o;
int j;
long key_count = 0;
long long updated_time = 0;
Expand Down Expand Up @@ -2219,17 +2219,17 @@ int rewriteAppendOnlyFileRio(rio *aof) {

kvs_it = kvstoreIteratorInit(db->keys);
/* Iterate this DB writing every entry */
while ((de = kvstoreIteratorNext(kvs_it)) != NULL) {
while (kvstoreIteratorNext(kvs_it, (void **)&o)) {
sds keystr;
robj key, *o;
robj key;
long long expiretime;
size_t aof_bytes_before_key = aof->processed_bytes;

keystr = dictGetKey(de);
o = dictGetVal(de);
keystr = valkeyGetKey(o);
initStaticStringObject(key, keystr);

expiretime = getExpire(db, &key);
//expiretime = getExpire(db, &key);
expiretime = valkeyGetExpire(o);

/* Save the key and associated value */
if (o->type == OBJ_STRING) {
Expand Down
2 changes: 1 addition & 1 deletion src/bitops.c
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ robj *lookupStringForBitCommand(client *c, uint64_t maxbit, int *dirty) {

if (o == NULL) {
o = createObject(OBJ_STRING, sdsnewlen(NULL, byte + 1));
dbAdd(c->db, c->argv[1], o);
o = dbAdd(c->db, c->argv[1], o);
if (dirty) *dirty = 1;
} else {
o = dbUnshareStringValue(c->db, c->argv[1], o);
Expand Down
17 changes: 8 additions & 9 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ void restoreCommand(client *c) {
}

/* Create the key and set the TTL if any */
dbAdd(c->db, key, obj);
obj = dbAdd(c->db, key, obj);
if (ttl) {
setExpire(c, c->db, key, ttl);
if (!absttl) {
Expand Down Expand Up @@ -811,7 +811,7 @@ static int shouldReturnTlsInfo(void) {
}

unsigned int countKeysInSlot(unsigned int slot) {
return kvstoreDictSize(server.db->keys, slot);
return kvstoreHashsetSize(server.db->keys, slot);
}

void clusterCommandHelp(client *c) {
Expand Down Expand Up @@ -908,16 +908,15 @@ void clusterCommand(client *c) {
unsigned int keys_in_slot = countKeysInSlot(slot);
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c, numkeys);
kvstoreDictIterator *kvs_di = NULL;
dictEntry *de = NULL;
kvs_di = kvstoreGetDictIterator(server.db->keys, slot);
kvstoreHashsetIterator *kvs_di = NULL;
valkey *valkey = NULL;
kvs_di = kvstoreGetHashsetIterator(server.db->keys, slot);
for (unsigned int i = 0; i < numkeys; i++) {
de = kvstoreDictIteratorNext(kvs_di);
serverAssert(de != NULL);
sds sdskey = dictGetKey(de);
serverAssert(kvstoreHashsetIteratorNext(kvs_di, (void **)&valkey));
sds sdskey = valkeyGetKey(valkey);
addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
}
kvstoreReleaseDictIterator(kvs_di);
kvstoreReleaseHashsetIterator(kvs_di);
} else if ((!strcasecmp(c->argv[1]->ptr, "slaves") || !strcasecmp(c->argv[1]->ptr, "replicas")) && c->argc == 3) {
/* CLUSTER REPLICAS <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
Expand Down
16 changes: 8 additions & 8 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6034,16 +6034,16 @@ void removeChannelsInSlot(unsigned int slot) {
/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
if (!kvstoreDictSize(server.db->keys, hashslot)) return 0;
if (!kvstoreHashsetSize(server.db->keys, hashslot)) return 0;

unsigned int j = 0;

kvstoreDictIterator *kvs_di = NULL;
dictEntry *de = NULL;
kvs_di = kvstoreGetDictSafeIterator(server.db->keys, hashslot);
while ((de = kvstoreDictIteratorNext(kvs_di)) != NULL) {
kvstoreHashsetIterator *kvs_di = NULL;
valkey *valkey = NULL;
kvs_di = kvstoreGetHashsetSafeIterator(server.db->keys, hashslot);
while (kvstoreHashsetIteratorNext(kvs_di, (void **)&valkey)) {
enterExecutionUnit(1, 0);
sds sdskey = dictGetKey(de);
sds sdskey = valkeyGetKey(valkey);
robj *key = createStringObject(sdskey, sdslen(sdskey));
dbDelete(&server.db[0], key);
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
Expand All @@ -6058,14 +6058,14 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
j++;
server.dirty++;
}
kvstoreReleaseDictIterator(kvs_di);
kvstoreReleaseHashsetIterator(kvs_di);

return j;
}

/* Get the count of the channels for a given slot. */
unsigned int countChannelsInSlot(unsigned int hashslot) {
return kvstoreDictSize(server.pubsubshard_channels, hashslot);
return kvstoreHashsetSize(server.pubsubshard_channels, hashslot);
}

clusterNode *getMyClusterNode(void) {
Expand Down
12 changes: 4 additions & 8 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ void loadServerConfigFromString(char *config) {
loadServerConfig(argv[1], 0, NULL);
} else if (!strcasecmp(argv[0], "rename-command") && argc == 3) {
struct serverCommand *cmd = lookupCommandBySds(argv[1]);
int retval;

if (!cmd) {
err = "No such command in rename-command";
Expand All @@ -541,16 +540,13 @@ void loadServerConfigFromString(char *config) {

/* If the target command name is the empty string we just
* remove it from the command table. */
retval = dictDelete(server.commands, argv[1]);
serverAssert(retval == DICT_OK);
serverAssert(hashsetDelete(server.commands, argv[1]));

/* Otherwise we re-add the command under a different name. */
if (sdslen(argv[2]) != 0) {
sds copy = sdsdup(argv[2]);

retval = dictAdd(server.commands, copy, cmd);
if (retval != DICT_OK) {
sdsfree(copy);
sdsfree(cmd->fullname);
cmd->fullname = sdsdup(argv[2]);
if (!hashsetAdd(server.commands, cmd)) {
err = "Target command name already exists";
goto loaderr;
}
Expand Down
Loading