swift-clusterd PoC which serves as seed node #1155

Open · wants to merge 1 commit into base: main
9 changes: 8 additions & 1 deletion Package.swift
@@ -109,12 +109,19 @@ var targets: [PackageDescription.Target] = [
// Depend on tests to run:
"DistributedActorsMultiNodeTests",

// Dependencies:
"MultiNodeTestKit",
.product(name: "ArgumentParser", package: "swift-argument-parser"),
]
),

.executableTarget(
name: "Clusterd",
> Reviewer comment (Contributor): Let's rename to swift-clusterd now? Or at least lowercased clusterd would be better.

dependencies: [
"DistributedCluster",
.product(name: "ArgumentParser", package: "swift-argument-parser"),
]
),

// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Multi Node Tests

46 changes: 46 additions & 0 deletions Sources/Clusterd/boot+ClusterD.swift
@@ -0,0 +1,46 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2020-2024 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedCluster

import ArgumentParser

@main
struct ClusterDBoot: AsyncParsableCommand {
@Option(name: .shortAndLong, help: "The port to bind the cluster daemon on.")
var port: Int = ClusterDaemon.defaultEndpoint.port

@Option(help: "The host address to bind the cluster daemon on.")
var host: String = ClusterDaemon.defaultEndpoint.host

mutating func run() async throws {
> Reviewer comment (Contributor): Should not be mutating? 🤔

let daemon = await ClusterSystem.startClusterDaemon(configuredWith: self.configure)

#if DEBUG
daemon.system.log.warning("RUNNING ClusterD DEBUG binary, operation is likely to be negatively affected. Please build/run the ClusterD process using '-c release' configuration!")
#endif

try await daemon.system.park()
> Reviewer comment (Contributor): await is not needed, for now at least.

}

func configure(_ settings: inout ClusterSystemSettings) {
// other nodes will be discovering us, not the opposite
settings.discovery = .init(static: [])

settings.endpoint = Cluster.Endpoint(
systemName: "clusterd",
host: host,
port: port)
}
}
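For orientation, the daemon built here can be launched from the package directory with `swift run -c release Clusterd --host 127.0.0.1 --port 3137` (flag names follow the `@Option` declarations above). On the node side, a minimal sketch, assuming the `.clusterd` discovery setting introduced later in this PR:

```swift
import DistributedCluster

// Hypothetical worker node that relies on a running clusterd instance
// to discover the other members of the cluster.
let system = await ClusterSystem("worker") { settings in
    // Discover peers through the daemon at its default endpoint (127.0.0.1:3137).
    settings.discovery = .clusterd
}

// Suspend until the system terminates.
try await system.park()
```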
@@ -25,7 +25,7 @@ extension ActorTestProbe where Message == _Reception.Listing<_ActorRef<String>>
public func eventuallyExpectListing(
expected: Set<_ActorRef<String>>, within timeout: Duration,
verbose: Bool = false,
file: StaticString = #filePath, line: UInt = #line, column: UInt = #column
file: StaticString = #fileID, line: UInt = #line, column: UInt = #column
) throws {
do {
let listing = try self.fishForMessages(within: timeout, file: file, line: line) {
6 changes: 6 additions & 0 deletions Sources/DistributedCluster/Cluster/DiscoveryShell.swift
@@ -34,6 +34,12 @@ final class DiscoveryShell

var behavior: _Behavior<Message> {
.setup { context in

// FIXME: should have a behavior to bridge the async world...
context.log.info("Initializing discovery: \(self.settings.implementation)")
self.settings.initialize(context.system)
context.log.info("Initializing discovery, done.")

self.subscription = self.settings.subscribe(onNext: { result in
switch result {
case .success(let instances):
90 changes: 90 additions & 0 deletions Sources/DistributedCluster/ClusterSystem+Clusterd.swift
@@ -0,0 +1,90 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Atomics
import Backtrace
import CDistributedActorsMailbox
import Dispatch
@_exported import Distributed
import DistributedActorsConcurrencyHelpers
import Foundation // for UUID
import Logging
import NIO

extension ClusterSystem {
public static func startClusterDaemon(configuredWith configureSettings: (inout ClusterSystemSettings) -> Void = { _ in () }) async -> ClusterDaemon {
let system = await ClusterSystem("clusterd") { settings in
settings.endpoint = ClusterDaemon.defaultEndpoint
configureSettings(&settings)
}

return ClusterDaemon(system: system)
}
}

public struct ClusterDaemon {
public let system: ClusterSystem
public var settings: ClusterSystemSettings {
system.settings
}

public init(system: ClusterSystem) {
self.system = system
}
}

extension ClusterDaemon {

/// Suspends until the ``ClusterSystem`` is terminated by a call to ``shutdown()``.
public var terminated: Void {
get async throws {
try await self.system.terminated
}
}

/// Returns `true` if the system was already successfully terminated (i.e. awaiting ``terminated`` would resume immediately).
public var isTerminated: Bool {
self.system.isTerminated
}

/// Forcefully stops this actor system and all actors that live within it.
/// This is an asynchronous operation and will be executed on a separate thread.
///
/// You can use `shutdown().wait()` to synchronously await on the system's termination,
/// or provide a callback to be executed after the system has completed its shutdown.
///
/// - Returns: A `Shutdown` value that can be waited upon until the system has completed the shutdown.
@discardableResult
public func shutdown() throws -> ClusterSystem.Shutdown {
try self.system.shutdown()
}
}

extension ClusterDaemon {
/// The default endpoint
public static let defaultEndpoint = Cluster.Endpoint(host: "127.0.0.1", port: 3137)
}

internal distributed actor ClusterDaemonServant {
typealias ActorSystem = ClusterSystem

@ActorID.Metadata(\.wellKnown)
public var wellKnownName: String

init(system: ClusterSystem) async {
self.actorSystem = system
self.wellKnownName = "$cluster-daemon-servant"
}

}
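As a usage sketch for the API above (endpoint values invented for illustration), a host process could embed the daemon and drive its lifecycle like so:

```swift
import DistributedCluster

// Start the daemon, optionally overriding the default 127.0.0.1:3137 endpoint.
let daemon = await ClusterSystem.startClusterDaemon { settings in
    settings.endpoint = Cluster.Endpoint(systemName: "clusterd", host: "0.0.0.0", port: 3137)
}

// ... serve as the cluster's seed node ...

try daemon.shutdown()        // returns a ClusterSystem.Shutdown handle
try await daemon.terminated  // resumes once the system has fully terminated
```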
5 changes: 5 additions & 0 deletions Sources/DistributedCluster/ClusterSystem.swift
@@ -464,6 +464,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
public func wait() async throws {
// TODO: implement without blocking the internal task;
try await Task.detached {
print("BLOCKING ON receptacle")
if let error = self.receptacle.wait() {
throw error
}
@@ -1049,6 +1050,10 @@ extension ClusterSystem {
if let wellKnownName = actor.id.metadata.wellKnown {
self._managedWellKnownDistributedActors[wellKnownName] = actor
}

// if let receptionID = actor.id.metadata.receptionID {
// self.receptionist.checkIn(actor)
// }
}

/// Advertise to the cluster system that a "well known" distributed actor has become ready.
42 changes: 41 additions & 1 deletion Sources/DistributedCluster/ClusterSystemSettings.swift
@@ -404,31 +404,65 @@ protocol ClusterSystemInstrumentationProvider {
/// all the nodes of an existing cluster.
public struct ServiceDiscoverySettings {
let implementation: ServiceDiscoveryImplementation
private let _initialize: (ClusterSystem) -> Void
> Reviewer comment (Contributor): A bit unclear naming; it took some time to figure out. Perhaps something more specific about the cluster daemon, like _initializeClusterd. I also wonder if it should be optional and only set in init(clusterdEndpoint: Cluster.Endpoint); otherwise, again, it's not a very clear function.
private let _subscribe: (@escaping (Result<[Cluster.Endpoint], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken?

public init<Discovery, S>(_ implementation: Discovery, service: S)
where Discovery: ServiceDiscovery, Discovery.Instance == Cluster.Endpoint,
S == Discovery.Service
{
self.implementation = .dynamic(AnyServiceDiscovery(implementation))
self._initialize = { _ in }
self._subscribe = { onNext, onComplete in
implementation.subscribe(to: service, onNext: onNext, onComplete: onComplete)
}
}

init(clusterdEndpoint: Cluster.Endpoint) {
self.implementation = .clusterDaemon(clusterdEndpoint)

self._initialize = { system in
system.log.info("Joining [clusterd] at \(clusterdEndpoint)")
system.cluster.join(endpoint: clusterdEndpoint)
}
self._subscribe = { onNext, onComplete in
return nil
}
}

/// Locate the default `ClusterD` process and use it for discovering cluster nodes.
public static var clusterd: Self {
get {
Self.clusterd(endpoint: nil)
}
}

/// Locate the default `ClusterD` process and use it for discovering cluster nodes.
public static func clusterd(endpoint: Cluster.Endpoint?) -> Self {
> Reviewer comment (Contributor): That's neat; my initial impression was that it's for local clustering, but I guess we can define any endpoint.

return ServiceDiscoverySettings(clusterdEndpoint: endpoint ?? ClusterDaemon.defaultEndpoint)
}

public init<Discovery, S>(_ implementation: Discovery, service: S, mapInstanceToNode transformer: @escaping (Discovery.Instance) throws -> Cluster.Endpoint)
where Discovery: ServiceDiscovery,
S == Discovery.Service
{
let mappedDiscovery: MapInstanceServiceDiscovery<Discovery, Cluster.Endpoint> = implementation.mapInstance(transformer)
self.implementation = .dynamic(AnyServiceDiscovery(mappedDiscovery))
self._initialize = { _ in }
self._subscribe = { onNext, onComplete in
mappedDiscovery.subscribe(to: service, onNext: onNext, onComplete: onComplete)
}
}

public static func `seed`(nodes: Set<Cluster.Endpoint>) -> Self {
.init(static: nodes)
}

public init(static nodes: Set<Cluster.Endpoint>) {
self.implementation = .static(nodes)
self._initialize = { _ in }
self._subscribe = { onNext, _ in
// Call onNext once and never again since the list of nodes doesn't change
onNext(.success(Array(nodes)))
@@ -441,12 +475,18 @@ public struct ServiceDiscoverySettings {

/// Similar to `ServiceDiscovery.subscribe` however it allows the handling of the listings to be generic and handled by the cluster system.
/// This function is only intended for internal use by the `DiscoveryShell`.
func subscribe(onNext nextResultHandler: @escaping (Result<[Cluster.Endpoint], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken? {
func subscribe(onNext nextResultHandler: @escaping (Result<[Cluster.Endpoint], Error>) -> Void,
onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken? {
self._subscribe(nextResultHandler, completionHandler)
}

func initialize(_ system: ClusterSystem) -> Void {
self._initialize(system)
}

enum ServiceDiscoveryImplementation {
case `static`(Set<Cluster.Endpoint>)
case clusterDaemon(Cluster.Endpoint)
case dynamic(AnyServiceDiscovery)
}
}
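Putting the new enum cases in context, a node can now pick between three discovery modes. A sketch (endpoints invented for illustration; only one assignment should be active):

```swift
let system = await ClusterSystem("node") { settings in
    // 1. Static seed nodes, fixed at startup (the `.static` case):
    settings.discovery = .init(static: [Cluster.Endpoint(host: "10.0.0.1", port: 7337)])

    // 2. The clusterd daemon (the `.clusterDaemon` case), at the default or an explicit endpoint:
    // settings.discovery = .clusterd
    // settings.discovery = .clusterd(endpoint: Cluster.Endpoint(host: "10.0.0.2", port: 3137))

    // 3. Any ServiceDiscovery implementation (the `.dynamic` case):
    // settings.discovery = .init(myDiscovery, service: "my-cluster")
}
```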
1 change: 0 additions & 1 deletion Sources/MultiNodeTestKit/MultiNodeTestConductor.swift
@@ -159,7 +159,6 @@ extension MultiNodeTestConductor {
self.log.warning("Checkpoint failed, informing node [\(node)]", metadata: [
"checkPoint/node": "\(node)",
"checkPoint/error": "\(checkPointError)",
"checkPoint/error": "\(checkPointError)",
])
cc.resume(throwing: checkPointError)

7 changes: 5 additions & 2 deletions Sources/MultiNodeTestKit/MultiNodeTestKit.swift
@@ -36,6 +36,7 @@ public struct MultiNodeTest {
public let crashRegex: String?
public let runTest: (any MultiNodeTestControlProtocol) async throws -> Void
public let configureActorSystem: (inout ClusterSystemSettings) -> Void
public let startNode: (ClusterSystemSettings) async throws -> ClusterSystem
public let configureMultiNodeTest: (inout MultiNodeTestSettings) -> Void
public let makeControl: (String) -> any MultiNodeTestControlProtocol

@@ -51,6 +52,7 @@
}

self.configureActorSystem = TestSuite.configureActorSystem
self.startNode = TestSuite.startNode
self.configureMultiNodeTest = TestSuite.configureMultiNodeTest

self.makeControl = { nodeName -> Control<TestSuite.Nodes> in
@@ -80,6 +82,7 @@ public protocol MultiNodeTestSuite {
init()
associatedtype Nodes: MultiNodeNodes
static func configureActorSystem(settings: inout ClusterSystemSettings)
// static func startNode(settings: ClusterSystemSettings) -> ClusterSystem
static func configureMultiNodeTest(settings: inout MultiNodeTestSettings)
}

@@ -88,8 +91,8 @@ extension MultiNodeTestSuite {
"\(Self.self)".split(separator: ".").last.map(String.init) ?? ""
}

public func configureActorSystem(settings: inout ClusterSystemSettings) {
// do nothing by default
public static func startNode(settings: ClusterSystemSettings) async throws -> ClusterSystem {
await ClusterSystem(settings: settings)
}

var nodeNames: [String] {
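The new `startNode` hook gives a test suite control over how each node's `ClusterSystem` is created; the runner change below calls it instead of constructing the system itself. A hypothetical override (suite name, node names, and the String-backed `Nodes` conformance are invented for illustration):

```swift
// Hypothetical test suite overriding startNode, e.g. to adjust or
// instrument the settings right before each node's system boots.
public struct MyClusterTests: MultiNodeTestSuite {
    public init() {}

    // Assumption: Nodes conforms via a String-backed enum, as elsewhere in this kit.
    public enum Nodes: String, MultiNodeNodes {
        case first, second
    }

    public static func configureActorSystem(settings: inout ClusterSystemSettings) {}
    public static func configureMultiNodeTest(settings: inout MultiNodeTestSettings) {}

    public static func startNode(settings: ClusterSystemSettings) async throws -> ClusterSystem {
        // Customize the boot here if needed; the default does exactly this.
        await ClusterSystem(settings: settings)
    }
}
```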
@@ -48,28 +48,28 @@ extension MultiNodeTestKitRunnerBoot {
)
}

let actorSystem = await ClusterSystem(nodeName) { settings in
settings.bindHost = myNode.host
settings.bindPort = myNode.port

/// By default get better backtraces in case we crash:
settings.installSwiftBacktrace = true

/// Configure a nicer logger, that pretty prints metadata and also includes source location of logs
if multiNodeSettings.installPrettyLogger {
settings.logging.baseLogger = Logger(label: nodeName, factory: { label in
PrettyMultiNodeLogHandler(nodeName: label, settings: multiNodeSettings.logCapture)
})
}

// we use the singleton to implement a simple Coordinator
// TODO: if the node hosting the coordinator dies we'd potentially have some races at hand
// there's a few ways to solve this... but for now this is good enough.
settings += ClusterSingletonPlugin()

multiNodeTest.configureActorSystem(&settings)
}
control._actorSystem = actorSystem
var settings = ClusterSystemSettings(name: nodeName)
settings.bindHost = myNode.host
settings.bindPort = myNode.port

/// By default get better backtraces in case we crash:
settings.installSwiftBacktrace = true

/// Configure a nicer logger, that pretty prints metadata and also includes source location of logs
if multiNodeSettings.installPrettyLogger {
settings.logging.baseLogger = Logger(label: nodeName, factory: { label in
PrettyMultiNodeLogHandler(nodeName: label, settings: multiNodeSettings.logCapture)
})
}

// we use the singleton to implement a simple Coordinator
// TODO: if the node hosting the coordinator dies we'd potentially have some races at hand
// there's a few ways to solve this... but for now this is good enough.
settings += ClusterSingletonPlugin()
multiNodeTest.configureActorSystem(&settings)

let actorSystem = try await multiNodeTest.startNode(settings)
control._actorSystem = actorSystem

let signalQueue = DispatchQueue(label: "multi.node.\(multiNodeTest.testSuiteName).\(multiNodeTest.testName).\(nodeName).SignalHandlerQueue")
let signalSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: signalQueue)