Skip to content

Commit

Permalink
[Cluster] await methods for becoming, or discovering a leader
Browse files Browse the repository at this point in the history
  • Loading branch information
ktoso committed Apr 24, 2023
1 parent 9c98ff6 commit 60502d1
Showing 1 changed file with 101 additions and 1 deletion.
102 changes: 101 additions & 1 deletion Sources/DistributedCluster/Cluster/ClusterControl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,109 @@ public struct ClusterControl {
}
}

/// Perform a **local change** to the membership such that the leader of the cluster is assumed to be `member`.
///
/// In order to implement various leader election mechanisms, users are free to choose whichever mechanism
/// suits them: rely on an external system, implement a leader election
///
/// > NOTE: This change is NOT applied to other nodes via gossip./
///
/// If there are any pending leader actions stashed on the new leader (such as downing nodes),
/// they will be executed as soon as it becomes the leader.
///
/// If the passed in event applied to the current membership is an effective change,
/// the change will be published using the `system.cluster.events`. On the contrary, if the change either was
/// already caused independently, or of the target leader already is the leader, no new event will be emitted.
///
/// - Parameter member: the change to apply to the cluster.
public func assumeLeader(_ member: Cluster.Member) {
// old leader does not matter for applying this change, we assume the new leader based on external decisions.
guard let change = Cluster.LeadershipChange(oldLeader: nil, newLeader: member) else {
fatalError("Impossible that leadership change would be ineffective when moving from 'nil' to: \(member)")
}
self.ref.tell(.requestMembershipChange(.leadershipChange(change)))
}

/// Wait, within the given duration, for a leader to be found in the cluster membership and have **at least** the specified status.
///
/// - Parameters
/// - node: The node to be joined by this system.
/// - atLeastStatus: The minimum expected member status.
/// - within: Duration to wait for.
///
/// - Returns `Cluster.Member` for the joined node with the minimum expected status.
/// If the expected status is at least `.down` or `.removed`, and either a tombstone exists for the node or the associated
/// membership is not found, the `Cluster.Member` returned would have `.removed` status and *unreachable*.
@discardableResult
public func waitForLeader(atLeast atLeastStatus: Cluster.MemberStatus, within: Duration, file: String = #fileID, line: UInt = #line) async throws -> Cluster.Member? {
try await self.waitForMembershipEventually(Cluster.Member?.self, within: within, file: file, line: line) { membership in
guard let foundLeader = membership.leader else {
if atLeastStatus == .down || atLeastStatus == .removed {
return nil
}
throw Cluster.MembershipError(.notFoundAny(endpoint, in: membership), file: file, line: line)
}

guard foundLeader.status >= atLeastStatus else {
throw Cluster.MembershipError(.atLeastStatusRequirementNotMet(expectedAtLeast: atLeastStatus, found: foundLeader), file: file, line: line)
}
return foundLeader
}
}

/// Wait, within the given duration, for this node to become a leader of the cluster.
@discardableResult
public func waitToBecomeLeader(file: String = #fileID, line: UInt = #line) async throws -> Cluster.Member {
for await event in self.events {
switch event {
case .leadershipChange(let leaderChanged):
guard let newLeader = leaderChanged.newLeader else {
continue // keep waiting
}
guard newLeader.node == self.node else {
continue // keep waiting, someone else has become leader
}
return newLeader // yay, we have become the leader!

case .membershipChange(let change):
guard change.member.node == self.node else {
// change about some other node, we're not concerned about those here
continue
}
if change.member.status.isDown || change.member.status.isRemoved {
// If we became down, we'll never become leader
throw Cluster.MembershipError(.statusRequirementNotMet(expected: .joining, found: change.member))
}
continue

case .snapshot(let snapshot):
guard let member = snapshot.member(self.node) else {
continue
}
if member.status.isDown || member.status.isRemoved {
// If we became down, we'll never become leader
throw Cluster.MembershipError(.statusRequirementNotMet(expected: .joining, found: member))
}
continue

default:
continue
}
}

/// We broke out of looking at the events before becoming the leader, throe that we didn't find "it";
/// This likely would happen if the task running the wait method would have been cancelled.
if Task.isCancelled {
throw CancellationError()
}

throw await Cluster.MembershipError(.notFound(self.node, in: self.membershipSnapshot))
}

private func waitForMembershipEventually<T>(_: T.Type = T.self,
within: Duration,
interval: Duration = .milliseconds(100),
file: String = #fileID, line: UInt = #line,
_ block: (Cluster.Membership) async throws -> T) async throws -> T
{
let deadline = ContinuousClock.Instant.fromNow(within)
Expand All @@ -359,6 +459,6 @@ public struct ClusterControl {
}
}

throw Cluster.MembershipError(.awaitStatusTimedOut(within, lastError))
throw Cluster.MembershipError(.awaitStatusTimedOut(within, lastError), file: file, line: line)
}
}

0 comments on commit 60502d1

Please sign in to comment.