diff --git a/src/rpp/rpp/operators/fwd/filter.hpp b/src/rpp/rpp/operators/fwd/filter.hpp index 5519cb5de..75c9c43d7 100644 --- a/src/rpp/rpp/operators/fwd/filter.hpp +++ b/src/rpp/rpp/operators/fwd/filter.hpp @@ -47,7 +47,7 @@ struct member_overload * - On subscribe * - None * - OnNext - * - Just forwards emission of predicate returns true + * - Just forwards emission when predicate returns true * - OnError * - Just forwards original on_error * - OnCompleted diff --git a/src/rpp/rpp/operators/fwd/scan.hpp b/src/rpp/rpp/operators/fwd/scan.hpp index c6fff707e..eb62a4208 100644 --- a/src/rpp/rpp/operators/fwd/scan.hpp +++ b/src/rpp/rpp/operators/fwd/scan.hpp @@ -29,28 +29,42 @@ struct scan_impl; template struct member_overload { - /** - * \brief Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value - * - * \marble scan - { - source observable : +--1-2-3-| - operator "scan: s=1, (s,x)=>s+x" : +--2-4-7-| - } - * - * \param initial_value initial value for seed which will be applied for first value from observable (instead of emitting this as first value). Then it will be replaced with result and etc. - * \param accumulator function which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference. - * - * \return new specific_observable with the scan operator as most recent operator. - * \warning #include - * - * \par Example - * \snippet scan.cpp scan - * \snippet scan.cpp scan_vector - * - * \ingroup transforming_operators - * \see https://reactivex.io/documentation/operators/scan.html - */ + /** + * \brief Apply accumulator function for each emission from observable and result of accumulator from previous step and emit (and cache) resulting value + * + * \marble scan + { + source observable : +--1-2-3-| + operator "scan: s=1, (s,x)=>s+x" : +--2-4-7-| + } + * + * \details Acttually this operator applies provided accumulator function to seed and new emission, emits resulting value and updates seed value for next emission + * + * \param initial_value initial value for seed which will be applied for first value from observable (instead of emitting this as first value). Then it will be replaced with result and etc. + * \param accumulator function which accepts seed value and new value from observable and return new value of seed. Can accept seed by move-reference. + * + * \return new specific_observable with the scan operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet scan.cpp scan + * \snippet scan.cpp scan_vector + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store seed + * - OnNext + * - Applies accumulator to each emission + * - Updates seed value + * - Emits new seed value + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/scan.html + */ template AccumulatorFn> auto scan(Result&& initial_value, AccumulatorFn&& accumulator) const & requires is_header_included { diff --git a/src/rpp/rpp/operators/fwd/skip.hpp b/src/rpp/rpp/operators/fwd/skip.hpp index 172478438..ffc9b685f 100644 --- a/src/rpp/rpp/operators/fwd/skip.hpp +++ b/src/rpp/rpp/operators/fwd/skip.hpp @@ -19,40 +19,54 @@ namespace rpp::details namespace rpp::details { - template - struct skip_impl; +template +struct skip_impl; - template - struct member_overload +template +struct member_overload +{ + /** + * \brief Skip first `count` items provided by observable then send rest items as expected + * + * \marble skip + { + source observable : +--1-2-3-4-5-6-| + operator "skip(3)" : +--------4-5-6-| + } + * + * \details Actually this operator just decrements counter and starts to forward emissions when counter reaches zero. + * + * \param count amount of items to be skipped + * \return new specific_observable with the skip operator as most recent operator. + * \warning #include + * + * \par Example: + * \snippet skip.cpp skip + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store counter + * - OnNext + * - Forwards emission if counter is zero + * - Decrements counter if not zero + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * + * \ingroup filtering_operators + * \see https://reactivex.io/documentation/operators/skip.html + */ + template + auto skip(size_t count) const& requires is_header_included { - /** - * \brief Skip first `count` items provided by observable then send rest items as expected - * - * \marble skip - { - source observable : +--1-2-3-4-5-6-| - operator "skip(3)" : +--------4-5-6-| - } - * \param count amount of items to be skipped - * \return new specific_observable with the skip operator as most recent operator. - * \warning #include - * - * \par Example: - * \snippet skip.cpp skip - * - * \ingroup filtering_operators - * \see https://reactivex.io/documentation/operators/skip.html - */ - template - auto skip(size_t count) const& requires is_header_included - { - return static_cast(this)->template lift(skip_impl{count}); - } + return static_cast(this)->template lift(skip_impl{count}); + } - template - auto skip(size_t count) && requires is_header_included - { - return std::move(*static_cast(this)).template lift(skip_impl{count}); - } - }; + template + auto skip(size_t count) && requires is_header_included + { + return std::move(*static_cast(this)).template lift(skip_impl{count}); + } +}; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd/start_with.hpp b/src/rpp/rpp/operators/fwd/start_with.hpp index 3ea065499..4025cfaf7 100644 --- a/src/rpp/rpp/operators/fwd/start_with.hpp +++ b/src/rpp/rpp/operators/fwd/start_with.hpp @@ -39,7 +39,7 @@ struct member_overload operator "start_with(1,2,3)" : +-1-2-3--4--6-| } * - * \details Actually it makes concat but arguments passed before current observable + * \details Actually it makes concat(rpp::source::just(vals_to_start_with)..., current_observable) so observables from argument subscribed before current observable * * \tparam memory_model memory_model strategy used to store provided values * \param vals list of values which should be emitted before current observable @@ -74,7 +74,7 @@ struct member_overload operator "start_with(-1-2-3-|)" : +--1-2-3--4--6-| } * - * \details Actually it makes concat but arguments passed before current observable + * \details Actually it makes concat(observables_to_start_with..., current_observable) so observables from argument subscribed before current observable * * \param observables list of observables which should be used before current observable * diff --git a/src/rpp/rpp/operators/fwd/subscribe_on.hpp b/src/rpp/rpp/operators/fwd/subscribe_on.hpp index fc9ba2174..adb719c79 100644 --- a/src/rpp/rpp/operators/fwd/subscribe_on.hpp +++ b/src/rpp/rpp/operators/fwd/subscribe_on.hpp @@ -29,6 +29,8 @@ struct member_overload { /** * \brief OnSubscribe function for this observable will be scheduled via provided scheduler + * + * \details Actually this operator just schedules subscription on original observable to provided scheduler * * \param scheduler is scheduler used for scheduling of OnSubscribe * \return new specific_observable with the subscribe_on operator as most recent operator. diff --git a/src/rpp/rpp/operators/fwd/switch_on_next.hpp b/src/rpp/rpp/operators/fwd/switch_on_next.hpp index 86e1f6118..e15fcc82a 100644 --- a/src/rpp/rpp/operators/fwd/switch_on_next.hpp +++ b/src/rpp/rpp/operators/fwd/switch_on_next.hpp @@ -20,45 +20,58 @@ namespace rpp::details namespace rpp::details { - template - struct switch_on_next_impl; +template +struct switch_on_next_impl; - template - struct member_overload - { - /** - * \brief Converts observable of observables into observable of values which emits values from most recent underlying observable till new observable obtained - * - * \marble switch_on_next - { - source observable : - { - +--1-2-3-5--| - .....+4--6-9| - .......+7-8-| - } - operator "switch_on_next" : +--1-24-7-8| - } - * - * \return new specific_observable with the switch_on_next operator as most recent operator. - * \warning #include - * - * \par Example: - * \snippet switch_on_next.cpp switch_on_next - * - * \ingroup combining_operators - * \see https://reactivex.io/documentation/operators/switch.html - */ - template - auto switch_on_next() const& requires (is_header_included&& rpp::constraint::observable) +template +struct member_overload +{ + /** + * \brief Converts observable of observables into observable of values which emits values from most recent underlying observable till new observable obtained + * + * \marble switch_on_next { - return static_cast(this)->template lift>(switch_on_next_impl()); + source observable : + { + +--1-2-3-5--| + .....+4--6-9| + .......+7-8-| } + operator "switch_on_next" : +--1-24-7-8| + } + * + * \details Actually this operator just unsubscribes from previous observable and subscribes on new observable when obtained in `on_next` + * + * \return new specific_observable with the switch_on_next operator as most recent operator. + * \warning #include + * + * \par Example: + * \snippet switch_on_next.cpp switch_on_next + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store internal state + * - OnNext + * - Unsubscribed from previous observable (if any) + * - Subscribed on new emitted observable + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed if no any active inner observable or original observable yet + * + * \ingroup combining_operators + * \see https://reactivex.io/documentation/operators/switch.html + */ + template + auto switch_on_next() const& requires (is_header_included&& rpp::constraint::observable) + { + return static_cast(this)->template lift>(switch_on_next_impl()); + } - template - auto switch_on_next() && requires (is_header_included&& rpp::constraint::observable) - { - return std::move(*static_cast(this)).template lift>(switch_on_next_impl()); - } - }; + template + auto switch_on_next() && requires (is_header_included&& rpp::constraint::observable) + { + return std::move(*static_cast(this)).template lift>(switch_on_next_impl()); + } +}; } // namespace rpp::details diff --git a/src/rpp/rpp/operators/fwd/take.hpp b/src/rpp/rpp/operators/fwd/take.hpp index 7e36b076b..f52c808b2 100644 --- a/src/rpp/rpp/operators/fwd/take.hpp +++ b/src/rpp/rpp/operators/fwd/take.hpp @@ -25,24 +25,38 @@ struct take_impl; template struct member_overload { - /** - * \brief Emit only first `count` items provided by observable, then send `on_completed` - * - * \marble take - { - source observable : +--1-2-3-4-5-6-| - operator "take(3)" : +--1-2-3| - } - * \param count amount of items to be emitted. 0 - instant complete - * \return new specific_observable with the Take operator as most recent operator. - * \warning #include - * - * \par Example: - * \snippet take.cpp take - * - * \ingroup filtering_operators - * \see https://reactivex.io/documentation/operators/take.html - */ + /** + * \brief Emit only first `count` items provided by observable, then send `on_completed` + * + * \marble take + { + source observable : +--1-2-3-4-5-6-| + operator "take(3)" : +--1-2-3| + } + * \details Actually this operator just emits emissions while counter is not zero and decrements counter on each emission + * + * \param count amount of items to be emitted. 0 - instant complete + * \return new specific_observable with the Take operator as most recent operator. + * \warning #include + * + * \par Example: + * \snippet take.cpp take + * + * \par Implementation details: + * - On subscribe + * - Allocate one `shared_ptr` to store counter + * - OnNext + * - Just forwards emission if counter is not zero + * - Decrements counter if not zero + * - If counter reached zero, then emits OnCompleted + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * + * \ingroup filtering_operators + * \see https://reactivex.io/documentation/operators/take.html + */ template auto take(size_t count) const & requires is_header_included { diff --git a/src/rpp/rpp/operators/fwd/take_last.hpp b/src/rpp/rpp/operators/fwd/take_last.hpp index aaf5749da..103e18400 100644 --- a/src/rpp/rpp/operators/fwd/take_last.hpp +++ b/src/rpp/rpp/operators/fwd/take_last.hpp @@ -25,25 +25,38 @@ struct take_last_impl; template struct member_overload { - /** - * \brief Emit only last `count` items provided by observable, then send `on_completed` - * - * \marble take_last - { - source observable : +--1-2-3-4-5-6-| - operator "take_last(3)" : +--------------456| - } - * - * \param count amount of last items to be emitted - * \return new specific_observable with the take_last operator as most recent operator. - * \warning #include - * - * \par Example - * \snippet take_last.cpp take_last - * - * \ingroup filtering_operators - * \see https://reactivex.io/documentation/operators/takelast.html - */ + /** + * \brief Emit only last `count` items provided by observable, then send `on_completed` + * + * \marble take_last + { + source observable : +--1-2-3-4-5-6-| + operator "take_last(3)" : +--------------456| + } + * + * \details Actually this operator has buffer of requested size inside, keeps last `count` values and emit stored values on `on_completed` + * + * \param count amount of last items to be emitted + * \return new specific_observable with the take_last operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet take_last.cpp take_last + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store internal buffer + * - OnNext + * - Place obtained value into queue + * - If queue contains more values than expected - remove oldest one + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Emits values stored in queue + * + * \ingroup filtering_operators + * \see https://reactivex.io/documentation/operators/takelast.html + */ template auto take_last(size_t count) const & requires is_header_included { diff --git a/src/rpp/rpp/operators/fwd/take_until.hpp b/src/rpp/rpp/operators/fwd/take_until.hpp index 68cfd786c..65fe959e6 100644 --- a/src/rpp/rpp/operators/fwd/take_until.hpp +++ b/src/rpp/rpp/operators/fwd/take_until.hpp @@ -28,28 +28,46 @@ template struct member_overload { - /** - * \brief Discard any items emitted by an Observable after a second Observable emits an item or terminates. - * \warning The take_until subscribes and begins mirroring the source Observable. It also monitors a second Observable that you provide. If this second Observable emits an item or sends a on_error/on_completed notification, the Observable returned by take_until stops mirroring the source Observable and terminates. - * - * \marble take_until - { - source observable : +-1--2--3--| - source until_observable : +--s--s----| - operator "take_until" : +-1-| - } - * - * \param until_observable is the observables that stops the source observable from sending values when it emits one value or sends a on_error/on_completed event. - * \return new specific_observable with the take_until operator as most recent operator. - * \warning #include - * - * \par Examples - * \snippet take_until.cpp take_until - * \snippet take_until.cpp terminate - * - * \ingroup conditional_operators - * \see https://reactivex.io/documentation/operators/takeuntil.html - */ + /** + * \brief Discard any items emitted by an Observable after a second Observable emits an item or terminates. + * \warning The take_until subscribes and begins mirroring the source Observable. It also monitors a second Observable that you provide. If this second Observable emits an item or sends a on_error/on_completed notification, the Observable returned by take_until stops mirroring the source Observable and terminates. + * + * \marble take_until + { + source observable : +-1--2--3--| + source until_observable : +--s--s----| + operator "take_until" : +-1-| + } + * + * \details Actually this operator just subscribes on 2 observables and completes original when `until_observable` emits any value + * + * \param until_observable is the observables that stops the source observable from sending values when it emits one value or sends a on_error/on_completed event. + * \return new specific_observable with the take_until operator as most recent operator. + * \warning #include + * + * \par Examples + * \snippet take_until.cpp take_until + * \snippet take_until.cpp terminate + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to store internal state + * - OnNext for original observable + * - Just forward original on_next + * - OnError for original observable + * - Just forwards original on_error + * - OnCompleted for original observable + * - Just forwards original on_completed + * - OnNext for until observable + * - Emits on_completed + * - OnError for until observable + * - Just forwards on_error + * - OnCompleted for until observable + * - Just forwards on_completed + * + * \ingroup conditional_operators + * \see https://reactivex.io/documentation/operators/takeuntil.html + */ template auto take_until(TTriggerObservable&& until_observable) const& requires is_header_included { diff --git a/src/rpp/rpp/operators/fwd/take_while.hpp b/src/rpp/rpp/operators/fwd/take_while.hpp index fdcd7fb2b..e9cd27161 100644 --- a/src/rpp/rpp/operators/fwd/take_while.hpp +++ b/src/rpp/rpp/operators/fwd/take_while.hpp @@ -25,24 +25,38 @@ struct take_while_impl; template struct member_overload { - /** - * \brief Sends items provided by observable while items are satisfy predicate. When condition becomes false -> sends `on_completed` - * - * \marble take_while - { - source observable : +--1-2-3-4-5-6-| - operator "take_while: x => x!=3" : +--1-2-| - } - * \param predicate is predicate used to check items - * \return new specific_observable with the take_while operator as most recent operator. - * \warning #include - * - * \par Example: - * \snippet take_while.cpp take_while - * - * \ingroup conditional_operators - * \see https://reactivex.io/documentation/operators/takewhile.html - */ + /** + * \brief Sends items provided by observable while items are satisfy predicate. When condition becomes false -> sends `on_completed` + * + * \marble take_while + { + source observable : +--1-2-3-4-5-6-| + operator "take_while: x => x!=3" : +--1-2-| + } + * + * \details Actually this operator just emits values while predicate returns true + * + * \param predicate is predicate used to check items + * \return new specific_observable with the take_while operator as most recent operator. + * \warning #include + * + * \par Example: + * \snippet take_while.cpp take_while + * + * \par Implementation details: + * - On subscribe + * - None + * - OnNext + * - Just forwards emission if predicate returns true + * - Emits OnCompleted if predicate returns false + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * + * \ingroup conditional_operators + * \see https://reactivex.io/documentation/operators/takewhile.html + */ template Predicate> auto take_while(Predicate&& predicate) const& requires is_header_included { diff --git a/src/rpp/rpp/operators/fwd/timeout.hpp b/src/rpp/rpp/operators/fwd/timeout.hpp index b652b4bfd..33fb0e4a9 100644 --- a/src/rpp/rpp/operators/fwd/timeout.hpp +++ b/src/rpp/rpp/operators/fwd/timeout.hpp @@ -28,26 +28,27 @@ struct timeout_impl; template struct member_overload { - /** - * \brief Forwards emissions from original observable, but subscribes on fallback observable if no any events during specified period of time (since last emission) - * - * \marble timeout_fallback_obs - { - source observable : +--1-2-3-4----- ---5-| - operator "timeout(4, -10-|)" : +--1-2-3-4----10-| - } - * \param period is maximum duration between emitted items before a timeout occurs - * \param fallback_obs is observable to subscribe on when timeout reached - * \param scheduler is scheduler used to run timer for timeout - * \return new specific_observable with the timeout operator as most recent operator. - * \warning #include - * - * \par Example - * \snippet timeout.cpp timeout_fallback_obs - * - * \ingroup utility_operators - * \see https://reactivex.io/documentation/operators/timeout.html - */ + /** + * \brief Forwards emissions from original observable, but subscribes on fallback observable if no any events during specified period of time (since last emission) + * + * \marble timeout_fallback_obs + { + source observable : +--1-2-3-4--------5-| + operator "timeout(4, -10-|)" : +--1-2-3-4----10-| + } + * + * \param period is maximum duration between emitted items before a timeout occurs + * \param fallback_obs is observable to subscribe on when timeout reached + * \param scheduler is scheduler used to run timer for timeout + * \return new specific_observable with the timeout operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet timeout.cpp timeout_fallback_obs + * + * \ingroup utility_operators + * \see https://reactivex.io/documentation/operators/timeout.html + */ template FallbackObs, schedulers::constraint::scheduler TScheduler> auto timeout(schedulers::duration period, FallbackObs&& fallback_obs, const TScheduler& scheduler = TScheduler{}) const & requires is_header_included { diff --git a/src/rpp/rpp/operators/fwd/window.hpp b/src/rpp/rpp/operators/fwd/window.hpp index b457a65f9..491eb974d 100644 --- a/src/rpp/rpp/operators/fwd/window.hpp +++ b/src/rpp/rpp/operators/fwd/window.hpp @@ -26,34 +26,46 @@ auto window_impl(TObs&& obs, size_t window_size); template struct member_overload { - /** - * \brief Subdivide original observable into sub-observables (windowed observables) and emit sub-observables of items instead of original items - * - * \marble window - { - source observable : +-1-2-3-4-5-| + /** + * \brief Subdivide original observable into sub-observables (windowed observables) and emit sub-observables of items instead of original items + * + * \marble window + { + source observable : +-1-2-3-4-5-| - operator "window(2)" : - { - .+1-2| - .....+3-4| - .........+5-| - } - } - * - * \details Actually it is similar to `buffer` but it emits observable instead of container. - * - * \param window_size amount of items which every observable would have - * - * \return new specific_observable with the window operator as most recent operator. - * \warning #include - * - * \par Example - * \snippet window.cpp window - * - * \ingroup transforming_operators - * \see https://reactivex.io/documentation/operators/window.html - */ + operator "window(2)" : + { + .+1-2| + .....+3-4| + .........+5-| + } + } + * + * \details Actually it is similar to `buffer` but it emits observable instead of container. + * + * \param window_size amount of items which every observable would have + * + * \return new specific_observable with the window operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet window.cpp window + * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to keep internal state + * - OnNext + * - Emits new window-observable if previous observable emitted requested amound of emisions + * - Emits emission via active window-observable + * - Completes window-observable if requested amound of emisions reached + * - OnError + * - Just forwards original on_error + * - OnCompleted + * - Just forwards original on_completed + * + * \ingroup transforming_operators + * \see https://reactivex.io/documentation/operators/window.html + */ template auto window(size_t window_size) const & requires is_header_included { diff --git a/src/rpp/rpp/operators/fwd/with_latest_from.hpp b/src/rpp/rpp/operators/fwd/with_latest_from.hpp index b0d529d2a..f23dc8c37 100644 --- a/src/rpp/rpp/operators/fwd/with_latest_from.hpp +++ b/src/rpp/rpp/operators/fwd/with_latest_from.hpp @@ -39,6 +39,8 @@ struct member_overload source other_observable : +-5-6-7- -- 8- -| operator "with_latest_from: x,y =>std::pair{x,y}" : +------{1,5}-{2,7}-{3,8}-| } + * + * \details Actually this operator just keeps last values from all other observables and combines them together with each new emission from original observable * * \param selector is applied to current emission of current observable and latests emissions from observables * \param observables are observables whose emissions would be combined when current observable sends new value @@ -48,6 +50,20 @@ struct member_overload * \par Examples * \snippet with_latest_from.cpp with_latest_from custom selector * + * \par Implementation details: + * - On subscribe + * - Allocates one `shared_ptr` to keep last values from all observables + * - OnNext for original observable + * - Applies selector to new emission and all saved last values from other observable (if any value for all observables) + * - OnNext other original observables + * - Just updates last value for this observable + * - OnError + * - Just forwards original on_error + * - OnCompleted for original observable + * - Just forwards original on_completed + * - OnCompleted for other observables + * - None + * * \ingroup combining_operators * \see https://reactivex.io/documentation/operators/combinelatest.html */