Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip checksum for diskless replication #1180

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,7 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
long key_counter = 0;
int j;

if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum;
if (server.rdb_checksum && !(rdbflags & RDBFLAGS_CKSUM_SKIP)) rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic, sizeof(magic), "REDIS%04d", RDB_VERSION);
if (rdbWriteRaw(rdb, magic, 9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb, rdbflags, rsi) == -1) goto werr;
Expand Down Expand Up @@ -1451,14 +1451,17 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
* without doing any processing of the content. */
int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
int skip_cksum_repl = RDBFLAGS_REPLICATION;

startSaving(RDBFLAGS_REPLICATION);
getRandomHexChars(eofmark, RDB_EOF_MARK_SIZE);
if (error) *error = 0;
if (rioWrite(rdb, "$EOF:", 5) == 0) goto werr;
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb, "\r\n", 2) == 0) goto werr;
if (rdbSaveRio(req, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr;
if (server.repl_diskless_sync && req & REPLICA_REQ_CHKSUM_SKIP)
skip_cksum_repl |= RDBFLAGS_CKSUM_SKIP;
if (rdbSaveRio(req, rdb, error, skip_cksum_repl, rsi) == C_ERR) goto werr;
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
stopSaving(1);
return C_OK;
Expand Down
1 change: 1 addition & 0 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
#define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/
#define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/
#define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */
#define RDBFLAGS_CKSUM_SKIP (1 << 5) /* Skip checksum for diskless sync. */

/* When rdbLoadObject() returns NULL, the err flag is
* set to hold the type of error that occurred */
Expand Down
19 changes: 18 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1109,9 +1109,11 @@ void syncCommand(client *c) {
}

if (primaryTryPartialResynchronization(c, psync_offset) == C_OK) {
serverLog(LL_NOTICE, "===> completed partial resync");
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
serverLog(LL_NOTICE, "===> completed partial resync with failure");
char *primary_replid = c->argv[1]->ptr;

/* Increment stats for failed PSYNCs, but only if the
Expand Down Expand Up @@ -1271,6 +1273,9 @@ void syncCommand(client *c) {
* - rdb-channel <1|0>
* Used to identify the client as a replica's rdb connection in an dual channel
* sync session.
*
* - repl-diskless-load <1|0>
* Replica is capable of load data from replication stream, request to skip checksum.
* */
void replconfCommand(client *c) {
int j;
Expand Down Expand Up @@ -1313,6 +1318,13 @@ void replconfCommand(client *c) {
* replconf option. */
c->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL;
}
} else if (!strcasecmp(c->argv[j]->ptr, "repl-diskless-load")) {
/* REPLCONF repl-diskless-load is used to identify the client is capable of
* load directly without creating rdb file */
long rdb_diskless_load = 0;
if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], 0, 1, &rdb_diskless_load, NULL) != C_OK) return;
if (rdb_diskless_load == 1)
c->replica_req |= REPLICA_REQ_CHKSUM_SKIP;
} else if (!strcasecmp(c->argv[j]->ptr, "ack")) {
/* REPLCONF ACK is used by replica to inform the primary the amount
* of replication stream that it processed so far. It is an
Expand Down Expand Up @@ -2632,7 +2644,7 @@ static void fullSyncWithPrimary(connection *conn) {
/* Send replica lisening port to primary for clarification */
sds portstr = getReplicaPortString();
err = sendCommand(conn, "REPLCONF", "capa", "eof", "rdb-only", "1", "rdb-channel", "1", "listening-port",
portstr, NULL);
portstr, "repl-diskless-load", useDisklessLoad() ? "1" : "0", NULL);
sdsfree(portstr);
if (err) {
serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", err);
Expand Down Expand Up @@ -3427,6 +3439,11 @@ void syncWithPrimary(connection *conn) {
if (err) goto write_error;
}

if (useDisklessLoad()) {
err = sendCommand(conn, "REPLCONF", "repl-diskless-load", "1", NULL);
if (err) goto write_error;
}

/* Inform the primary of our (replica) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ typedef enum {
#define REPLICA_REQ_RDB_EXCLUDE_DATA (1 << 0) /* Exclude data from RDB */
#define REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS (1 << 1) /* Exclude functions from RDB */
#define REPLICA_REQ_RDB_CHANNEL (1 << 2) /* Use dual-channel-replication */
#define REPLICA_REQ_CHKSUM_SKIP (1 << 3) /* Exclude checksum from RDB */

/* Mask of all bits in the replica requirements bitfield that represent non-standard (filtered) RDB requirements */
#define REPLICA_REQ_RDB_MASK (REPLICA_REQ_RDB_EXCLUDE_DATA | REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS)

Expand Down