From 2ae2301de565e090ec320bff51cbe20234f64708 Mon Sep 17 00:00:00 2001 From: Daniel Ericsson Date: Thu, 29 Apr 2021 11:39:56 +0200 Subject: [PATCH 1/8] Add pthread_mutexattr cleanup --- Flow/Locking.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Flow/Locking.swift b/Flow/Locking.swift index 684e6d2..3e0210a 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -47,6 +47,7 @@ typealias PThreadMutex = UnsafeMutablePointer extension UnsafeMutablePointer where Pointee == pthread_mutex_t { func initialize() { var attr = pthread_mutexattr_t() + defer { pthread_mutexattr_destroy(&attr) } guard pthread_mutexattr_init(&attr) == 0 else { preconditionFailure() } From 5cf11e2b5713acc1bb51b24dbf83b9c8aee2193f Mon Sep 17 00:00:00 2001 From: Daniel Ericsson Date: Thu, 29 Apr 2021 11:45:37 +0200 Subject: [PATCH 2/8] Add FlowTests scheme --- .../xcshareddata/xcschemes/FlowTests.xcscheme | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 Flow.xcodeproj/xcshareddata/xcschemes/FlowTests.xcscheme diff --git a/Flow.xcodeproj/xcshareddata/xcschemes/FlowTests.xcscheme b/Flow.xcodeproj/xcshareddata/xcschemes/FlowTests.xcscheme new file mode 100644 index 0000000..aadfb68 --- /dev/null +++ b/Flow.xcodeproj/xcshareddata/xcschemes/FlowTests.xcscheme @@ -0,0 +1,52 @@ + + + + + + + + + + + + + + + + + + + + + From a1c92c3f8135519ef2c2accd3f149dfc041d6f15 Mon Sep 17 00:00:00 2001 From: Daniel Ericsson Date: Thu, 29 Apr 2021 12:13:08 +0200 Subject: [PATCH 3/8] Extend convenience API to pthread_mutex_t directly --- Flow/Locking.swift | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/Flow/Locking.swift b/Flow/Locking.swift index 3e0210a..c0d31f1 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -40,11 +40,46 @@ public final class Mutex { } } +internal extension pthread_mutex_t { + + @inlinable mutating func initialize() { + withUnsafeMutablePointer(to: &self) { + $0.initialize() + } + } + + @inlinable mutating func deinitialize() { + withUnsafeMutablePointer(to: &self) { + $0.deinitialize() + } + } + + @inlinable mutating func lock() { + withUnsafeMutablePointer(to: &self) { + $0.lock() + } + } + + @inlinable mutating func unlock() { + withUnsafeMutablePointer(to: &self) { + $0.unlock() + } + } + + @inlinable mutating func protect(_ block: () throws -> T) rethrows -> T { + try withUnsafeMutablePointer(to: &self) { + return try $0.protect(block) + } + } + +} + typealias PThreadMutex = UnsafeMutablePointer /// Helper methods to work directly with a Pthread mutex pointer to avoid overhead of alloction and reference counting of using the Mutex reference type. /// - Note: You have to explicity call `initialize()` before use (typically in a class init) and `deinitialize()` when done (typically in a class deinit) extension UnsafeMutablePointer where Pointee == pthread_mutex_t { + @usableFromInline func initialize() { var attr = pthread_mutexattr_t() defer { pthread_mutexattr_destroy(&attr) } @@ -59,22 +94,25 @@ extension UnsafeMutablePointer where Pointee == pthread_mutex_t { } } + @usableFromInline func deinitialize() { pthread_mutex_destroy(self) } /// Attempt to acquire the lock, blocking a thread’s execution until the lock can be acquired. + @usableFromInline func lock() { pthread_mutex_lock(self) } /// Releases a previously acquired lock. + @usableFromInline func unlock() { pthread_mutex_unlock(self) } /// Will lock `self`, call `block`, then unlock `self` - @discardableResult + @discardableResult @usableFromInline func protect(_ block: () throws -> T) rethrows -> T { pthread_mutex_lock(self) defer { pthread_mutex_unlock(self) } From 796f046b1bbf24551f3e3c2557c8a2a7e1677dae Mon Sep 17 00:00:00 2001 From: Daniel Ericsson Date: Thu, 29 Apr 2021 12:21:34 +0200 Subject: [PATCH 4/8] Remove PThreadMutex stored unsafe pointers --- Flow/Callbacker.swift | 3 +-- Flow/Disposable.swift | 6 ++---- Flow/Future.swift | 3 +-- Flow/FutureQueue.swift | 3 +-- Flow/Locking.swift | 6 ++---- Flow/OrderedCallbacker.swift | 3 +-- Flow/Signal+Construction.swift | 6 ++---- Flow/Signal+Scheduling.swift | 3 +-- 8 files changed, 11 insertions(+), 22 deletions(-) diff --git a/Flow/Callbacker.swift b/Flow/Callbacker.swift index 853802e..1c32e6b 100644 --- a/Flow/Callbacker.swift +++ b/Flow/Callbacker.swift @@ -20,8 +20,7 @@ public final class Callbacker { } private var callbacks = Callbacks.none - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() public init() { mutex.initialize() diff --git a/Flow/Disposable.swift b/Flow/Disposable.swift index 70a50c1..9799336 100644 --- a/Flow/Disposable.swift +++ b/Flow/Disposable.swift @@ -28,8 +28,7 @@ public struct NilDisposer: Disposable { /// - Note: Is thread safe and reentrant (dispose callback could call itself) public final class Disposer: Disposable { private var disposer: (() -> ())? - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() /// Pass a closure to be called when being disposed public init(_ disposer: @escaping () -> () = {}) { @@ -58,8 +57,7 @@ public final class Disposer: Disposable { /// - Note: New disposables could be added after a disposal. public final class DisposeBag: Disposable { private var disposables: [Disposable] - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() /// Create an empty instance public init() { diff --git a/Flow/Future.swift b/Flow/Future.swift index d94872f..87f0dc1 100644 --- a/Flow/Future.swift +++ b/Flow/Future.swift @@ -57,7 +57,7 @@ public final class Future { private var state: State private let clone: () -> Future - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() /// Helper used to move external futures inside `Future.init`'s `onComplete` closure. Needed for repetition to work properly. public struct Mover { @@ -327,7 +327,6 @@ func memPrint(_ str: String, _ count: Int32) { } private extension Future { - var mutex: PThreadMutex { return PThreadMutex(&_mutex) } private var protectedState: State { return mutex.protect { state } diff --git a/Flow/FutureQueue.swift b/Flow/FutureQueue.swift index abd4418..dc8e268 100644 --- a/Flow/FutureQueue.swift +++ b/Flow/FutureQueue.swift @@ -214,7 +214,7 @@ private final class QueueItem: Executable { private let completion: (Result) -> () private weak var future: Future? private var hasBeenCancelled = false - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() init(operation: @escaping () throws -> Future, completion: @escaping (Result) -> ()) { self.completion = completion @@ -231,7 +231,6 @@ private final class QueueItem: Executable { memPrint("Queue Item deinit", queueItemUnitTestAliveCount) } - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } private func lock() { mutex.lock() } private func unlock() { mutex.unlock() } diff --git a/Flow/Locking.swift b/Flow/Locking.swift index c0d31f1..6863c0c 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -10,8 +10,7 @@ import Foundation /// A reference wrapper around a POSIX thread mutex public final class Mutex { - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() public init() { mutex.initialize() @@ -125,8 +124,7 @@ final class StateAndCallback: Disposable { var callback: ((Value) -> ())? var val: State fileprivate var disposables = [Disposable]() - private var _mutex = pthread_mutex_t() - fileprivate var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + fileprivate var mutex = pthread_mutex_t() init(state: State, callback: @escaping (Value) -> ()) { val = state diff --git a/Flow/OrderedCallbacker.swift b/Flow/OrderedCallbacker.swift index c1edfaa..a48f2e9 100644 --- a/Flow/OrderedCallbacker.swift +++ b/Flow/OrderedCallbacker.swift @@ -14,8 +14,7 @@ import Foundation /// - Note: Is thread safe. public final class OrderedCallbacker { private var callbacks: [Key: (OrderedValue, (CallbackValue) -> Future<()>)] = [:] - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() public init() { mutex.initialize() diff --git a/Flow/Signal+Construction.swift b/Flow/Signal+Construction.swift index e83fc39..2372db0 100644 --- a/Flow/Signal+Construction.swift +++ b/Flow/Signal+Construction.swift @@ -112,8 +112,7 @@ private final class CallbackState: Disposable { private var shared: SharedState? let sharedKey: Key - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() init(shared: SharedState? = nil, getValue: (() -> Value)?, callback: @escaping (EventType) -> Void) { self.shared = shared @@ -292,8 +291,7 @@ private final class CallbackState: Disposable { /// Helper to implement sharing of a single `onEvent` if more than one listner, see `SignalOption.shared` final class SharedState { private let getValue: (() -> Value)? - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() typealias Callback = (EventType) -> Void var firstCallback: (key: Key, value: Callback)? diff --git a/Flow/Signal+Scheduling.swift b/Flow/Signal+Scheduling.swift index c5e094f..7ec9acd 100644 --- a/Flow/Signal+Scheduling.swift +++ b/Flow/Signal+Scheduling.swift @@ -119,8 +119,7 @@ internal extension CoreSignal { // Using custom Disposable instead of DisposeBag for efficiency (less allocations) private final class OnEventTypeDisposer: Disposable { private var disposable: Disposable? - private var _mutex = pthread_mutex_t() - private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } + private var mutex = pthread_mutex_t() private let scheduler: Scheduler private var callback: ((EventType) -> Void)? From 1709d3c9c82ba037db34ee43efd9082215a9969e Mon Sep 17 00:00:00 2001 From: Daniel Ericsson Date: Thu, 29 Apr 2021 12:41:38 +0200 Subject: [PATCH 5/8] Add recursive mutex type, remove protect --- Flow/Disposable.swift | 5 ++++- Flow/Future.swift | 5 ++++- Flow/FutureQueue.swift | 33 ++++++++++++++++++++------------- Flow/Locking.swift | 34 +++++++++++++++++----------------- Flow/OrderedCallbacker.swift | 30 ++++++++++++++++++++---------- 5 files changed, 65 insertions(+), 42 deletions(-) diff --git a/Flow/Disposable.swift b/Flow/Disposable.swift index 9799336..619d518 100644 --- a/Flow/Disposable.swift +++ b/Flow/Disposable.swift @@ -84,7 +84,10 @@ public final class DisposeBag: Disposable { /// Returns true if there is currently no disposables to dispose. public var isEmpty: Bool { - return mutex.protect { disposables.isEmpty } + mutex.lock() + defer { mutex.unlock() } + + return disposables.isEmpty } public func dispose() { diff --git a/Flow/Future.swift b/Flow/Future.swift index 87f0dc1..d7dac83 100644 --- a/Flow/Future.swift +++ b/Flow/Future.swift @@ -329,7 +329,10 @@ func memPrint(_ str: String, _ count: Int32) { private extension Future { private var protectedState: State { - return mutex.protect { state } + mutex.lock() + defer { mutex.unlock() } + + return state } func lock() { diff --git a/Flow/FutureQueue.swift b/Flow/FutureQueue.swift index dc8e268..d28c388 100644 --- a/Flow/FutureQueue.swift +++ b/Flow/FutureQueue.swift @@ -18,7 +18,7 @@ public final class FutureQueue { private let queueScheduler: Scheduler private var _closedError: Error? private let isEmptyCallbacker = Callbacker() - private var _mutex = pthread_mutex_t() + private var mutex = pthread_mutex_t() // enqueued items. private var items: [Executable] = [] { @@ -41,9 +41,11 @@ public final class FutureQueue { queueScheduler = executeOn OSAtomicIncrement32(&futureQueueUnitTestAliveCount) memPrint("Queue init", futureQueueUnitTestAliveCount) + mutex.initialize(as: .recursive) } deinit { + mutex.deinitialize() OSAtomicDecrement32(&futureQueueUnitTestAliveCount) memPrint("Queue deinit", futureQueueUnitTestAliveCount) } @@ -61,9 +63,9 @@ public extension FutureQueue { return Future { completion in let item = QueueItem(operation: operation, completion: completion) - self.mutex.protect { - self.items.append(item) - } + self.mutex.lock() + self.items.append(item) + self.mutex.unlock() self.executeNextItem() @@ -119,7 +121,10 @@ public extension FutureQueue { public extension FutureQueue { /// Do we have any enqueued operations? var isEmpty: Bool { - return mutex.protect { items.isEmpty } + mutex.lock() + defer { mutex.unlock() } + + return items.isEmpty } /// Returns a signal that will signal when `isEmpty` is changed. @@ -164,19 +169,21 @@ public extension FutureQueue { /// The error passed to `abortQueuedExecutionWithError()` if called with `shouldCloseQueue` as true. var closedError: Error? { - return mutex.protect { _closedError } + mutex.lock() + defer { mutex.unlock() } + + return _closedError } } private extension FutureQueue { - var mutex: PThreadMutex { return PThreadMutex(&_mutex) } func lock() { mutex.lock() } func unlock() { mutex.unlock() } func removeItem(_ item: Executable) { - mutex.protect { - _ = items.firstIndex { $0 === item }.map { items.remove(at: $0) } - } + mutex.lock() + defer { mutex.unlock() } + _ = items.firstIndex { $0 === item }.map { items.remove(at: $0) } } func executeNextItem() { @@ -188,9 +195,9 @@ private extension FutureQueue { unlock() item.execute(on: queueScheduler) { - self.mutex.protect { - self.concurrentCount -= 1 - } + self.mutex.lock() + self.concurrentCount -= 1 + self.mutex.unlock() self.removeItem(item) self.executeNextItem() } diff --git a/Flow/Locking.swift b/Flow/Locking.swift index 6863c0c..2992889 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -39,11 +39,24 @@ public final class Mutex { } } +@usableFromInline +enum MutexType { + case normal + case recursive + + var attrType: Int32 { + switch self { + case .normal: return PTHREAD_MUTEX_NORMAL + case .recursive: return PTHREAD_MUTEX_RECURSIVE + } + } +} + internal extension pthread_mutex_t { - @inlinable mutating func initialize() { + @inlinable mutating func initialize(as type: MutexType = .normal) { withUnsafeMutablePointer(to: &self) { - $0.initialize() + $0.initialize(as: type) } } @@ -65,12 +78,6 @@ internal extension pthread_mutex_t { } } - @inlinable mutating func protect(_ block: () throws -> T) rethrows -> T { - try withUnsafeMutablePointer(to: &self) { - return try $0.protect(block) - } - } - } typealias PThreadMutex = UnsafeMutablePointer @@ -79,14 +86,14 @@ typealias PThreadMutex = UnsafeMutablePointer /// - Note: You have to explicity call `initialize()` before use (typically in a class init) and `deinitialize()` when done (typically in a class deinit) extension UnsafeMutablePointer where Pointee == pthread_mutex_t { @usableFromInline - func initialize() { + func initialize(as type: MutexType = .normal) { var attr = pthread_mutexattr_t() defer { pthread_mutexattr_destroy(&attr) } guard pthread_mutexattr_init(&attr) == 0 else { preconditionFailure() } - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL) + pthread_mutexattr_settype(&attr, type.attrType) guard pthread_mutex_init(self, &attr) == 0 else { preconditionFailure() @@ -110,13 +117,6 @@ extension UnsafeMutablePointer where Pointee == pthread_mutex_t { pthread_mutex_unlock(self) } - /// Will lock `self`, call `block`, then unlock `self` - @discardableResult @usableFromInline - func protect(_ block: () throws -> T) rethrows -> T { - pthread_mutex_lock(self) - defer { pthread_mutex_unlock(self) } - return try block() - } } /// Internal helper to help manage state in stateful transforms. diff --git a/Flow/OrderedCallbacker.swift b/Flow/OrderedCallbacker.swift index a48f2e9..0a459ab 100644 --- a/Flow/OrderedCallbacker.swift +++ b/Flow/OrderedCallbacker.swift @@ -26,7 +26,10 @@ public final class OrderedCallbacker { /// - Returns: True if no callbacks has been registered. public var isEmpty: Bool { - return mutex.protect { callbacks.isEmpty } + mutex.lock() + defer { mutex.unlock() } + + return callbacks.isEmpty } /// Register a callback and orderedValue to be called when `callAll` is executed. @@ -34,12 +37,15 @@ public final class OrderedCallbacker { /// - Parameter orderedValue: The value used to order this callback /// - Returns: A `Disposable` to be disposed to unregister the callback. public func addCallback(_ callback: @escaping (CallbackValue) -> Future<()>, orderedBy orderedValue: OrderedValue) -> Disposable { - return mutex.protect { - let key = generateKey() - callbacks[key] = (orderedValue, callback) - return Disposer { - self.mutex.protect { self.callbacks[key] = nil } - } + mutex.lock() + defer { mutex.unlock() } + + let key = generateKey() + callbacks[key] = (orderedValue, callback) + return Disposer { + self.mutex.lock() + self.callbacks[key] = nil + self.mutex.unlock() } } @@ -47,9 +53,13 @@ public final class OrderedCallbacker { /// - Returns: A `Future` that will complete when all callbacks has been called. @discardableResult public func callAll(with value: CallbackValue, isOrderedBefore: (OrderedValue, OrderedValue) -> Bool) -> Future<()> { - return mutex.protect { - callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 } - }.mapToFuture { $0(value) }.toVoid() + + mutex.lock() + defer { mutex.unlock() } + return callbacks.values + .sorted { isOrderedBefore($0.0, $1.0) } + .map { $1 } + .mapToFuture { $0(value) }.toVoid() } } From ca1a7a223dbec965e3253ae86a4ddc79ded5ae99 Mon Sep 17 00:00:00 2001 From: Daniel Ericsson Date: Thu, 29 Apr 2021 12:47:07 +0200 Subject: [PATCH 6/8] Fix whitespace warnings --- Flow/Disposable.swift | 1 - Flow/Future.swift | 1 - 2 files changed, 2 deletions(-) diff --git a/Flow/Disposable.swift b/Flow/Disposable.swift index 619d518..a7f5f10 100644 --- a/Flow/Disposable.swift +++ b/Flow/Disposable.swift @@ -86,7 +86,6 @@ public final class DisposeBag: Disposable { public var isEmpty: Bool { mutex.lock() defer { mutex.unlock() } - return disposables.isEmpty } diff --git a/Flow/Future.swift b/Flow/Future.swift index d7dac83..ccb9d33 100644 --- a/Flow/Future.swift +++ b/Flow/Future.swift @@ -331,7 +331,6 @@ private extension Future { private var protectedState: State { mutex.lock() defer { mutex.unlock() } - return state } From f32c1573b9e0329df68f463965423c1a9563ecc7 Mon Sep 17 00:00:00 2001 From: Daniel Ericsson Date: Mon, 17 May 2021 17:18:46 +0200 Subject: [PATCH 7/8] Remove inlining annotations Unclear to me if it's supported --- Flow/Locking.swift | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/Flow/Locking.swift b/Flow/Locking.swift index 2992889..3cce65f 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -39,7 +39,6 @@ public final class Mutex { } } -@usableFromInline enum MutexType { case normal case recursive @@ -54,25 +53,25 @@ enum MutexType { internal extension pthread_mutex_t { - @inlinable mutating func initialize(as type: MutexType = .normal) { + mutating func initialize(as type: MutexType = .normal) { withUnsafeMutablePointer(to: &self) { $0.initialize(as: type) } } - @inlinable mutating func deinitialize() { + mutating func deinitialize() { withUnsafeMutablePointer(to: &self) { $0.deinitialize() } } - @inlinable mutating func lock() { + mutating func lock() { withUnsafeMutablePointer(to: &self) { $0.lock() } } - @inlinable mutating func unlock() { + mutating func unlock() { withUnsafeMutablePointer(to: &self) { $0.unlock() } @@ -85,7 +84,7 @@ typealias PThreadMutex = UnsafeMutablePointer /// Helper methods to work directly with a Pthread mutex pointer to avoid overhead of alloction and reference counting of using the Mutex reference type. /// - Note: You have to explicity call `initialize()` before use (typically in a class init) and `deinitialize()` when done (typically in a class deinit) extension UnsafeMutablePointer where Pointee == pthread_mutex_t { - @usableFromInline + func initialize(as type: MutexType = .normal) { var attr = pthread_mutexattr_t() defer { pthread_mutexattr_destroy(&attr) } @@ -100,23 +99,19 @@ extension UnsafeMutablePointer where Pointee == pthread_mutex_t { } } - @usableFromInline func deinitialize() { pthread_mutex_destroy(self) } /// Attempt to acquire the lock, blocking a thread’s execution until the lock can be acquired. - @usableFromInline func lock() { pthread_mutex_lock(self) } /// Releases a previously acquired lock. - @usableFromInline func unlock() { pthread_mutex_unlock(self) } - } /// Internal helper to help manage state in stateful transforms. From 8e12c7fa48cea37f288909fc87375ca17053c624 Mon Sep 17 00:00:00 2001 From: Daniel Ericsson Date: Sat, 5 Jun 2021 07:37:12 +0200 Subject: [PATCH 8/8] Make recursive locks the default --- Flow/Locking.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Flow/Locking.swift b/Flow/Locking.swift index 3cce65f..5e06ef1 100644 --- a/Flow/Locking.swift +++ b/Flow/Locking.swift @@ -85,7 +85,7 @@ typealias PThreadMutex = UnsafeMutablePointer /// - Note: You have to explicity call `initialize()` before use (typically in a class init) and `deinitialize()` when done (typically in a class deinit) extension UnsafeMutablePointer where Pointee == pthread_mutex_t { - func initialize(as type: MutexType = .normal) { + func initialize(as type: MutexType = .recursive) { var attr = pthread_mutexattr_t() defer { pthread_mutexattr_destroy(&attr) } guard pthread_mutexattr_init(&attr) == 0 else {