Skip to content

Commit

Permalink
Added @mainactor to SharedSequence, Driver and Signal functions takin…
Browse files Browse the repository at this point in the history
…g closure arguments.
  • Loading branch information
fabianmuecke committed Oct 28, 2024
1 parent 7570d44 commit 9cee95e
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 23 deletions.
10 changes: 6 additions & 4 deletions RxCocoa/Traits/Driver/Driver+Subscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,11 @@ extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingSt
gracefully completed, errored, or if the generation is canceled by disposing subscription)
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
@preconcurrency @MainActor
public func drive<Object: AnyObject>(
with object: Object,
onNext: ((Object, Element) -> Void)? = nil,
onCompleted: ((Object) -> Void)? = nil,
onNext: (@MainActor (Object, Element) -> Void)? = nil,
onCompleted: (@MainActor (Object) -> Void)? = nil,
onDisposed: ((Object) -> Void)? = nil
) -> Disposable {
MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
Expand All @@ -178,9 +179,10 @@ extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingSt
gracefully completed, errored, or if the generation is canceled by disposing subscription)
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
@preconcurrency @MainActor
public func drive(
onNext: ((Element) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onNext: (@MainActor (Element) -> Void)? = nil,
onCompleted: (@MainActor () -> Void)? = nil,
onDisposed: (() -> Void)? = nil
) -> Disposable {
MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
Expand Down
45 changes: 30 additions & 15 deletions RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ extension SharedSequenceConvertibleType {
- parameter selector: A transform function to apply to each source element.
- returns: An observable sequence whose elements are the result of invoking the transform function on each element of source.
*/
public func map<Result>(_ selector: @escaping (Element) -> Result) -> SharedSequence<SharingStrategy, Result> {
@preconcurrency @MainActor
public func map<Result>(_ selector: @escaping @MainActor (Element) -> Result) -> SharedSequence<SharingStrategy, Result> {
let source = self
.asObservable()
.map(selector)
Expand All @@ -35,7 +36,8 @@ extension SharedSequenceConvertibleType {
- returns: An observable sequence whose elements are the result of filtering the transform function for each element of the source.

*/
public func compactMap<Result>(_ selector: @escaping (Element) -> Result?) -> SharedSequence<SharingStrategy, Result> {
@preconcurrency @MainActor
public func compactMap<Result>(_ selector: @escaping @MainActor (Element) -> Result?) -> SharedSequence<SharingStrategy, Result> {
let source = self
.asObservable()
.compactMap(selector)
Expand All @@ -51,7 +53,8 @@ extension SharedSequenceConvertibleType {
- parameter predicate: A function to test each source element for a condition.
- returns: An observable sequence that contains elements from the input sequence that satisfy the condition.
*/
public func filter(_ predicate: @escaping (Element) -> Bool) -> SharedSequence<SharingStrategy, Element> {
@preconcurrency @MainActor
public func filter(_ predicate: @escaping @MainActor (Element) -> Bool) -> SharedSequence<SharingStrategy, Element> {
let source = self
.asObservable()
.filter(predicate)
Expand Down Expand Up @@ -92,7 +95,8 @@ extension SharedSequenceConvertibleType {
- returns: An observable sequence whose elements are the result of invoking the transform function on each element of source producing an
Observable of Observable sequences and that at any point in time produces the elements of the most recent inner observable sequence that has been received.
*/
public func flatMapLatest<Sharing, Result>(_ selector: @escaping (Element) -> SharedSequence<Sharing, Result>)
@preconcurrency @MainActor
public func flatMapLatest<Sharing, Result>(_ selector: @escaping @MainActor (Element) -> SharedSequence<Sharing, Result>)
-> SharedSequence<Sharing, Result> {
let source: Observable<Result> = self
.asObservable()
Expand All @@ -111,7 +115,8 @@ extension SharedSequenceConvertibleType {
- parameter selector: A transform function to apply to element that was observed while no observable is executing in parallel.
- returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated.
*/
public func flatMapFirst<Sharing, Result>(_ selector: @escaping (Element) -> SharedSequence<Sharing, Result>)
@preconcurrency @MainActor
public func flatMapFirst<Sharing, Result>(_ selector: @escaping @MainActor (Element) -> SharedSequence<Sharing, Result>)
-> SharedSequence<Sharing, Result> {
let source: Observable<Result> = self
.asObservable()
Expand All @@ -134,7 +139,8 @@ extension SharedSequenceConvertibleType {
- parameter onDispose: Action to invoke after subscription to source observable has been disposed for any reason. It can be either because sequence terminates for some reason or observer subscription being disposed.
- returns: The source sequence with the side-effecting behavior applied.
*/
public func `do`(onNext: ((Element) -> Void)? = nil, afterNext: ((Element) -> Void)? = nil, onCompleted: (() -> Void)? = nil, afterCompleted: (() -> Void)? = nil, onSubscribe: (() -> Void)? = nil, onSubscribed: (() -> Void)? = nil, onDispose: (() -> Void)? = nil)
@preconcurrency @MainActor
public func `do`(onNext: (@MainActor (Element) -> Void)? = nil, afterNext: (@MainActor (Element) -> Void)? = nil, onCompleted: (@MainActor () -> Void)? = nil, afterCompleted: ( @MainActor () -> Void)? = nil, onSubscribe: (@MainActor () -> Void)? = nil, onSubscribed: (@MainActor () -> Void)? = nil, onDispose: (() -> Void)? = nil)
-> SharedSequence<SharingStrategy, Element> {
let source = self.asObservable()
.do(onNext: onNext, afterNext: afterNext, onCompleted: onCompleted, afterCompleted: afterCompleted, onSubscribe: onSubscribe, onSubscribed: onSubscribed, onDispose: onDispose)
Expand Down Expand Up @@ -184,7 +190,8 @@ extension SharedSequenceConvertibleType {
- parameter keySelector: A function to compute the comparison key for each element.
- returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence.
*/
public func distinctUntilChanged<Key: Equatable>(_ keySelector: @escaping (Element) -> Key) -> SharedSequence<SharingStrategy, Element> {
@preconcurrency @MainActor
public func distinctUntilChanged<Key: Equatable>(_ keySelector: @escaping @MainActor (Element) -> Key) -> SharedSequence<SharingStrategy, Element> {
let source = self.asObservable()
.distinctUntilChanged(keySelector, comparer: { $0 == $1 })
return SharedSequence(source)
Expand All @@ -196,7 +203,8 @@ extension SharedSequenceConvertibleType {
- parameter comparer: Equality comparer for computed key values.
- returns: An observable sequence only containing the distinct contiguous elements, based on `comparer`, from the source sequence.
*/
public func distinctUntilChanged(_ comparer: @escaping (Element, Element) -> Bool) -> SharedSequence<SharingStrategy, Element> {
@preconcurrency @MainActor
public func distinctUntilChanged(_ comparer: @escaping @MainActor (Element, Element) -> Bool) -> SharedSequence<SharingStrategy, Element> {
let source = self.asObservable()
.distinctUntilChanged({ $0 }, comparer: comparer)
return SharedSequence<SharingStrategy, Element>(source)
Expand All @@ -209,7 +217,8 @@ extension SharedSequenceConvertibleType {
- parameter comparer: Equality comparer for computed key values.
- returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value and the comparer, from the source sequence.
*/
public func distinctUntilChanged<K>(_ keySelector: @escaping (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence<SharingStrategy, Element> {
@preconcurrency @MainActor
public func distinctUntilChanged<K>(_ keySelector: @escaping @MainActor (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence<SharingStrategy, Element> {
let source = self.asObservable()
.distinctUntilChanged(keySelector, comparer: comparer)
return SharedSequence<SharingStrategy, Element>(source)
Expand All @@ -226,7 +235,8 @@ extension SharedSequenceConvertibleType {
- parameter selector: A transform function to apply to each element.
- returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.
*/
public func flatMap<Sharing, Result>(_ selector: @escaping (Element) -> SharedSequence<Sharing, Result>) -> SharedSequence<Sharing, Result> {
@preconcurrency @MainActor
public func flatMap<Sharing, Result>(_ selector: @escaping @MainActor (Element) -> SharedSequence<Sharing, Result>) -> SharedSequence<Sharing, Result> {
let source = self.asObservable()
.flatMap(selector)

Expand Down Expand Up @@ -355,7 +365,8 @@ extension SharedSequenceConvertibleType {
- parameter accumulator: An accumulator function to be invoked on each element.
- returns: An observable sequence containing the accumulated values.
*/
public func scan<A>(_ seed: A, accumulator: @escaping (A, Element) -> A)
@preconcurrency @MainActor
public func scan<A>(_ seed: A, accumulator: @escaping @MainActor (A, Element) -> A)
-> SharedSequence<SharingStrategy, A> {
let source = self.asObservable()
.scan(seed, accumulator: accumulator)
Expand Down Expand Up @@ -398,7 +409,8 @@ extension SharedSequence {
- parameter resultSelector: Function to invoke for each series of elements at corresponding indexes in the sources.
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
public static func zip<Collection: Swift.Collection, Result>(_ collection: Collection, resultSelector: @escaping ([Element]) throws -> Result) -> SharedSequence<SharingStrategy, Result>
@preconcurrency @MainActor
public static func zip<Collection: Swift.Collection, Result>(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence<SharingStrategy, Result>
where Collection.Element == SharedSequence<SharingStrategy, Element> {
let source = Observable.zip(collection.map { $0.asSharedSequence().asObservable() }, resultSelector: resultSelector)
return SharedSequence<SharingStrategy, Result>(source)
Expand All @@ -425,7 +437,8 @@ extension SharedSequence {
- parameter resultSelector: Function to invoke whenever any of the sources produces an element.
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
public static func combineLatest<Collection: Swift.Collection, Result>(_ collection: Collection, resultSelector: @escaping ([Element]) throws -> Result) -> SharedSequence<SharingStrategy, Result>
@preconcurrency @MainActor
public static func combineLatest<Collection: Swift.Collection, Result>(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence<SharingStrategy, Result>
where Collection.Element == SharedSequence<SharingStrategy, Element> {
let source = Observable.combineLatest(collection.map { $0.asObservable() }, resultSelector: resultSelector)
return SharedSequence<SharingStrategy, Result>(source)
Expand Down Expand Up @@ -456,9 +469,10 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt
- parameter resultSelector: A function to combine the unretained referenced on `obj` and the value of the observable sequence.
- returns: An observable sequence that contains the result of `resultSelector` being called with an unretained reference on `obj` and the values of the original sequence.
*/
@preconcurrency @MainActor
public func withUnretained<Object: AnyObject, Out>(
_ obj: Object,
resultSelector: @escaping (Object, Element) -> Out
resultSelector: @escaping @MainActor (Object, Element) -> Out
) -> SharedSequence<SharingStrategy, Out> {
SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector))
}
Expand Down Expand Up @@ -503,7 +517,8 @@ extension SharedSequenceConvertibleType {
- parameter resultSelector: Function to invoke for each element from the self combined with the latest element from the second source, if any.
- returns: An observable sequence containing the result of combining each element of the self with the latest element from the second source, if any, using the specified result selector function.
*/
public func withLatestFrom<SecondO: SharedSequenceConvertibleType, ResultType>(_ second: SecondO, resultSelector: @escaping (Element, SecondO.Element) -> ResultType) -> SharedSequence<SharingStrategy, ResultType> where SecondO.SharingStrategy == SharingStrategy {
@preconcurrency @MainActor
public func withLatestFrom<SecondO: SharedSequenceConvertibleType, ResultType>(_ second: SecondO, resultSelector: @escaping @MainActor (Element, SecondO.Element) -> ResultType) -> SharedSequence<SharingStrategy, ResultType> where SecondO.SharingStrategy == SharingStrategy {
let source = self.asObservable()
.withLatestFrom(second.asSharedSequence(), resultSelector: resultSelector)

Expand Down
10 changes: 6 additions & 4 deletions RxCocoa/Traits/Signal/Signal+Subscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt
gracefully completed, errored, or if the generation is canceled by disposing subscription)
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
@preconcurrency @MainActor
public func emit<Object: AnyObject>(
with object: Object,
onNext: ((Object, Element) -> Void)? = nil,
onCompleted: ((Object) -> Void)? = nil,
onNext: (@MainActor (Object, Element) -> Void)? = nil,
onCompleted: (@MainActor (Object) -> Void)? = nil,
onDisposed: ((Object) -> Void)? = nil
) -> Disposable {
self.asObservable().subscribe(
Expand All @@ -156,9 +157,10 @@ extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingSt
gracefully completed, errored, or if the generation is canceled by disposing subscription)
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
@preconcurrency @MainActor
public func emit(
onNext: ((Element) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onNext: (@MainActor (Element) -> Void)? = nil,
onCompleted: (@MainActor () -> Void)? = nil,
onDisposed: (() -> Void)? = nil
) -> Disposable {
self.asObservable().subscribe(onNext: onNext, onCompleted: onCompleted, onDisposed: onDisposed)
Expand Down

0 comments on commit 9cee95e

Please sign in to comment.