Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid expiration and eviction during data syncing #1185

Open
wants to merge 8 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6648,6 +6648,11 @@ int clusterCommandSpecial(client *c) {
return 1;
}

if (server.import_mode) {
addReplyError(c, "CLUSTER REPLICATE not allowed in import mode.");
return 1;
}

/* If the instance is currently a primary, it should have no assigned
* slots nor keys to accept to replicate some other node.
* Replicas can switch to another primary without issues. */
Expand Down
29 changes: 29 additions & 0 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,34 @@ struct COMMAND_ARG CLIENT_CAPA_Args[] = {
#define CLIENT_ID_Keyspecs NULL
#endif

/********** CLIENT IMPORT_SOURCE ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLIENT IMPORT_SOURCE history */
#define CLIENT_IMPORT_SOURCE_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLIENT IMPORT_SOURCE tips */
#define CLIENT_IMPORT_SOURCE_Tips NULL
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLIENT IMPORT_SOURCE key specs */
#define CLIENT_IMPORT_SOURCE_Keyspecs NULL
#endif

/* CLIENT IMPORT_SOURCE enabled argument table: the two pure-token
 * alternatives, ON and OFF, that CLIENT_IMPORT_SOURCE_Args attaches as
 * .subargs of its "enabled" oneof argument. */
struct COMMAND_ARG CLIENT_IMPORT_SOURCE_enabled_Subargs[] = {
{MAKE_ARG("on",ARG_TYPE_PURE_TOKEN,-1,"ON",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("off",ARG_TYPE_PURE_TOKEN,-1,"OFF",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLIENT IMPORT_SOURCE argument table: a single "enabled" argument of type
 * ARG_TYPE_ONEOF whose alternatives are the entries of
 * CLIENT_IMPORT_SOURCE_enabled_Subargs. */
struct COMMAND_ARG CLIENT_IMPORT_SOURCE_Args[] = {
{MAKE_ARG("enabled",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLIENT_IMPORT_SOURCE_enabled_Subargs},
};

/********** CLIENT INFO ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -1630,6 +1658,7 @@ struct COMMAND_STRUCT CLIENT_Subcommands[] = {
{MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)},
{MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)},
{MAKE_CMD("id","Returns the unique client ID of the connection.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_ID_History,0,CLIENT_ID_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_ID_Keyspecs,0,NULL,0)},
{MAKE_CMD("import-source","Mark this client as an import source when server is in import mode.","O(1)","8.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_IMPORT_SOURCE_History,0,CLIENT_IMPORT_SOURCE_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_IMPORT_SOURCE_Keyspecs,0,NULL,1),.args=CLIENT_IMPORT_SOURCE_Args},
{MAKE_CMD("info","Returns information about the connection.","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_INFO_History,0,CLIENT_INFO_Tips,1,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_INFO_Keyspecs,0,NULL,0)},
{MAKE_CMD("kill","Terminates open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_KILL_History,7,CLIENT_KILL_Tips,0,clientCommand,-3,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_KILL_Keyspecs,0,NULL,1),.args=CLIENT_KILL_Args},
{MAKE_CMD("list","Lists open connections.","O(N) where N is the number of client connections","2.4.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_LIST_History,7,CLIENT_LIST_Tips,1,clientCommand,-2,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_LIST_Keyspecs,0,NULL,2),.args=CLIENT_LIST_Args},
Expand Down
40 changes: 40 additions & 0 deletions src/commands/client-import-source.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"IMPORT-SOURCE": {
"summary": "Mark this client as an import source when server is in import mode.",
"complexity": "O(1)",
"group": "connection",
"since": "8.2.0",
"arity": 3,
"container": "CLIENT",
"function": "clientCommand",
"command_flags": [
"NOSCRIPT",
"LOADING",
"STALE"
],
"acl_categories": [
"CONNECTION"
],
"reply_schema": {
"const": "OK"
},
"arguments": [
{
"name": "enabled",
"type": "oneof",
"arguments": [
{
"name": "on",
"type": "pure-token",
"token": "ON"
},
{
"name": "off",
"type": "pure-token",
"token": "OFF"
}
]
}
]
}
}
14 changes: 14 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2331,6 +2331,14 @@ static int isValidActiveDefrag(int val, const char **err) {
return 1;
}

/* Validator for the "import-mode" boolean config.
 *
 * Enabling import mode is rejected while server.primary_host is set; every
 * other combination is accepted.
 *
 * Returns 1 when the new value is accepted. Returns 0 and points *err at a
 * static message when it is rejected. */
static int isValidImportMode(int val, const char **err) {
    int enabling_on_replica = server.primary_host && val;

    if (!enabling_on_replica) return 1;

    *err = "Server is already in replica mode";
    return 0;
}

static int isValidClusterConfigFile(char *val, const char **err) {
if (!strcmp(val, "")) {
*err = "cluster-config-file can't be empty";
Expand Down Expand Up @@ -2949,6 +2957,11 @@ static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc,
return 0;
}

if (server.import_mode) {
*err = "REPLICAOF not allowed in import mode";
return 0;
}

sdsfree(server.primary_host);
server.primary_host = NULL;
if (!strcasecmp(argv[0], "no") && !strcasecmp(argv[1], "one")) {
Expand Down Expand Up @@ -3136,6 +3149,7 @@ standardConfig static_configs[] = {
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, MODIFIABLE_CONFIG, server.import_mode, 0, isValidImportMode, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
5 changes: 4 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ robj *dbRandomKey(serverDb *db) {
key = dictGetKey(de);
keyobj = createStringObject(key, sdslen(key));
if (dbFindExpiresWithDictIndex(db, key, randomDictIndex)) {
if (allvolatile && server.primary_host && --maxtries == 0) {
if (allvolatile && (server.primary_host || server.import_mode) && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
* expired in the replica, so the function cannot stop because
Expand Down Expand Up @@ -1826,6 +1826,9 @@ keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int di
if (server.primary_host != NULL) {
if (server.current_client && (server.current_client->flag.primary)) return KEY_VALID;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
} else if (server.import_mode) {
if (server.current_client && (server.current_client->flag.import_source)) return KEY_VALID;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
}

/* In some cases we're explicitly instructed to return an indication of a
Expand Down
4 changes: 2 additions & 2 deletions src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ int performEvictions(void) {
goto update_metrics;
}

if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) {
result = EVICT_FAIL; /* We need to free memory, but policy forbids. */
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION || server.import_mode) {
result = EVICT_FAIL; /* We need to free memory, but policy forbids or we are in import mode. */
goto update_metrics;
}

Expand Down
2 changes: 1 addition & 1 deletion src/expire.c
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ int checkAlreadyExpired(long long when) {
*
* Instead we add the already expired key to the database with expire time
* (possibly in the past) and wait for an explicit DEL from the primary. */
return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host);
return (when <= commandTimeSnapshot() && !server.loading && !server.primary_host && !server.import_mode);
}

#define EXPIRE_NX (1 << 0)
Expand Down
20 changes: 20 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -3567,6 +3567,10 @@ void clientCommand(client *c) {
" Protect current client connection from eviction.",
"NO-TOUCH (ON|OFF)",
" Will not touch LRU/LFU stats when this mode is on.",
"IMPORT-SOURCE (ON|OFF)",
" Mark this connection as an import source if server.import_mode is true.",
" Sync tools can set their connections into 'import-source' state to visit",
" expired keys.",
NULL};
addReplyHelp(c, help);
} else if (!strcasecmp(c->argv[1]->ptr, "id") && c->argc == 2) {
Expand Down Expand Up @@ -4040,6 +4044,22 @@ void clientCommand(client *c) {
}
}
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "import-source")) {
/* CLIENT IMPORT-SOURCE ON|OFF */
if (!server.import_mode) {
addReplyError(c, "Server is not in import mode");
return;
}
if (!strcasecmp(c->argv[2]->ptr, "on")) {
c->flag.import_source = 1;
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[2]->ptr, "off")) {
c->flag.import_source = 0;
addReply(c, shared.ok);
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
} else {
addReplySubcommandSyntaxError(c);
}
Expand Down
5 changes: 5 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3961,6 +3961,11 @@ void replicaofCommand(client *c) {
return;
}

if (server.import_mode) {
addReplyError(c, "REPLICAOF not allowed in import mode.");
return;
}

if (server.failover_state != NO_FAILOVER) {
addReplyError(c, "REPLICAOF not allowed while failing over.");
return;
Expand Down
5 changes: 3 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ void clientsCron(void) {
void databasesCron(void) {
/* Expire keys by random sampling. Not required for replicas
* as primary will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (server.active_expire_enabled && !server.import_mode) {
if (iAmPrimary()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
Expand Down Expand Up @@ -1651,7 +1651,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {

/* Run a fast expire cycle (the called function will return
* ASAP if a fast cycle is not needed). */
if (server.active_expire_enabled && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
if (server.active_expire_enabled && !server.import_mode && iAmPrimary()) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

if (moduleCount()) {
moduleFireServerEvent(VALKEYMODULE_EVENT_EVENTLOOP, VALKEYMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP, NULL);
Expand Down Expand Up @@ -2057,6 +2057,7 @@ void initServerConfig(void) {
server.extended_redis_compat = 0;
server.pause_cron = 0;
server.dict_resizing = 1;
server.import_mode = 0;

server.latency_tracking_info_percentiles_len = 3;
server.latency_tracking_info_percentiles = zmalloc(sizeof(double) * (server.latency_tracking_info_percentiles_len));
Expand Down
5 changes: 4 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1224,7 +1224,8 @@ typedef struct ClientFlags {
* knows that it does not need the cache and required a full sync. With this
* flag, we won't cache the primary in freeClient. */
uint64_t fake : 1; /* This is a fake client without a real connection. */
uint64_t reserved : 5; /* Reserved for future use */
uint64_t import_source : 1; /* This client is importing data to server and can visit expired key. */
uint64_t reserved : 4; /* Reserved for future use */
} ClientFlags;

typedef struct client {
Expand Down Expand Up @@ -2070,6 +2071,8 @@ struct valkeyServer {
char primary_replid[CONFIG_RUN_ID_SIZE + 1]; /* Primary PSYNC runid. */
long long primary_initial_offset; /* Primary PSYNC offset. */
int repl_replica_lazy_flush; /* Lazy FLUSHALL before loading DB? */
/* Import Mode */
int import_mode; /* If true, server is in import mode and forbid expiration and eviction. */
/* Synchronous replication. */
list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */
int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */
Expand Down
74 changes: 74 additions & 0 deletions tests/unit/expire.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,80 @@ start_server {tags {"expire"}} {
close_replication_stream $repl
assert_equal [r debug set-active-expire 1] {OK}
} {} {needs:debug}

# Checks that keys past their TTL are not removed while import-mode is
# enabled, and that they are removed once import-mode is disabled again.
test {Import mode should forbid active expiration} {
r flushall

r config set import-mode yes
assert_equal [r client import-source on] {OK}

# Both keys get a 1 ms TTL, so they are logically expired well before the
# 100 ms wait below is over.
r set foo1 bar PX 1
r set foo2 bar PX 1
after 100

# The expired keys must still be counted: nothing has deleted them.
assert_equal [r dbsize] {2}

assert_equal [r client import-source off] {OK}
r config set import-mode no

# Verify all keys have expired
wait_for_condition 40 100 {
[r dbsize] eq 0
} else {
fail "Keys did not actively expire."
}
}

# Checks that reading an expired key from an import-source connection does not
# delete it while import-mode is enabled, and that the same read deletes it
# once import-mode is disabled.
test {Import mode should forbid lazy expiration} {
r flushall
# Switch the active expire cycle off so that only an access to the key can
# remove it during this test.
r debug set-active-expire 0

r config set import-mode yes
assert_equal [r client import-source on] {OK}

r set foo1 1 PX 1
after 10

# This connection is marked import-source, so the read must leave the
# already expired key in place.
r get foo1
assert_equal [r dbsize] {1}

assert_equal [r client import-source off] {OK}
r config set import-mode no

# With import mode off, the read is expected to delete the expired key.
r get foo1

assert_equal [r dbsize] {0}

# Restore active expiration for the tests that follow.
assert_equal [r debug set-active-expire 1] {OK}
} {} {needs:debug}

# Checks that, in import mode, an import-source connection still gets an
# expired key back from RANDOMKEY while an ordinary connection sees the same
# key as expired.
test {RANDOMKEY can return expired key in import mode} {
    r flushall

    r config set import-mode yes
    assert_equal [r client import-source on] {OK}

    r set foo1 bar PX 1
    after 10

    # A second, ordinary connection (never marked import-source) must see the
    # key as expired: TTL answers -2, the reply used for a missing key.
    set client [valkey [srv "host"] [srv "port"] 0 $::tls]
    if {!$::singledb} {
        $client select 9
    }
    assert_equal [$client ttl foo1] {-2}
    # Release the extra connection; it is not needed past this point and
    # would otherwise stay open for the rest of the test file.
    $client close

    # The import-source connection still gets the expired key back.
    assert_equal [r randomkey] {foo1}

    assert_equal [r client import-source off] {OK}
    r config set import-mode no

    # Verify all keys have expired
    wait_for_condition 40 100 {
        [r dbsize] eq 0
    } else {
        fail "Keys did not actively expire."
    }
}
}

start_cluster 1 0 {tags {"expire external:skip cluster"}} {
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/maxmemory.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,21 @@ start_server {tags {"maxmemory" "external:skip"}} {
assert {[r object freq foo] == 5}
}
}

# Dedicated server instance: the test leaves maxmemory at 1, so it must not
# share a server with other maxmemory tests.
start_server {tags {"maxmemory" "external:skip"}} {
# Checks that no key is evicted while import-mode is enabled, even under an
# evicting maxmemory policy, and that eviction happens again once import-mode
# is disabled.
test {Import mode should forbid eviction} {
r set key val
r config set import-mode yes
assert_equal [r client import-source on] {OK}
# An evicting policy plus a maxmemory of 1 leaves the server over its
# memory limit with one key that would normally be evicted.
r config set maxmemory-policy allkeys-lru
r config set maxmemory 1

# The existing key must survive, and the write must be refused with OOM
# because memory cannot be freed by evicting.
assert_equal [r dbsize] {1}
assert_error {OOM command not allowed*} {r set key1 val1}

assert_equal [r client import-source off] {OK}
r config set import-mode no

# With import mode off the key is expected to have been evicted by the time
# DBSIZE replies.
assert_equal [r dbsize] {0}
}
}
7 changes: 7 additions & 0 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,13 @@ replica-priority 100
#
# replica-ignore-disk-write-errors no

# Make the primary forbid expiration and eviction.
# This is useful for sync tools, because expiration and eviction may cause data corruption.
# Sync tools can mark their connections as import sources with CLIENT IMPORT-SOURCE.
# NOTICE: Clients should avoid writing the same key on the source server and the destination server.
#
# import-mode no

# -----------------------------------------------------------------------------
# By default, Sentinel includes all replicas in its reports. A replica
# can be excluded from Sentinel's announcements. An unannounced replica
Expand Down
Loading