diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index a4c8c9fb0..f873c09da 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -651,7 +651,6 @@ C89AB1DA1DAAC3350065FBE6 /* Driver.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B11DAAC3350065FBE6 /* Driver.swift */; }; C89AB1DE1DAAC3350065FBE6 /* ObservableConvertibleType+Driver.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B21DAAC3350065FBE6 /* ObservableConvertibleType+Driver.swift */; }; C89AB1EA1DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B61DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift */; }; - C89AB1F21DAAC3350065FBE6 /* SharedSequence+Operators.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B81DAAC3350065FBE6 /* SharedSequence+Operators.swift */; }; C89AB1F61DAAC3350065FBE6 /* SharedSequence.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1B91DAAC3350065FBE6 /* SharedSequence.swift */; }; C89AB2021DAAC3350065FBE6 /* KVORepresentable+CoreGraphics.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1BD1DAAC3350065FBE6 /* KVORepresentable+CoreGraphics.swift */; }; C89AB2061DAAC3350065FBE6 /* KVORepresentable+Swift.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89AB1BE1DAAC3350065FBE6 /* KVORepresentable+Swift.swift */; }; @@ -789,6 +788,8 @@ CB883B451BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; CD8F7AC527BA9187001574EB /* Infallible+Driver.swift in Sources */ = {isa = PBXBuildFile; fileRef = CD8F7AC427BA9187001574EB /* Infallible+Driver.swift */; }; CDDEF16A1D4FB40000CA8546 /* Disposables.swift in Sources */ = {isa = PBXBuildFile; fileRef = CDDEF1691D4FB40000CA8546 /* Disposables.swift */; }; + D2B78EEC2CCF9F8B0054AB01 /* SharedSequence+Operators.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2B78EEB2CCF9F8B0054AB01 /* SharedSequence+Operators.swift */; }; + D2B78EEE2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2B78EED2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift */; }; D9080ACF1EA05AE0002B433B /* RxNavigationControllerDelegateProxy.swift in Sources */ = {isa = PBXBuildFile; fileRef = D9080ACD1EA05A16002B433B /* RxNavigationControllerDelegateProxy.swift */; }; D9080AD41EA05DE9002B433B /* UINavigationController+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = D9080AD21EA05DDF002B433B /* UINavigationController+Rx.swift */; }; D9080AD81EA06189002B433B /* UINavigationController+RxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D9080AD71EA06189002B433B /* UINavigationController+RxTests.swift */; }; @@ -1353,7 +1354,6 @@ C89AB1B21DAAC3350065FBE6 /* ObservableConvertibleType+Driver.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "ObservableConvertibleType+Driver.swift"; sourceTree = ""; }; C89AB1B61DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators+arity.swift"; sourceTree = ""; }; C89AB1B71DAAC3350065FBE6 /* SharedSequence+Operators+arity.tt */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = "SharedSequence+Operators+arity.tt"; sourceTree = ""; }; - C89AB1B81DAAC3350065FBE6 /* SharedSequence+Operators.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators.swift"; sourceTree = ""; }; C89AB1B91DAAC3350065FBE6 /* SharedSequence.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SharedSequence.swift; sourceTree = ""; }; C89AB1BD1DAAC3350065FBE6 /* KVORepresentable+CoreGraphics.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "KVORepresentable+CoreGraphics.swift"; sourceTree = ""; }; C89AB1BE1DAAC3350065FBE6 /* KVORepresentable+Swift.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "KVORepresentable+Swift.swift"; sourceTree = ""; }; @@ -1453,6 +1453,8 @@ CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = ""; }; CD8F7AC427BA9187001574EB /* Infallible+Driver.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "Infallible+Driver.swift"; sourceTree = ""; }; CDDEF1691D4FB40000CA8546 /* Disposables.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Disposables.swift; sourceTree = ""; }; + D2B78EEB2CCF9F8B0054AB01 /* SharedSequence+Operators.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators.swift"; sourceTree = ""; }; + D2B78EED2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "SharedSequence+Operators+MainActor.swift"; sourceTree = ""; }; D9080ACD1EA05A16002B433B /* RxNavigationControllerDelegateProxy.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxNavigationControllerDelegateProxy.swift; sourceTree = ""; }; D9080AD21EA05DDF002B433B /* UINavigationController+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationController+Rx.swift"; sourceTree = ""; }; D9080AD71EA06189002B433B /* UINavigationController+RxTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UINavigationController+RxTests.swift"; sourceTree = ""; }; @@ -2346,7 +2348,8 @@ children = ( C89AB1B61DAAC3350065FBE6 /* SharedSequence+Operators+arity.swift */, C89AB1B71DAAC3350065FBE6 /* SharedSequence+Operators+arity.tt */, - C89AB1B81DAAC3350065FBE6 /* SharedSequence+Operators.swift */, + D2B78EEB2CCF9F8B0054AB01 /* SharedSequence+Operators.swift */, + D2B78EED2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift */, DB08833626FB0637005805BE /* SharedSequence+Concurrency.swift */, C89AB1B91DAAC3350065FBE6 /* SharedSequence.swift */, C85E6FBD1F53025700C5681E /* SchedulerType+SharedSequence.swift */, @@ -3045,6 +3048,7 @@ B562478F203515DD00D3EE75 /* RxCollectionViewDataSourcePrefetchingProxy.swift in Sources */, 84E4D3921C9AFD3400ADFDC9 /* UISearchController+Rx.swift in Sources */, C88254341B8A752B00B02D69 /* UITableView+Rx.swift in Sources */, + D2B78EEC2CCF9F8B0054AB01 /* SharedSequence+Operators.swift in Sources */, CD8F7AC527BA9187001574EB /* Infallible+Driver.swift in Sources */, C89AB1A61DAAC25A0065FBE6 /* RxCocoaObjCRuntimeError+Extensions.swift in Sources */, C88254161B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift in Sources */, @@ -3080,7 +3084,6 @@ C89AB1CA1DAAC3350065FBE6 /* ControlProperty.swift in Sources */, ECBBA59E1DF8C0D400DDDC2E /* RxTabBarControllerDelegateProxy.swift in Sources */, 78F2D93E24C8D35700D13F0C /* RxWKNavigationDelegateProxy.swift in Sources */, - C89AB1F21DAAC3350065FBE6 /* SharedSequence+Operators.swift in Sources */, 9BA1CBD31C0F7D550044B50A /* UIActivityIndicatorView+Rx.swift in Sources */, 842A5A2C1C357F92003568D5 /* NSTextStorage+Rx.swift in Sources */, C88254241B8A752B00B02D69 /* RxTextViewDelegateProxy.swift in Sources */, @@ -3098,6 +3101,7 @@ C89AB2501DAAC3A60065FBE6 /* _RXObjCRuntime.m in Sources */, C89AB21E1DAAC3350065FBE6 /* NSObject+Rx.swift in Sources */, D9080AD41EA05DE9002B433B /* UINavigationController+Rx.swift in Sources */, + D2B78EEE2CCF9FDD0054AB01 /* SharedSequence+Operators+MainActor.swift in Sources */, 88718CFE1CE5D80000D88D60 /* UITabBar+Rx.swift in Sources */, 88D98F2E1CE7549A00D50457 /* RxTabBarDelegateProxy.swift in Sources */, C88254331B8A752B00B02D69 /* UISwitch+Rx.swift in Sources */, diff --git a/RxCocoa/Traits/Driver/Driver.swift b/RxCocoa/Traits/Driver/Driver.swift index 5de8b3a56..0301315d2 100644 --- a/RxCocoa/Traits/Driver/Driver.swift +++ b/RxCocoa/Traits/Driver/Driver.swift @@ -37,7 +37,7 @@ import RxSwift */ public typealias Driver = SharedSequence -public struct DriverSharingStrategy: SharingStrategyProtocol { +public struct DriverSharingStrategy: MainActorSharingStrategyProtocol { public static var scheduler: SchedulerType { SharingScheduler.make() } public static func share(_ source: Observable) -> Observable { source.share(replay: 1, scope: .whileConnected) diff --git a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift new file mode 100644 index 000000000..e72a4ebe7 --- /dev/null +++ b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift @@ -0,0 +1,540 @@ +// +// SharedSequence+Operators.swift +// RxCocoa +// +// Created by Krunoslav Zaher on 9/19/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import RxSwift + +// MARK: map +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence into a new form. + + - parameter selector: A transform function to apply to each source element. + - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source. + */ + @preconcurrency @MainActor + public func map(_ selector: @escaping @MainActor (Element) -> Result) -> SharedSequence { + let source = self + .asObservable() + .map(selector) + return SharedSequence(source) + } +} + +// MARK: compactMap +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence into an optional form and filters all optional results. + + - parameter selector: A transform function to apply to each source element and which returns an element or nil. + - returns: An observable sequence whose elements are the result of filtering the transform function for each element of the source. + + */ + @preconcurrency @MainActor + public func compactMap(_ selector: @escaping @MainActor (Element) -> Result?) -> SharedSequence { + let source = self + .asObservable() + .compactMap(selector) + return SharedSequence(source) + } +} + +// MARK: filter +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Filters the elements of an observable sequence based on a predicate. + + - parameter predicate: A function to test each source element for a condition. + - returns: An observable sequence that contains elements from the input sequence that satisfy the condition. + */ + @preconcurrency @MainActor + public func filter(_ predicate: @escaping @MainActor (Element) -> Bool) -> SharedSequence { + let source = self + .asObservable() + .filter(predicate) + return SharedSequence(source) + } +} + +// MARK: switchLatest +extension SharedSequenceConvertibleType where Element: SharedSequenceConvertibleType, SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Transforms an observable sequence of observable sequences into an observable sequence + producing values only from the most recent observable sequence. + + Each time a new inner observable sequence is received, unsubscribe from the + previous inner observable sequence. + + - returns: The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received. + */ + public func switchLatest() -> SharedSequence { + let source: Observable = self + .asObservable() + .map { $0.asSharedSequence() } + .switchLatest() + return SharedSequence(source) + } +} + +// MARK: flatMapLatest +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Projects each element of an observable sequence into a new sequence of observable sequences and then + transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence. + + It is a combination of `map` + `switchLatest` operator + + - parameter selector: A transform function to apply to each element. + - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source producing an + Observable of Observable sequences and that at any point in time produces the elements of the most recent inner observable sequence that has been received. + */ + @preconcurrency @MainActor + public func flatMapLatest(_ selector: @escaping @MainActor (Element) -> SharedSequence) + -> SharedSequence { + let source: Observable = self + .asObservable() + .flatMapLatest(selector) + return SharedSequence(source) + } +} + +// MARK: flatMapFirst +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + If element is received while there is some projected observable sequence being merged it will simply be ignored. + + - parameter selector: A transform function to apply to element that was observed while no observable is executing in parallel. + - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated. + */ + @preconcurrency @MainActor + public func flatMapFirst(_ selector: @escaping @MainActor (Element) -> SharedSequence) + -> SharedSequence { + let source: Observable = self + .asObservable() + .flatMapFirst(selector) + return SharedSequence(source) + } +} + +// MARK: do +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Invokes an action for each event in the observable sequence, and propagates all observer messages through the result sequence. + + - parameter onNext: Action to invoke for each element in the observable sequence. + - parameter afterNext: Action to invoke for each element after the observable has passed an onNext event along to its downstream. + - parameter onCompleted: Action to invoke upon graceful termination of the observable sequence. + - parameter afterCompleted: Action to invoke after graceful termination of the observable sequence. + - parameter onSubscribe: Action to invoke before subscribing to source observable sequence. + - parameter onSubscribed: Action to invoke after subscribing to source observable sequence. + - parameter onDispose: Action to invoke after subscription to source observable has been disposed for any reason. It can be either because sequence terminates for some reason or observer subscription being disposed. + - returns: The source sequence with the side-effecting behavior applied. + */ + @preconcurrency @MainActor + public func `do`(onNext: (@MainActor (Element) -> Void)? = nil, afterNext: (@MainActor (Element) -> Void)? = nil, onCompleted: (@MainActor () -> Void)? = nil, afterCompleted: ( @MainActor () -> Void)? = nil, onSubscribe: (@MainActor () -> Void)? = nil, onSubscribed: (@MainActor () -> Void)? = nil, onDispose: (() -> Void)? = nil) + -> SharedSequence { + let source = self.asObservable() + .do(onNext: onNext, afterNext: afterNext, onCompleted: onCompleted, afterCompleted: afterCompleted, onSubscribe: onSubscribe, onSubscribed: onSubscribed, onDispose: onDispose) + + return SharedSequence(source) + } +} + +// MARK: debug +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Prints received events for all observers on standard output. + + - parameter identifier: Identifier that is printed together with event description to standard output. + - returns: An observable sequence whose events are printed to standard output. + */ + public func debug(_ identifier: String? = nil, trimOutput: Bool = false, file: String = #file, line: UInt = #line, function: String = #function) -> SharedSequence { + let source = self.asObservable() + .debug(identifier, trimOutput: trimOutput, file: file, line: line, function: function) + return SharedSequence(source) + } +} + +// MARK: distinctUntilChanged +extension SharedSequenceConvertibleType where Element: Equatable, SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Returns an observable sequence that contains only distinct contiguous elements according to equality operator. + + - returns: An observable sequence only containing the distinct contiguous elements, based on equality operator, from the source sequence. + */ + public func distinctUntilChanged() + -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged({ $0 }, comparer: { ($0 == $1) }) + + return SharedSequence(source) + } +} + +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Returns an observable sequence that contains only distinct contiguous elements according to the `keySelector`. + + - parameter keySelector: A function to compute the comparison key for each element. + - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence. + */ + @preconcurrency @MainActor + public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> Key) -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged(keySelector, comparer: { $0 == $1 }) + return SharedSequence(source) + } + + /** + Returns an observable sequence that contains only distinct contiguous elements according to the `comparer`. + + - parameter comparer: Equality comparer for computed key values. + - returns: An observable sequence only containing the distinct contiguous elements, based on `comparer`, from the source sequence. + */ + @preconcurrency @MainActor + public func distinctUntilChanged(_ comparer: @escaping @MainActor (Element, Element) -> Bool) -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged({ $0 }, comparer: comparer) + return SharedSequence(source) + } + + /** + Returns an observable sequence that contains only distinct contiguous elements according to the keySelector and the comparer. + + - parameter keySelector: A function to compute the comparison key for each element. + - parameter comparer: Equality comparer for computed key values. + - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value and the comparer, from the source sequence. + */ + @preconcurrency @MainActor + public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence { + let source = self.asObservable() + .distinctUntilChanged(keySelector, comparer: comparer) + return SharedSequence(source) + } +} + + +// MARK: flatMap +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + + - parameter selector: A transform function to apply to each element. + - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. + */ + @preconcurrency @MainActor + public func flatMap(_ selector: @escaping @MainActor (Element) -> SharedSequence) -> SharedSequence { + let source = self.asObservable() + .flatMap(selector) + + return SharedSequence(source) + } +} + +// MARK: merge +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges elements from all observable sequences from collection into a single observable sequence. + + - seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html) + + - parameter sources: Collection of observable sequences to merge. + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public static func merge(_ sources: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.merge(sources.map { $0.asObservable() }) + return SharedSequence(source) + } + + /** + Merges elements from all observable sequences from array into a single observable sequence. + + - seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html) + + - parameter sources: Array of observable sequences to merge. + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public static func merge(_ sources: [SharedSequence]) -> SharedSequence { + let source = Observable.merge(sources.map { $0.asObservable() }) + return SharedSequence(source) + } + + /** + Merges elements from all observable sequences into a single observable sequence. + + - seealso: [merge operator on reactivex.io](http://reactivex.io/documentation/operators/merge.html) + + - parameter sources: Collection of observable sequences to merge. + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public static func merge(_ sources: SharedSequence...) -> SharedSequence { + let source = Observable.merge(sources.map { $0.asObservable() }) + return SharedSequence(source) + } + +} + +// MARK: merge +extension SharedSequenceConvertibleType where Element: SharedSequenceConvertibleType, SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges elements from all observable sequences in the given enumerable sequence into a single observable sequence. + + - returns: The observable sequence that merges the elements of the observable sequences. + */ + public func merge() -> SharedSequence { + let source = self.asObservable() + .map { $0.asSharedSequence() } + .merge() + return SharedSequence(source) + } + + /** + Merges elements from all inner observable sequences into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences. + + - parameter maxConcurrent: Maximum number of inner observable sequences being subscribed to concurrently. + - returns: The observable sequence that merges the elements of the inner sequences. + */ + public func merge(maxConcurrent: Int) + -> SharedSequence { + let source = self.asObservable() + .map { $0.asSharedSequence() } + .merge(maxConcurrent: maxConcurrent) + return SharedSequence(source) + } +} + +// MARK: throttle +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Returns an Observable that emits the first and the latest item emitted by the source Observable during sequential time windows of a specified duration. + + This operator makes sure that no two elements are emitted in less then dueTime. + + - seealso: [debounce operator on reactivex.io](http://reactivex.io/documentation/operators/debounce.html) + + - parameter dueTime: Throttling duration for each element. + - parameter latest: Should latest element received in a dueTime wide time window since last element emission be emitted. + - returns: The throttled sequence. + */ + public func throttle(_ dueTime: RxTimeInterval, latest: Bool = true) + -> SharedSequence { + let source = self.asObservable() + .throttle(dueTime, latest: latest, scheduler: SharingStrategy.scheduler) + + return SharedSequence(source) + } + + /** + Ignores elements from an observable sequence which are followed by another element within a specified relative time duration, using the specified scheduler to run throttling timers. + + - parameter dueTime: Throttling duration for each element. + - returns: The throttled sequence. + */ + public func debounce(_ dueTime: RxTimeInterval) + -> SharedSequence { + let source = self.asObservable() + .debounce(dueTime, scheduler: SharingStrategy.scheduler) + + return SharedSequence(source) + } +} + +// MARK: scan +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Applies an accumulator function over an observable sequence and returns each intermediate result. The specified seed value is used as the initial accumulator value. + + For aggregation behavior with no intermediate results, see `reduce`. + + - parameter seed: The initial accumulator value. + - parameter accumulator: An accumulator function to be invoked on each element. + - returns: An observable sequence containing the accumulated values. + */ + @preconcurrency @MainActor + public func scan(_ seed: A, accumulator: @escaping @MainActor (A, Element) -> A) + -> SharedSequence { + let source = self.asObservable() + .scan(seed, accumulator: accumulator) + return SharedSequence(source) + } +} + +// MARK: concat + +extension SharedSequence where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully. + + - returns: An observable sequence that contains the elements of each given sequence, in sequential order. + */ + public static func concat(_ sequence: Sequence) -> SharedSequence + where Sequence.Element == SharedSequence { + let source = Observable.concat(sequence.lazy.map { $0.asObservable() }) + return SharedSequence(source) + } + + /** + Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully. + + - returns: An observable sequence that contains the elements of each given sequence, in sequential order. + */ + public static func concat(_ collection: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.concat(collection.map { $0.asObservable() }) + return SharedSequence(source) + } +} + +// MARK: zip + +extension SharedSequence where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index. + + - parameter resultSelector: Function to invoke for each series of elements at corresponding indexes in the sources. + - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. + */ + @preconcurrency @MainActor + public static func zip(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.zip(collection.map { $0.asSharedSequence().asObservable() }, resultSelector: resultSelector) + return SharedSequence(source) + } + + /** + Merges the specified observable sequences into one observable sequence all of the observable sequences have produced an element at a corresponding index. + + - returns: An observable sequence containing the result of combining elements of the sources. + */ + public static func zip(_ collection: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.zip(collection.map { $0.asSharedSequence().asObservable() }) + return SharedSequence(source) + } +} + +// MARK: combineLatest + +extension SharedSequence where SharingStrategy: MainActorSharingStrategyProtocol { + /** + Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences produces an element. + + - parameter resultSelector: Function to invoke whenever any of the sources produces an element. + - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. + */ + @preconcurrency @MainActor + public static func combineLatest(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.combineLatest(collection.map { $0.asObservable() }, resultSelector: resultSelector) + return SharedSequence(source) + } + + /** + Merges the specified observable sequences into one observable sequence whenever any of the observable sequences produces an element. + + - returns: An observable sequence containing the result of combining elements of the sources. + */ + public static func combineLatest(_ collection: Collection) -> SharedSequence + where Collection.Element == SharedSequence { + let source = Observable.combineLatest(collection.map { $0.asObservable() }) + return SharedSequence(source) + } +} + +// MARK: - withUnretained +extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingStrategy { + /** + Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. + + In the case the provided object cannot be retained successfully, the sequence will complete. + + - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. + + - parameter obj: The object to provide an unretained reference on. + - parameter resultSelector: A function to combine the unretained referenced on `obj` and the value of the observable sequence. + - returns: An observable sequence that contains the result of `resultSelector` being called with an unretained reference on `obj` and the values of the original sequence. + */ + @preconcurrency @MainActor + public func withUnretained( + _ obj: Object, + resultSelector: @escaping @MainActor (Object, Element) -> Out + ) -> SharedSequence { + SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) + } + + /** + Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. + + In the case the provided object cannot be retained successfully, the sequence will complete. + + - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. + + - parameter obj: The object to provide an unretained reference on. + - returns: An observable sequence of tuples that contains both an unretained reference on `obj` and the values of the original sequence. + */ + public func withUnretained(_ obj: Object) -> SharedSequence { + withUnretained(obj) { ($0, $1) } + } +} + +extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingStrategy { + @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) + public func withUnretained( + _ obj: Object, + resultSelector: @escaping (Object, Element) -> Out + ) -> SharedSequence { + SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) + } + + @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) + public func withUnretained(_ obj: Object) -> SharedSequence { + SharedSequence(self.asObservable().withUnretained(obj) { ($0, $1) }) + } +} + +// MARK: withLatestFrom +extension SharedSequenceConvertibleType where SharingStrategy: MainActorSharingStrategyProtocol { + + /** + Merges two observable sequences into one observable sequence by combining each element from self with the latest element from the second source, if any. + + - parameter second: Second observable source. + - parameter resultSelector: Function to invoke for each element from the self combined with the latest element from the second source, if any. + - returns: An observable sequence containing the result of combining each element of the self with the latest element from the second source, if any, using the specified result selector function. + */ + @preconcurrency @MainActor + public func withLatestFrom(_ second: SecondO, resultSelector: @escaping @MainActor (Element, SecondO.Element) -> ResultType) -> SharedSequence where SecondO.SharingStrategy == SharingStrategy { + let source = self.asObservable() + .withLatestFrom(second.asSharedSequence(), resultSelector: resultSelector) + + return SharedSequence(source) + } + + /** + Merges two observable sequences into one observable sequence by using latest element from the second sequence every time when `self` emits an element. + + - parameter second: Second observable source. + - returns: An observable sequence containing the result of combining each element of the self with the latest element from the second source, if any, using the specified result selector function. + */ + public func withLatestFrom(_ second: SecondO) -> SharedSequence { + let source = self.asObservable() + .withLatestFrom(second.asSharedSequence()) + + return SharedSequence(source) + } +} diff --git a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift index cbfafaae6..7a207826f 100644 --- a/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift +++ b/RxCocoa/Traits/SharedSequence/SharedSequence+Operators.swift @@ -17,8 +17,7 @@ extension SharedSequenceConvertibleType { - parameter selector: A transform function to apply to each source element. - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source. */ - @preconcurrency @MainActor - public func map(_ selector: @escaping @MainActor (Element) -> Result) -> SharedSequence { + public func map(_ selector: @escaping (Element) -> Result) -> SharedSequence { let source = self .asObservable() .map(selector) @@ -36,8 +35,7 @@ extension SharedSequenceConvertibleType { - returns: An observable sequence whose elements are the result of filtering the transform function for each element of the source. */ - @preconcurrency @MainActor - public func compactMap(_ selector: @escaping @MainActor (Element) -> Result?) -> SharedSequence { + public func compactMap(_ selector: @escaping (Element) -> Result?) -> SharedSequence { let source = self .asObservable() .compactMap(selector) @@ -53,8 +51,7 @@ extension SharedSequenceConvertibleType { - parameter predicate: A function to test each source element for a condition. - returns: An observable sequence that contains elements from the input sequence that satisfy the condition. */ - @preconcurrency @MainActor - public func filter(_ predicate: @escaping @MainActor (Element) -> Bool) -> SharedSequence { + public func filter(_ predicate: @escaping (Element) -> Bool) -> SharedSequence { let source = self .asObservable() .filter(predicate) @@ -95,8 +92,7 @@ extension SharedSequenceConvertibleType { - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source producing an Observable of Observable sequences and that at any point in time produces the elements of the most recent inner observable sequence that has been received. */ - @preconcurrency @MainActor - public func flatMapLatest(_ selector: @escaping @MainActor (Element) -> SharedSequence) + public func flatMapLatest(_ selector: @escaping (Element) -> SharedSequence) -> SharedSequence { let source: Observable = self .asObservable() @@ -115,8 +111,7 @@ extension SharedSequenceConvertibleType { - parameter selector: A transform function to apply to element that was observed while no observable is executing in parallel. - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated. */ - @preconcurrency @MainActor - public func flatMapFirst(_ selector: @escaping @MainActor (Element) -> SharedSequence) + public func flatMapFirst(_ selector: @escaping (Element) -> SharedSequence) -> SharedSequence { let source: Observable = self .asObservable() @@ -139,8 +134,7 @@ extension SharedSequenceConvertibleType { - parameter onDispose: Action to invoke after subscription to source observable has been disposed for any reason. It can be either because sequence terminates for some reason or observer subscription being disposed. - returns: The source sequence with the side-effecting behavior applied. */ - @preconcurrency @MainActor - public func `do`(onNext: (@MainActor (Element) -> Void)? = nil, afterNext: (@MainActor (Element) -> Void)? = nil, onCompleted: (@MainActor () -> Void)? = nil, afterCompleted: ( @MainActor () -> Void)? = nil, onSubscribe: (@MainActor () -> Void)? = nil, onSubscribed: (@MainActor () -> Void)? = nil, onDispose: (() -> Void)? = nil) + public func `do`(onNext: ((Element) -> Void)? = nil, afterNext: ((Element) -> Void)? = nil, onCompleted: (() -> Void)? = nil, afterCompleted: (() -> Void)? = nil, onSubscribe: (() -> Void)? = nil, onSubscribed: (() -> Void)? = nil, onDispose: (() -> Void)? = nil) -> SharedSequence { let source = self.asObservable() .do(onNext: onNext, afterNext: afterNext, onCompleted: onCompleted, afterCompleted: afterCompleted, onSubscribe: onSubscribe, onSubscribed: onSubscribed, onDispose: onDispose) @@ -190,8 +184,7 @@ extension SharedSequenceConvertibleType { - parameter keySelector: A function to compute the comparison key for each element. - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value, from the source sequence. */ - @preconcurrency @MainActor - public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> Key) -> SharedSequence { + public func distinctUntilChanged(_ keySelector: @escaping (Element) -> Key) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged(keySelector, comparer: { $0 == $1 }) return SharedSequence(source) @@ -203,8 +196,7 @@ extension SharedSequenceConvertibleType { - parameter comparer: Equality comparer for computed key values. - returns: An observable sequence only containing the distinct contiguous elements, based on `comparer`, from the source sequence. */ - @preconcurrency @MainActor - public func distinctUntilChanged(_ comparer: @escaping @MainActor (Element, Element) -> Bool) -> SharedSequence { + public func distinctUntilChanged(_ comparer: @escaping (Element, Element) -> Bool) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged({ $0 }, comparer: comparer) return SharedSequence(source) @@ -217,8 +209,7 @@ extension SharedSequenceConvertibleType { - parameter comparer: Equality comparer for computed key values. - returns: An observable sequence only containing the distinct contiguous elements, based on a computed key value and the comparer, from the source sequence. */ - @preconcurrency @MainActor - public func distinctUntilChanged(_ keySelector: @escaping @MainActor (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence { + public func distinctUntilChanged(_ keySelector: @escaping (Element) -> K, comparer: @escaping (K, K) -> Bool) -> SharedSequence { let source = self.asObservable() .distinctUntilChanged(keySelector, comparer: comparer) return SharedSequence(source) @@ -235,8 +226,7 @@ extension SharedSequenceConvertibleType { - parameter selector: A transform function to apply to each element. - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. */ - @preconcurrency @MainActor - public func flatMap(_ selector: @escaping @MainActor (Element) -> SharedSequence) -> SharedSequence { + public func flatMap(_ selector: @escaping (Element) -> SharedSequence) -> SharedSequence { let source = self.asObservable() .flatMap(selector) @@ -365,8 +355,7 @@ extension SharedSequenceConvertibleType { - parameter accumulator: An accumulator function to be invoked on each element. - returns: An observable sequence containing the accumulated values. */ - @preconcurrency @MainActor - public func scan(_ seed: A, accumulator: @escaping @MainActor (A, Element) -> A) + public func scan(_ seed: A, accumulator: @escaping (A, Element) -> A) -> SharedSequence { let source = self.asObservable() .scan(seed, accumulator: accumulator) @@ -409,8 +398,7 @@ extension SharedSequence { - parameter resultSelector: Function to invoke for each series of elements at corresponding indexes in the sources. - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. */ - @preconcurrency @MainActor - public static func zip(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence + public static func zip(_ collection: Collection, resultSelector: @escaping ([Element]) throws -> Result) -> SharedSequence where Collection.Element == SharedSequence { let source = Observable.zip(collection.map { $0.asSharedSequence().asObservable() }, resultSelector: resultSelector) return SharedSequence(source) @@ -437,8 +425,7 @@ extension SharedSequence { - parameter resultSelector: Function to invoke whenever any of the sources produces an element. - returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function. */ - @preconcurrency @MainActor - public static func combineLatest(_ collection: Collection, resultSelector: @escaping @MainActor ([Element]) throws -> Result) -> SharedSequence + public static func combineLatest(_ collection: Collection, resultSelector: @escaping ([Element]) throws -> Result) -> SharedSequence where Collection.Element == SharedSequence { let source = Observable.combineLatest(collection.map { $0.asObservable() }, resultSelector: resultSelector) return SharedSequence(source) @@ -456,57 +443,6 @@ extension SharedSequence { } } -// MARK: - withUnretained -extension SharedSequenceConvertibleType where SharingStrategy == SignalSharingStrategy { - /** - Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. - - In the case the provided object cannot be retained successfully, the sequence will complete. - - - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. - - - parameter obj: The object to provide an unretained reference on. - - parameter resultSelector: A function to combine the unretained referenced on `obj` and the value of the observable sequence. - - returns: An observable sequence that contains the result of `resultSelector` being called with an unretained reference on `obj` and the values of the original sequence. - */ - @preconcurrency @MainActor - public func withUnretained( - _ obj: Object, - resultSelector: @escaping @MainActor (Object, Element) -> Out - ) -> SharedSequence { - SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) - } - - /** - Provides an unretained, safe to use (i.e. not implicitly unwrapped), reference to an object along with the events emitted by the sequence. - - In the case the provided object cannot be retained successfully, the sequence will complete. - - - note: Be careful when using this operator in a sequence that has a buffer or replay, for example `share(replay: 1)`, as the sharing buffer will also include the provided object, which could potentially cause a retain cycle. - - - parameter obj: The object to provide an unretained reference on. - - returns: An observable sequence of tuples that contains both an unretained reference on `obj` and the values of the original sequence. - */ - public func withUnretained(_ obj: Object) -> SharedSequence { - withUnretained(obj) { ($0, $1) } - } -} - -extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingStrategy { - @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) - public func withUnretained( - _ obj: Object, - resultSelector: @escaping (Object, Element) -> Out - ) -> SharedSequence { - SharedSequence(self.asObservable().withUnretained(obj, resultSelector: resultSelector)) - } - - @available(*, message: "withUnretained has been deprecated for Driver. Consider using `drive(with:onNext:onCompleted:onDisposed:)`, instead", unavailable) - public func withUnretained(_ obj: Object) -> SharedSequence { - SharedSequence(self.asObservable().withUnretained(obj) { ($0, $1) }) - } -} - // MARK: withLatestFrom extension SharedSequenceConvertibleType { @@ -517,8 +453,7 @@ extension SharedSequenceConvertibleType { - parameter resultSelector: Function to invoke for each element from the self combined with the latest element from the second source, if any. - returns: An observable sequence containing the result of combining each element of the self with the latest element from the second source, if any, using the specified result selector function. */ - @preconcurrency @MainActor - public func withLatestFrom(_ second: SecondO, resultSelector: @escaping @MainActor (Element, SecondO.Element) -> ResultType) -> SharedSequence where SecondO.SharingStrategy == SharingStrategy { + public func withLatestFrom(_ second: SecondO, resultSelector: @escaping (Element, SecondO.Element) -> ResultType) -> SharedSequence where SecondO.SharingStrategy == SharingStrategy { let source = self.asObservable() .withLatestFrom(second.asSharedSequence(), resultSelector: resultSelector) diff --git a/RxCocoa/Traits/SharedSequence/SharedSequence.swift b/RxCocoa/Traits/SharedSequence/SharedSequence.swift index 4596c8ec0..b86ff97c1 100644 --- a/RxCocoa/Traits/SharedSequence/SharedSequence.swift +++ b/RxCocoa/Traits/SharedSequence/SharedSequence.swift @@ -81,6 +81,11 @@ public protocol SharingStrategyProtocol { static func share(_ source: Observable) -> Observable } +/** + A marker protocol for all sharing strategies, which are guaranteed to run on the main thread. + */ +public protocol MainActorSharingStrategyProtocol: SharingStrategyProtocol {} + /** A type that can be converted to `SharedSequence`. */ diff --git a/RxCocoa/Traits/Signal/Signal.swift b/RxCocoa/Traits/Signal/Signal.swift index e066b7ec2..db2153241 100644 --- a/RxCocoa/Traits/Signal/Signal.swift +++ b/RxCocoa/Traits/Signal/Signal.swift @@ -29,7 +29,7 @@ import RxSwift */ public typealias Signal = SharedSequence -public struct SignalSharingStrategy: SharingStrategyProtocol { +public struct SignalSharingStrategy: MainActorSharingStrategyProtocol { public static var scheduler: SchedulerType { SharingScheduler.make() } public static func share(_ source: Observable) -> Observable { diff --git a/Sources/RxCocoa/SharedSequence+Operators+MainActor.swift b/Sources/RxCocoa/SharedSequence+Operators+MainActor.swift new file mode 120000 index 000000000..49285c88e --- /dev/null +++ b/Sources/RxCocoa/SharedSequence+Operators+MainActor.swift @@ -0,0 +1 @@ +../../RxCocoa/Traits/SharedSequence/SharedSequence+Operators+MainActor.swift \ No newline at end of file diff --git a/Tests/RxCocoaTests/SharedSequence+Test.swift b/Tests/RxCocoaTests/SharedSequence+Test.swift index e2406f477..afd3095bd 100644 --- a/Tests/RxCocoaTests/SharedSequence+Test.swift +++ b/Tests/RxCocoaTests/SharedSequence+Test.swift @@ -7,10 +7,10 @@ // import Dispatch -import RxSwift import RxCocoa -import XCTest +import RxSwift import RxTest +import XCTest class SharedSequenceTest: RxTest { var backgroundScheduler = SerialDispatchQueueScheduler(qos: .default) @@ -26,11 +26,15 @@ class SharedSequenceTest: RxTest { // * events are observed on main thread - observe(on:MainScheduler.instance) // * it can't error out - it needs to have catch somewhere extension SharedSequenceTest { - func subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(_ xs: SharedSequence, expectationFulfilled: @escaping (Result) -> Bool = { _ in false }, subscribedOnBackground: () -> Void) -> [Result] { + func subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription( + _ xs: SharedSequence, + expectationFulfilled: @escaping (Result) -> Bool = { _ in false }, + subscribedOnBackground: () -> Void + ) -> [Result] { var firstElements = [Result]() var secondElements = [Result]() - let subscribeFinished = self.expectation(description: "subscribeFinished") + let subscribeFinished = expectation(description: "subscribeFinished") var expectation1: XCTestExpectation! var expectation2: XCTestExpectation! @@ -43,13 +47,13 @@ extension SharedSequenceTest { XCTAssertTrue(DispatchQueue.isMain) } switch e { - case .next(let element): + case let .next(element): firstElements.append(element) if expectationFulfilled(element) { expectation1.fulfill() firstSubscriptionFuture.dispose() } - case .error(let error): + case let .error(error): XCTFail("Error passed \(error)") case .completed: expectation1.fulfill() @@ -65,13 +69,13 @@ extension SharedSequenceTest { XCTAssertTrue(DispatchQueue.isMain) } switch e { - case .next(let element): + case let .next(element): secondElements.append(element) if expectationFulfilled(element) { expectation2.fulfill() secondSubscriptionFuture.dispose() } - case .error(let error): + case let .error(error): XCTFail("Error passed \(error)") case .completed: expectation2.fulfill() @@ -96,15 +100,72 @@ extension SharedSequenceTest { XCTAssertTrue(error == nil) } - expectation1 = self.expectation(description: "finished1") - expectation2 = self.expectation(description: "finished2") + expectation1 = expectation(description: "finished1") + expectation2 = expectation(description: "finished2") subscribedOnBackground() waitForExpectations(timeout: 1.0) { error in XCTAssertTrue(error == nil) } - + return firstElements } + + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func testDriverWorksOnMainActor() async { + for await value in await Observable.just(1) + .observe(on: ConcurrentDispatchQueueScheduler(qos: .default)) + .asDriver(onErrorDriveWith: .empty()) + .map({ @MainActor one in + MainActor.shared.assertIsolated() + return one + 1 + }) + .values { + XCTAssertEqual(value, 2) + } + } + + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func testSignalWorksOnMainActor() async { + for await value in await Observable.just(1) + .observe(on: ConcurrentDispatchQueueScheduler(qos: .default)) + .asSignal(onErrorSignalWith: .empty()) + .map({ @MainActor one in + MainActor.shared.assertIsolated() + return one + 1 + }) + .values { + XCTAssertEqual(value, 2) + } + } + + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + func testBackgroundSharingSequence() async { + func testBackgroundSharingSequence() async { + for await value in await Observable.just(1) + .asSharedSequence( + sharingStrategy: BackgroundSharingStrategy.self, + onErrorRecover: { _ in .empty() }) + .map({ one in + if Thread.isMainThread { + return 0 + } + return one + 1 + }) + .values { + XCTAssertEqual(value, 2) + } + } + } } + +private struct BackgroundSharingStrategy: SharingStrategyProtocol { + public static var scheduler: SchedulerType { ConcurrentDispatchQueueScheduler(qos: .default) } + + public static func share(_ source: Observable) -> Observable { + source.share(scope: .whileConnected) + } +} + +private typealias TestSequence = SharedSequence