diff --git a/src/rdb.c b/src/rdb.c index bc2d03e86c..33df3b5260 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1407,7 +1407,7 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { long key_counter = 0; int j; - if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum; + if (server.rdb_checksum && !(rdbflags & RDBFLAGS_CKSUM_SKIP)) rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic, sizeof(magic), "REDIS%04d", RDB_VERSION); if (rdbWriteRaw(rdb, magic, 9) == -1) goto werr; if (rdbSaveInfoAuxFields(rdb, rdbflags, rsi) == -1) goto werr; @@ -1451,6 +1451,7 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { * without doing any processing of the content. */ int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; + int skip_cksum_repl = RDBFLAGS_REPLICATION; startSaving(RDBFLAGS_REPLICATION); getRandomHexChars(eofmark, RDB_EOF_MARK_SIZE); @@ -1458,7 +1459,9 @@ int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) { if (rioWrite(rdb, "$EOF:", 5) == 0) goto werr; if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb, "\r\n", 2) == 0) goto werr; - if (rdbSaveRio(req, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr; + if (server.repl_diskless_sync && req & REPLICA_REQ_CHKSUM_SKIP) + skip_cksum_repl |= RDBFLAGS_CKSUM_SKIP; + if (rdbSaveRio(req, rdb, error, skip_cksum_repl, rsi) == C_ERR) goto werr; if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr; stopSaving(1); return C_OK; diff --git a/src/rdb.h b/src/rdb.h index e9d53fa398..0b7a49a001 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -133,6 +133,7 @@ #define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/ #define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/ #define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */ +#define RDBFLAGS_CKSUM_SKIP (1 << 5) /* Skip checksum for diskless sync. */ /* When rdbLoadObject() returns NULL, the err flag is * set to hold the type of error that occurred */ diff --git a/src/replication.c b/src/replication.c index e2941c6d9c..f1d4fac949 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1109,9 +1109,11 @@ void syncCommand(client *c) { } if (primaryTryPartialResynchronization(c, psync_offset) == C_OK) { + serverLog(LL_NOTICE, "===> completed partial resync"); server.stat_sync_partial_ok++; return; /* No full resync needed, return. */ } else { + serverLog(LL_NOTICE, "===> completed partial resync with failure"); char *primary_replid = c->argv[1]->ptr; /* Increment stats for failed PSYNCs, but only if the @@ -1271,6 +1273,9 @@ void syncCommand(client *c) { * - rdb-channel <1|0> * Used to identify the client as a replica's rdb connection in an dual channel * sync session. + * + * - repl-diskless-load <1|0> + * Replica is capable of load data from replication stream, request to skip checksum. * */ void replconfCommand(client *c) { int j; @@ -1313,6 +1318,13 @@ void replconfCommand(client *c) { * replconf option. */ c->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL; } + } else if (!strcasecmp(c->argv[j]->ptr, "repl-diskless-load")) { + /* REPLCONF repl-diskless-load is used to identify the client is capable of + * load directly without creating rdb file */ + long rdb_diskless_load = 0; + if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], 0, 1, &rdb_diskless_load, NULL) != C_OK) return; + if (rdb_diskless_load == 1) + c->replica_req |= REPLICA_REQ_CHKSUM_SKIP; } else if (!strcasecmp(c->argv[j]->ptr, "ack")) { /* REPLCONF ACK is used by replica to inform the primary the amount * of replication stream that it processed so far. It is an @@ -2632,7 +2644,7 @@ static void fullSyncWithPrimary(connection *conn) { /* Send replica lisening port to primary for clarification */ sds portstr = getReplicaPortString(); err = sendCommand(conn, "REPLCONF", "capa", "eof", "rdb-only", "1", "rdb-channel", "1", "listening-port", - portstr, NULL); + portstr, "repl-diskless-load", useDisklessLoad() ? "1" : "0", NULL); sdsfree(portstr); if (err) { serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", err); @@ -3427,6 +3439,11 @@ void syncWithPrimary(connection *conn) { if (err) goto write_error; } + if (useDisklessLoad()) { + err = sendCommand(conn, "REPLCONF", "repl-diskless-load", "1", NULL); + if (err) goto write_error; + } + /* Inform the primary of our (replica) capabilities. * * EOF: supports EOF-style RDB transfer for diskless replication. diff --git a/src/server.h b/src/server.h index 29d675bb18..e53ffe370a 100644 --- a/src/server.h +++ b/src/server.h @@ -438,6 +438,8 @@ typedef enum { #define REPLICA_REQ_RDB_EXCLUDE_DATA (1 << 0) /* Exclude data from RDB */ #define REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS (1 << 1) /* Exclude functions from RDB */ #define REPLICA_REQ_RDB_CHANNEL (1 << 2) /* Use dual-channel-replication */ +#define REPLICA_REQ_CHKSUM_SKIP (1 << 3) /* Exclude checksum from RDB */ + /* Mask of all bits in the replica requirements bitfield that represent non-standard (filtered) RDB requirements */ #define REPLICA_REQ_RDB_MASK (REPLICA_REQ_RDB_EXCLUDE_DATA | REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS)