Fix data loss when the old primary takes over the slots after online
There is a race in clusterHandleConfigEpochCollision that may cause
the old primary node to take over the slots again after coming back
online, causing data loss. It happens when the old primary and the new
primary have the same config epoch, and the old primary has the smaller
node ID and wins the collision.

In this case, the old primary and the new primary are in the same
shard, and we cannot tell which one strictly has the latest data. To
prevent data loss, clusterHandleConfigEpochCollision now lets the node
with the larger replication offset win the conflict.

In addition, when a node increments its config epoch through a
collision, CLUSTER FAILOVER TAKEOVER, or CLUSTER BUMPEPOCH, we now
send PONGs to all nodes so that the cluster reaches consensus on the
new config epoch more quickly.

This also closes valkey-io#969.

Signed-off-by: Binbin <[email protected]>
enjoy-binbin committed Aug 31, 2024
1 parent fea49bc commit 273ccbf
Showing 2 changed files with 39 additions and 4 deletions.
39 changes: 35 additions & 4 deletions src/cluster_legacy.c
@@ -122,6 +122,10 @@ int verifyClusterNodeId(const char *name, int length);
sds clusterEncodeOpenSlotsAuxField(int rdbflags);
int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);

#define CLUSTER_BROADCAST_ALL 0
#define CLUSTER_BROADCAST_LOCAL_REPLICAS 1
void clusterBroadcastPong(int target);

/* Only primaries that own slots have voting rights.
* Returns 1 if the node has voting rights, otherwise returns 0. */
static inline int clusterNodeIsVotingPrimary(clusterNode *n) {
@@ -1830,6 +1834,7 @@ int clusterBumpConfigEpochWithoutConsensus(void) {
if (myself->configEpoch == 0 || myself->configEpoch != maxEpoch) {
server.cluster->currentEpoch++;
myself->configEpoch = server.cluster->currentEpoch;
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
serverLog(LL_NOTICE, "New configEpoch set to %llu", (unsigned long long)myself->configEpoch);
return C_OK;
@@ -1880,6 +1885,14 @@ int clusterBumpConfigEpochWithoutConsensus(void) {
* with the conflicting epoch (the 'sender' node), it will assign itself
* the greatest configuration epoch currently detected among nodes plus 1.
*
* The above is an optimistic scenario. If this node and the sender node
* are in the same shard, their conflict in configEpoch indicates that a
* node has experienced a partition. For example, the old primary node
* went down and then came back up, and the new primary node won the
* election. In this case, we need to take the replication offset into
* consideration; otherwise, if the old primary wins the collision, we
* will lose some of the new primary's data.
*
* This means that even if there are multiple nodes colliding, the node
* with the greatest Node ID never moves forward, so eventually all the nodes
* end with a different configuration epoch.
@@ -1888,11 +1901,31 @@ void clusterHandleConfigEpochCollision(clusterNode *sender) {
/* Prerequisites: nodes have the same configEpoch and are both primaries. */
if (sender->configEpoch != myself->configEpoch || !clusterNodeIsPrimary(sender) || !clusterNodeIsPrimary(myself))
return;
/* Don't act if the colliding node has a smaller Node ID. */
if (memcmp(sender->name, myself->name, CLUSTER_NAMELEN) <= 0) return;

/* If sender and myself are in the same shard, the one with the
* bigger offset will win. Otherwise, if sender and myself are not
* in the same shard, the one with the lexicographically smaller
* Node ID will win. */
if (areInSameShard(sender, myself)) {
long long sender_offset = getNodeReplicationOffset(sender);
long long myself_offset = getNodeReplicationOffset(myself);
if (sender_offset > myself_offset) {
/* Don't act if the colliding node has a bigger offset. */
return;
} else if (sender_offset == myself_offset) {
/* If the offsets are the same, we fall back to Node ID logic.
* Don't act if the colliding node has a smaller Node ID. */
if (memcmp(sender->name, myself->name, CLUSTER_NAMELEN) <= 0) return;
}
} else {
/* Don't act if the colliding node has a smaller Node ID. */
if (memcmp(sender->name, myself->name, CLUSTER_NAMELEN) <= 0) return;
}

/* Get the next ID available at the best of this node knowledge. */
server.cluster->currentEpoch++;
myself->configEpoch = server.cluster->currentEpoch;
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterSaveConfigOrDie(1);
serverLog(LL_NOTICE, "configEpoch collision with node %.40s (%s). configEpoch set to %llu", sender->name,
sender->human_nodename, (unsigned long long)myself->configEpoch);
@@ -4001,8 +4034,6 @@ void clusterSendPing(clusterLink *link, int type) {
* CLUSTER_BROADCAST_ALL -> All known instances.
* CLUSTER_BROADCAST_LOCAL_REPLICAS -> All replicas in my primary-replicas ring.
*/
#define CLUSTER_BROADCAST_ALL 0
#define CLUSTER_BROADCAST_LOCAL_REPLICAS 1
void clusterBroadcastPong(int target) {
dictIterator *di;
dictEntry *de;
4 changes: 4 additions & 0 deletions tests/unit/cluster/manual-takeover.tcl
@@ -22,8 +22,12 @@ set paused_pid [srv 0 pid]
set paused_pid1 [srv -1 pid]
set paused_pid2 [srv -2 pid]
test "Killing majority of master nodes" {
# Bumping the epochs to increase the chance of conflicts.
R 0 cluster bumpepoch
pause_process $paused_pid
R 1 cluster bumpepoch
pause_process $paused_pid1
R 2 cluster bumpepoch
pause_process $paused_pid2
}

