Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pthread_mutex_t dangling pointers #111

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions Flow.xcodeproj/xcshareddata/xcschemes/FlowTests.xcscheme
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1220"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "F6D80B621BBBB2ED008F8574"
BuildableName = "FlowTests.xctest"
BlueprintName = "FlowTests"
ReferencedContainer = "container:Flow.xcodeproj">
</BuildableReference>
</TestableReference>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>
3 changes: 1 addition & 2 deletions Flow/Callbacker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ public final class Callbacker<Value> {
}

private var callbacks = Callbacks.none
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

public init() {
mutex.initialize()
Expand Down
10 changes: 5 additions & 5 deletions Flow/Disposable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ public struct NilDisposer: Disposable {
/// - Note: Is thread safe and reentrant (dispose callback could call itself)
public final class Disposer: Disposable {
private var disposer: (() -> ())?
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

/// Pass a closure to be called when being disposed
public init(_ disposer: @escaping () -> () = {}) {
Expand Down Expand Up @@ -58,8 +57,7 @@ public final class Disposer: Disposable {
/// - Note: New disposables could be added after a disposal.
public final class DisposeBag: Disposable {
private var disposables: [Disposable]
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

/// Create an empty instance
public init() {
Expand All @@ -86,7 +84,9 @@ public final class DisposeBag: Disposable {

/// Returns true if there is currently no disposables to dispose.
public var isEmpty: Bool {
return mutex.protect { disposables.isEmpty }
mutex.lock()
defer { mutex.unlock() }
return disposables.isEmpty
}

public func dispose() {
Expand Down
7 changes: 4 additions & 3 deletions Flow/Future.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public final class Future<Value> {

private var state: State
private let clone: () -> Future
private var _mutex = pthread_mutex_t()
private var mutex = pthread_mutex_t()

/// Helper used to move external futures inside `Future.init`'s `onComplete` closure. Needed for repetition to work properly.
public struct Mover {
Expand Down Expand Up @@ -327,10 +327,11 @@ func memPrint(_ str: String, _ count: Int32) {
}

private extension Future {
var mutex: PThreadMutex { return PThreadMutex(&_mutex) }

private var protectedState: State {
return mutex.protect { state }
mutex.lock()
defer { mutex.unlock() }
return state
}

func lock() {
Expand Down
36 changes: 21 additions & 15 deletions Flow/FutureQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public final class FutureQueue<Resource> {
private let queueScheduler: Scheduler
private var _closedError: Error?
private let isEmptyCallbacker = Callbacker<Bool>()
private var _mutex = pthread_mutex_t()
private var mutex = pthread_mutex_t()

// enqueued items.
private var items: [Executable] = [] {
Expand All @@ -41,9 +41,11 @@ public final class FutureQueue<Resource> {
queueScheduler = executeOn
OSAtomicIncrement32(&futureQueueUnitTestAliveCount)
memPrint("Queue init", futureQueueUnitTestAliveCount)
mutex.initialize(as: .recursive)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before adding the ability to initialize this as a recursive mutex, it deadlocked in FlowTests.FutureQueueTests testBatchQueue.

}

deinit {
mutex.deinitialize()
OSAtomicDecrement32(&futureQueueUnitTestAliveCount)
memPrint("Queue deinit", futureQueueUnitTestAliveCount)
}
Expand All @@ -61,9 +63,9 @@ public extension FutureQueue {
return Future { completion in
let item = QueueItem<Output>(operation: operation, completion: completion)

self.mutex.protect {
self.items.append(item)
}
self.mutex.lock()
self.items.append(item)
self.mutex.unlock()

self.executeNextItem()

Expand Down Expand Up @@ -119,7 +121,10 @@ public extension FutureQueue {
public extension FutureQueue {
/// Do we have any enqueued operations?
var isEmpty: Bool {
return mutex.protect { items.isEmpty }
mutex.lock()
defer { mutex.unlock() }

return items.isEmpty
}

/// Returns a signal that will signal when `isEmpty` is changed.
Expand Down Expand Up @@ -164,19 +169,21 @@ public extension FutureQueue {

/// The error passed to `abortQueuedExecutionWithError()` if called with `shouldCloseQueue` as true.
var closedError: Error? {
return mutex.protect { _closedError }
mutex.lock()
defer { mutex.unlock() }

return _closedError
}
}

private extension FutureQueue {
var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
func lock() { mutex.lock() }
func unlock() { mutex.unlock() }

func removeItem(_ item: Executable) {
mutex.protect {
_ = items.firstIndex { $0 === item }.map { items.remove(at: $0) }
}
mutex.lock()
defer { mutex.unlock() }
_ = items.firstIndex { $0 === item }.map { items.remove(at: $0) }
}

func executeNextItem() {
Expand All @@ -188,9 +195,9 @@ private extension FutureQueue {
unlock()

item.execute(on: queueScheduler) {
self.mutex.protect {
self.concurrentCount -= 1
}
self.mutex.lock()
self.concurrentCount -= 1
self.mutex.unlock()
self.removeItem(item)
self.executeNextItem()
}
Expand All @@ -214,7 +221,7 @@ private final class QueueItem<Output>: Executable {
private let completion: (Result<Output>) -> ()
private weak var future: Future<Output>?
private var hasBeenCancelled = false
private var _mutex = pthread_mutex_t()
private var mutex = pthread_mutex_t()

init(operation: @escaping () throws -> Future<Output>, completion: @escaping (Result<Output>) -> ()) {
self.completion = completion
Expand All @@ -231,7 +238,6 @@ private final class QueueItem<Output>: Executable {
memPrint("Queue Item deinit", queueItemUnitTestAliveCount)
}

private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private func lock() { mutex.lock() }
private func unlock() { mutex.unlock() }

Expand Down
60 changes: 46 additions & 14 deletions Flow/Locking.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import Foundation

/// A reference wrapper around a POSIX thread mutex
public final class Mutex {
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

public init() {
mutex.initialize()
Expand Down Expand Up @@ -40,18 +39,60 @@ public final class Mutex {
}
}

enum MutexType {
case normal
case recursive

var attrType: Int32 {
switch self {
case .normal: return PTHREAD_MUTEX_NORMAL
case .recursive: return PTHREAD_MUTEX_RECURSIVE
}
}
}

internal extension pthread_mutex_t {

mutating func initialize(as type: MutexType = .normal) {
withUnsafeMutablePointer(to: &self) {
$0.initialize(as: type)
}
}

mutating func deinitialize() {
withUnsafeMutablePointer(to: &self) {
$0.deinitialize()
}
}

mutating func lock() {
withUnsafeMutablePointer(to: &self) {
$0.lock()
}
}

mutating func unlock() {
withUnsafeMutablePointer(to: &self) {
$0.unlock()
}
}

}

typealias PThreadMutex = UnsafeMutablePointer<pthread_mutex_t>

/// Helper methods to work directly with a Pthread mutex pointer to avoid overhead of alloction and reference counting of using the Mutex reference type.
/// - Note: You have to explicity call `initialize()` before use (typically in a class init) and `deinitialize()` when done (typically in a class deinit)
extension UnsafeMutablePointer where Pointee == pthread_mutex_t {
func initialize() {

func initialize(as type: MutexType = .recursive) {
var attr = pthread_mutexattr_t()
defer { pthread_mutexattr_destroy(&attr) }
guard pthread_mutexattr_init(&attr) == 0 else {
preconditionFailure()
}

pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL)
pthread_mutexattr_settype(&attr, type.attrType)

guard pthread_mutex_init(self, &attr) == 0 else {
preconditionFailure()
Expand All @@ -71,23 +112,14 @@ extension UnsafeMutablePointer where Pointee == pthread_mutex_t {
func unlock() {
pthread_mutex_unlock(self)
}

/// Will lock `self`, call `block`, then unlock `self`
Copy link
Author

@digitaliz digitaliz May 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed protect because it triggered Thread 1: Simultaneous accesses to 0x1032d57a8, but modification requires exclusive access when running the FlowTests. I currently don't have a good enough mental model to understand why that is (but a guess is recursively calling protect, starts two possible mutating scopes). Since I was unable to clearly reason about this construct, and for now can't do more research, I decided to remove it in favour of straight mutex lock/unlock that I understand better.

@discardableResult
func protect<T>(_ block: () throws -> T) rethrows -> T {
pthread_mutex_lock(self)
defer { pthread_mutex_unlock(self) }
return try block()
}
}

/// Internal helper to help manage state in stateful transforms.
final class StateAndCallback<Value, State>: Disposable {
var callback: ((Value) -> ())?
var val: State
fileprivate var disposables = [Disposable]()
private var _mutex = pthread_mutex_t()
fileprivate var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
fileprivate var mutex = pthread_mutex_t()

init(state: State, callback: @escaping (Value) -> ()) {
val = state
Expand Down
33 changes: 21 additions & 12 deletions Flow/OrderedCallbacker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import Foundation
/// - Note: Is thread safe.
public final class OrderedCallbacker<OrderedValue, CallbackValue> {
private var callbacks: [Key: (OrderedValue, (CallbackValue) -> Future<()>)] = [:]
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

public init() {
mutex.initialize()
Expand All @@ -27,30 +26,40 @@ public final class OrderedCallbacker<OrderedValue, CallbackValue> {

/// - Returns: True if no callbacks has been registered.
public var isEmpty: Bool {
return mutex.protect { callbacks.isEmpty }
mutex.lock()
defer { mutex.unlock() }

return callbacks.isEmpty
}

/// Register a callback and orderedValue to be called when `callAll` is executed.
/// - Parameter callback: The next callback won't be called until `callback` return `Future` completes
/// - Parameter orderedValue: The value used to order this callback
/// - Returns: A `Disposable` to be disposed to unregister the callback.
public func addCallback(_ callback: @escaping (CallbackValue) -> Future<()>, orderedBy orderedValue: OrderedValue) -> Disposable {
return mutex.protect {
let key = generateKey()
callbacks[key] = (orderedValue, callback)
return Disposer {
self.mutex.protect { self.callbacks[key] = nil }
}
mutex.lock()
defer { mutex.unlock() }

let key = generateKey()
callbacks[key] = (orderedValue, callback)
return Disposer {
self.mutex.lock()
self.callbacks[key] = nil
self.mutex.unlock()
}
}

/// Will call all registered callbacks with `value` in the order set by `isOrderedBefore`
/// - Returns: A `Future` that will complete when all callbacks has been called.
@discardableResult
public func callAll(with value: CallbackValue, isOrderedBefore: (OrderedValue, OrderedValue) -> Bool) -> Future<()> {
return mutex.protect {
callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 }
}.mapToFuture { $0(value) }.toVoid()

mutex.lock()
defer { mutex.unlock() }
return callbacks.values
.sorted { isOrderedBefore($0.0, $1.0) }
.map { $1 }
.mapToFuture { $0(value) }.toVoid()
}
}

Expand Down
6 changes: 2 additions & 4 deletions Flow/Signal+Construction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ private final class CallbackState<Value>: Disposable {
private var shared: SharedState<Value>?
let sharedKey: Key

private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

init(shared: SharedState<Value>? = nil, getValue: (() -> Value)?, callback: @escaping (EventType<Value>) -> Void) {
self.shared = shared
Expand Down Expand Up @@ -292,8 +291,7 @@ private final class CallbackState<Value>: Disposable {
/// Helper to implement sharing of a single `onEvent` if more than one listner, see `SignalOption.shared`
final class SharedState<Value> {
private let getValue: (() -> Value)?
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

typealias Callback = (EventType<Value>) -> Void
var firstCallback: (key: Key, value: Callback)?
Expand Down
Loading