Skip to content

Commit

Permalink
feat(CancellationSource): add cooperative cancellation to wait method
Browse files Browse the repository at this point in the history
  • Loading branch information
soumyamahunt committed Mar 10, 2023
1 parent 3f3dd36 commit 3a2451f
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 19 deletions.
81 changes: 62 additions & 19 deletions Sources/AsyncObjects/CancellationSource/CancellationSource.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Foundation
import AsyncAlgorithms

/// An object that controls cooperative cancellation of multiple registered tasks and linked object registered tasks.
///
Expand Down Expand Up @@ -36,16 +37,19 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
internal typealias Continuation = GlobalContinuation<Void, Error>
/// The cancellable work with invocation context.
internal typealias WorkItem = (
Cancellable, id: UUID, file: String, function: String, line: UInt
any Cancellable, id: UUID, file: String, function: String, line: UInt
)

/// The lifetime task that is cancelled when
/// `CancellationSource` is cancelled.
@usableFromInline
var lifetime: Task<Void, Error>!
let lifetime: Task<Void, Error>
/// The stream continuation used to register work items
/// for cooperative cancellation.
var pipe: AsyncStream<WorkItem>.Continuation!
let pipe: AsyncStream<WorkItem>.Continuation
/// The channel that controls waiting on the `CancellationSource`.
/// Once `CancellationSource` is cancelled, channel finishes.
let waiter: AsyncChannel<Void>

/// A Boolean value that indicates whether cancellation is already
/// invoked on the source.
Expand All @@ -61,24 +65,57 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
///
/// - Returns: The newly created cancellation source.
public init() {
let stream = AsyncStream<WorkItem> { self.pipe = $0 }
self.lifetime = Task.detached {
try await withThrowingTaskGroup(of: Void.self) { group in
for await item in stream {
group.addTask {
try? await waitHandlingCancelation(
for: item.0, associatedId: item.id,
file: item.file,
function: item.function,
line: item.line
)
var continuation: AsyncStream<WorkItem>.Continuation!
let stream = AsyncStream<WorkItem> { continuation = $0 }
let channel = AsyncChannel<Void>()
self.pipe = continuation
self.waiter = channel

func lifetime() -> Task<Void, Error> {
return Task.detached {
await withThrowingTaskGroup(of: Void.self) { group in
for await item in stream {
group.addTask {
try? await waitHandlingCancelation(
for: item.0, associatedId: item.id,
file: item.file,
function: item.function,
line: item.line
)
}
}

group.cancelAll()
}
channel.finish()
}
}

group.cancelAll()
try await group.waitForAll()
#if swift(>=5.8)
if #available(macOS 13.3, iOS 16.4, tvOS 16.4, watchOS 9.4, *) {
self.lifetime = Task.detached {
await withDiscardingTaskGroup { group in
for await item in stream {
group.addTask {
try? await waitHandlingCancelation(
for: item.0, associatedId: item.id,
file: item.file,
function: item.function,
line: item.line
)
}
}

group.cancelAll()
}
channel.finish()
}
} else {
self.lifetime = lifetime()
}
#else
self.lifetime = lifetime()
#endif
}

/// Register cancellable work for cooperative cancellation
Expand Down Expand Up @@ -163,11 +200,17 @@ public struct CancellationSource: AsyncObject, Cancellable, Loggable {
file: String = #fileID,
function: String = #function,
line: UInt = #line
) async {
) async throws {
let id = UUID()
log("Waiting", id: id, file: file, function: function, line: line)
let _ = await lifetime.result
log("Completed", id: id, file: file, function: function, line: line)
await waiter.send(())
do {
try Task.checkCancellation()
log("Completed", id: id, file: file, function: function, line: line)
} catch {
log("Cancelled", id: id, file: file, function: function, line: line)
throw error
}
}
}

Expand Down
36 changes: 36 additions & 0 deletions Tests/AsyncObjectsTests/CancellationSourceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,39 @@ class CancellationSourceInitializationTests: XCTestCase {
XCTAssertTrue(task.isCancelled)
}
}

@MainActor
class CancellationSourceWaitTests: XCTestCase {

func testWithoutCancellation() async throws {
let source = CancellationSource()
let task = Task.detached {
try await Task.sleep(seconds: 10)
XCTFail("Unexpected task progression")
}
source.register(task: task)
do {
try await source.wait(forSeconds: 3)
XCTFail("Unexpected task progression")
} catch is DurationTimeoutError {}
XCTAssertFalse(source.isCancelled)
XCTAssertFalse(task.isCancelled)
}

func testCooperativeCancellation() async throws {
let source = CancellationSource()
Task.detached(cancellationSource: source) {
try await Task.sleep(seconds: 20)
XCTFail("Unexpected task progression")
}
let task = Task.detached {
do {
try await source.wait(forSeconds: 5)
XCTFail("Unexpected task progression")
} catch is CancellationError {}
}
task.cancel()
try await task.value
XCTAssertFalse(source.isCancelled)
}
}

0 comments on commit 3a2451f

Please sign in to comment.