Skip to content

Commit

Permalink
SHUTDOWN failover - wip
Browse files Browse the repository at this point in the history
  • Loading branch information
enjoy-binbin committed Sep 3, 2024
1 parent 981f977 commit 2fbc90e
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 4 deletions.
9 changes: 9 additions & 0 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6645,6 +6645,15 @@ int clusterCommandSpecial(client *c) {
* generates a new configuration epoch for this node without
* consensus, claims the primary's slots, and broadcast the new
* configuration. */
if (c == server.primary) {
/* If takeover is initiated by the primary node, we set the
* primary_takeover flag and return, does not need a reply. */
serverLog(LL_NOTICE, "Taking over the primary (primary request).");
server.primary->flag.primary_takeover = 1;
clusterBumpConfigEpochWithoutConsensus();
clusterFailoverReplaceYourPrimary();
return 1;
}
serverLog(LL_NOTICE, "Taking over the primary (user request).");
clusterBumpConfigEpochWithoutConsensus();
clusterFailoverReplaceYourPrimary();
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3107,6 +3107,7 @@ standardConfig static_configs[] = {
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("auto-failover-on-shutdown", NULL, HIDDEN_CONFIG | MODIFIABLE_CONFIG, server.auto_failover_on_shutdown, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
12 changes: 10 additions & 2 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1669,7 +1669,7 @@ void freeClient(client *c) {
*
* Note that before doing this we make sure that the client is not in
* some unexpected state, by checking its flags. */
if (server.primary && c->flag.primary) {
if (server.primary && c->flag.primary && !c->flag.primary_takeover) {
serverLog(LL_NOTICE, "Connection with primary lost.");
if (!(c->flag.protocol_error || c->flag.blocked)) {
c->flag.close_asap = 0;
Expand Down Expand Up @@ -1771,7 +1771,15 @@ void freeClient(client *c) {

/* Primary/replica cleanup Case 2:
* we lost the connection with the primary. */
if (c->flag.primary) replicationHandlePrimaryDisconnection();
if (c->flag.primary) {
if (c->flag.primary_takeover) {
/* The primary triggers the takeover, so here reset primary_host
* to NULL to avoid the reconnection. */
sdsfree(server.primary_host);
server.primary_host = NULL;
}
replicationHandlePrimaryDisconnection();
}

/* Remove client from memory usage buckets */
if (c->mem_usage_bucket) {
Expand Down
5 changes: 3 additions & 2 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3802,7 +3802,8 @@ void replicationUnsetPrimary(void) {
* replicationHandlePrimaryDisconnection which can attempt to re-connect. */
sdsfree(server.primary_host);
server.primary_host = NULL;
if (server.primary) freeClient(server.primary);
if (server.primary && server.primary->flag.primary_takeover) freeClientAsync(server.primary);
if (server.primary && !server.primary->flag.primary_takeover) freeClient(server.primary);
replicationDiscardCachedPrimary();
cancelReplicationHandshake(0);
/* When a replica is turned into a primary, the current replication ID
Expand Down Expand Up @@ -4507,7 +4508,7 @@ void replicationCron(void) {
}

/* Check if we should connect to a PRIMARY */
if (server.repl_state == REPL_STATE_CONNECT) {
if (server.primary_host && server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE, "Connecting to PRIMARY %s:%d", server.primary_host, server.primary_port);
connectWithPrimary();
}
Expand Down
17 changes: 17 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -4295,6 +4295,7 @@ int finishShutdown(void) {
int force = server.shutdown_flags & SHUTDOWN_FORCE;

/* Log a warning for each replica that is lagging. */
client *best_replica = NULL;
listIter replicas_iter;
listNode *replicas_list_node;
int num_replicas = 0, num_lagging_replicas = 0;
Expand All @@ -4309,6 +4310,11 @@ int finishShutdown(void) {
replicationGetReplicaName(replica), server.primary_repl_offset - replica->repl_ack_off, lag,
replstateToString(replica->repl_state));
}
if (replica->repl_state == REPLICA_STATE_ONLINE) {
if (best_replica == NULL || replica->repl_ack_off > best_replica->repl_ack_off) {
best_replica = replica;
}
}
}
if (num_replicas > 0) {
serverLog(LL_NOTICE, "%d of %d replicas are in sync when shutting down.", num_replicas - num_lagging_replicas,
Expand Down Expand Up @@ -4408,6 +4414,17 @@ int finishShutdown(void) {
* send them pending writes. */
flushReplicasOutputBuffers();

if (server.auto_failover_on_shutdown && server.cluster_enabled && best_replica) {
/* Sending a CLUSTER FAILOVER TAKEOVER to the best replica. */
const char *buf = "*3\r\n$7\r\nCLUSTER\r\n$8\r\nFAILOVER\r\n$8\r\nTAKEOVER\r\n";
if (connWrite(best_replica->conn, buf, strlen(buf)) == (int)strlen(buf)) {
serverLog(LL_WARNING, "Sending CLUSTER FAILOVER TAIKEOVER to replica %s succeeded.",
replicationGetReplicaName(best_replica));
} else {
serverLog(LL_WARNING, "Failed to send CLUSTER FAILOVER TAIKEOVER to replica: %s", strerror(errno));
}
}

/* Close the listening sockets. Apparently this allows faster restarts. */
closeListeningSockets(1);

Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,7 @@ typedef struct ClientFlags {
* By using this flag, we ensure that the RDB client remains intact until the replica
* \ has successfully initiated PSYNC. */
uint64_t repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */
uint64_t primary_takeover : 1; /* The primary actively requests CLUSTER FAILOVER TAKEOVER from this replica. */
uint64_t reserved : 7; /* Reserved for future use */
} ClientFlags;

Expand Down Expand Up @@ -2184,6 +2185,7 @@ struct valkeyServer {
unsigned long cluster_blacklist_ttl; /* Duration in seconds that a node is denied re-entry into
* the cluster after it is forgotten with CLUSTER FORGET. */
int cluster_slot_stats_enabled; /* Cluster slot usage statistics tracking enabled. */
int auto_failover_on_shutdown; /* Trigger manual failover on shutdown to primary. */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1;
sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Index in array is a bitwise or of CACHE_CONN_TYPE_* */
Expand Down

0 comments on commit 2fbc90e

Please sign in to comment.