Skip to content

Commit

Permalink
speedup switch_on_next (#682)
Browse files Browse the repository at this point in the history
* speedup switch_on_next

* compile fix

* add tests

* fix tets
  • Loading branch information
victimsnino authored Nov 15, 2024
1 parent 90f9f2f commit 9929865
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 75 deletions.
5 changes: 3 additions & 2 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <rpp/operators/fwd.hpp>

#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/utils/utils.hpp>

Expand All @@ -34,7 +35,7 @@ namespace rpp::operators::details
};

template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
class concat_disposable final : public rpp::composite_disposable
class concat_disposable final : public rpp::details::base_disposable
, public rpp::details::enable_wrapper_from_this<concat_disposable<TObservable, TObserver>>
{
public:
Expand Down Expand Up @@ -87,7 +88,7 @@ namespace rpp::operators::details
return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst);
}

void composite_dispose_impl(interface_disposable::Mode) noexcept override
void base_dispose_impl(interface_disposable::Mode) noexcept override
{
for (auto& d : m_child_disposables)
d.dispose();
Expand Down
50 changes: 32 additions & 18 deletions src/rpp/rpp/operators/switch_on_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
#include <rpp/operators/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/disposables/refcount_disposable.hpp>
#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/utils/utils.hpp>

#include <array>

namespace rpp::operators::details
{
template<rpp::constraint::observer TObserver>
class switch_on_next_state_t final : public refcount_disposable
class switch_on_next_state_t final : public rpp::details::base_disposable
{
public:
template<rpp::constraint::decayed_same_as<TObserver> TObs>
Expand All @@ -33,13 +35,21 @@ namespace rpp::operators::details
switch_on_next_state_t(const switch_on_next_state_t&) = delete;
switch_on_next_state_t(switch_on_next_state_t&&) noexcept = delete;

rpp::utils::pointer_under_lock<TObserver> get_observer()
rpp::utils::pointer_under_lock<TObserver> get_observer() { return m_observer_with_mutex; }
rpp::composite_disposable& get_base_child_disposable() { return m_base_child_disposable; }
rpp::utils::pointer_under_lock<rpp::composite_disposable_wrapper> get_inner_child_disposable() { return m_inner_child_disposable; }

private:
void base_dispose_impl(interface_disposable::Mode) noexcept override
{
return m_observer_with_mutex;
get_base_child_disposable().dispose();
get_inner_child_disposable()->dispose();
}

private:
rpp::utils::value_with_mutex<TObserver> m_observer_with_mutex{};
rpp::utils::value_with_mutex<TObserver> m_observer_with_mutex{};
rpp::composite_disposable m_base_child_disposable{};
rpp::utils::value_with_mutex<rpp::composite_disposable_wrapper> m_inner_child_disposable{composite_disposable_wrapper::empty()};
};

template<rpp::constraint::observer TObserver>
Expand All @@ -48,9 +58,9 @@ namespace rpp::operators::details
public:
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;

switch_on_next_inner_observer_strategy(const std::shared_ptr<switch_on_next_state_t<TObserver>>& state, const composite_disposable_wrapper& refcounted)
switch_on_next_inner_observer_strategy(const std::shared_ptr<switch_on_next_state_t<TObserver>>& state, composite_disposable_wrapper&& refcounted)
: m_state{state}
, m_refcounted{refcounted}
, m_refcounted{std::move(refcounted)}
{
}

Expand All @@ -68,12 +78,11 @@ namespace rpp::operators::details
void on_completed() const
{
m_refcounted.dispose();
if (m_state->is_disposed())
if (m_state->get_base_child_disposable().is_disposed())
m_state->get_observer()->on_completed();
}

void set_upstream(const disposable_wrapper& d) const { m_refcounted.add(d); }

bool is_disposed() const { return m_refcounted.is_disposed(); }

private:
Expand All @@ -98,9 +107,16 @@ namespace rpp::operators::details
template<typename T>
void on_next(T&& v) const
{
m_last_refcount.dispose();
m_last_refcount = m_state->add_ref();
std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, m_last_refcount});
auto new_inner = rpp::composite_disposable_wrapper::make();
{
auto inner = m_state->get_inner_child_disposable();
inner->dispose();
if (m_state->is_disposed())
return;

*inner = new_inner;
}
std::forward<T>(v).subscribe(switch_on_next_inner_observer_strategy<TObserver>{m_state, std::move(new_inner)});
}

void on_error(const std::exception_ptr& err) const
Expand All @@ -110,13 +126,13 @@ namespace rpp::operators::details

void on_completed() const
{
m_this_refcount.dispose();
if (m_state->is_disposed())
m_state->get_base_child_disposable().dispose();
if (m_state->get_inner_child_disposable()->is_disposed())
m_state->get_observer()->on_completed();
}

void set_upstream(const disposable_wrapper& d) const { m_this_refcount.add(d); }
bool is_disposed() const { return m_this_refcount.is_disposed(); }
void set_upstream(const disposable_wrapper& d) const { m_state->get_base_child_disposable().add(d); }
bool is_disposed() const { return m_state->get_base_child_disposable().is_disposed(); }

private:
static std::shared_ptr<switch_on_next_state_t<TObserver>> init_state(TObserver&& observer)
Expand All @@ -129,8 +145,6 @@ namespace rpp::operators::details

private:
std::shared_ptr<switch_on_next_state_t<TObserver>> m_state;
rpp::composite_disposable_wrapper m_this_refcount = m_state->add_ref();
mutable rpp::composite_disposable_wrapper m_last_refcount = composite_disposable_wrapper::empty();
};

struct switch_on_next_t : lift_operator<switch_on_next_t>
Expand Down
119 changes: 64 additions & 55 deletions src/tests/rpp/test_switch_on_next.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <doctest/doctest.h>

#include <rpp/observables/dynamic_observable.hpp>
#include <rpp/observers/mock_observer.hpp>
#include <rpp/operators/switch_on_next.hpp>
#include <rpp/sources/empty.hpp>
#include <rpp/sources/error.hpp>
Expand All @@ -21,86 +20,76 @@

#include "copy_count_tracker.hpp"
#include "disposable_observable.hpp"
#include "rpp_trompeloil.hpp"

TEST_CASE("switch_on_next switches observable after obtaining new one")
{
auto mock = mock_observer_strategy<int>();
mock_observer<int> mock{};
trompeloeil::sequence s{};

SUBCASE("just observable of just observables")
{
auto observable = rpp::source::just(rpp::source::just(1), rpp::source::just(2), rpp::source::just(3));
SUBCASE("subscribe on it via switch_on_next")
SUBCASE("subscribe on it via switch_on_next - obtains values as from concat")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_next_lvalue(2)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("obtains values as from concat")
{
CHECK(mock.get_received_values() == std::vector{1, 2, 3});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}
SUBCASE("just observable of just observables where second is error")
{
auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{""})).as_dynamic(),
rpp::source::just(3).as_dynamic());
SUBCASE("subscribe on it via switch_on_next")
SUBCASE("subscribe on it via switch_on_next - obtains values as from concat but stops on error")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(s);

observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("obtains values as from concat but stops on error")
{
CHECK(mock.get_received_values() == std::vector{1});
CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
}
}
}
SUBCASE("just observable of just observables where second is completed")
{
auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::empty<int>().as_dynamic(),
rpp::source::just(3).as_dynamic());
SUBCASE("subscribe on it via switch_on_next")
SUBCASE("subscribe on it via switch_on_next - obtains values as from concat")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);

observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("obtains values as from concat")
{
CHECK(mock.get_received_values() == std::vector{1, 3});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}
SUBCASE("just observable of just observables where second is never")
{
auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::never<int>().as_dynamic(),
rpp::source::just(3).as_dynamic());
SUBCASE("subscribe on it via switch_on_next")
SUBCASE("subscribe on it via switch_on_next - obtains values as from concat")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("obtains values as from concat")
{
CHECK(mock.get_received_values() == std::vector{1, 3});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
}
}
SUBCASE("just observable of just observables where last is never")
{
auto observable = rpp::source::just(rpp::source::just(1).as_dynamic(),
rpp::source::just(3).as_dynamic(),
rpp::source::never<int>().as_dynamic());
SUBCASE("subscribe on it via switch_on_next")
SUBCASE("subscribe on it via switch_on_next - obtains values as from concat but no complete")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
REQUIRE_CALL(*mock, on_next_lvalue(3)).IN_SEQUENCE(s);

observable | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("obtains values as from concat but no complete")
{
CHECK(mock.get_received_values() == std::vector{1, 3});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}
}
}
SUBCASE("subject of just subjects")
Expand All @@ -120,12 +109,9 @@ TEST_CASE("switch_on_next switches observable after obtaining new one")

SUBCASE("Only value from first subject obtained")
{
REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
subj_1.get_observer().on_next(1);
subj_2.get_observer().on_next(2);

CHECK(mock.get_received_values() == std::vector{1});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}
SUBCASE("send second subject and send values for all subjects")
{
Expand All @@ -134,36 +120,59 @@ TEST_CASE("switch_on_next switches observable after obtaining new one")
SUBCASE("Only value from second subject obtained")
{
subj_1.get_observer().on_next(1);
subj_2.get_observer().on_next(2);

CHECK(mock.get_received_values() == std::vector{2});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
REQUIRE_CALL(*mock, on_next_lvalue(2)).IN_SEQUENCE(s);
subj_2.get_observer().on_next(2);
}
}
SUBCASE("original subject completes but provided send value")
{
subj_of_subjects.get_observer().on_completed();

REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(s);
subj_1.get_observer().on_next(1);
subj_2.get_observer().on_next(2);
SUBCASE("value obtained")
{
CHECK(mock.get_received_values() == std::vector{1});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}
SUBCASE("subject sends on_completed")
{
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
subj_1.get_observer().on_completed();
SUBCASE("subsriber completed")
{
CHECK(mock.get_on_completed_count() == 1);
}
}
}
}
}
}
SUBCASE("switch_on_next completes right")
{
rpp::subjects::publish_subject<rpp::dynamic_observable<int>> subj{};

subj.get_observable() | rpp::ops::switch_on_next() | rpp::ops::subscribe(mock);
SUBCASE("on_completed from base")
{
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
subj.get_observer().on_completed();
}

SUBCASE("on_completed from inner + then from base")
{
subj.get_observer().on_next(rpp::source::empty<int>());

REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
subj.get_observer().on_completed();
}

SUBCASE("on_completed from base + then from inner")
{
subj.get_observer().on_next(rpp::source::empty<int>());
subj.get_observer().on_next(rpp::source::never<int>());

rpp::subjects::publish_subject<int> inner{};
subj.get_observer().on_next(inner.get_observable());
subj.get_observer().on_completed();

REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(s);
inner.get_observer().on_completed();
}
}
}

TEST_CASE("switch_on_next doesn't produce extra copies")
Expand Down

1 comment on commit 9929865

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 311.12 ns 1.85 ns 1.85 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 307.17 ns 1.85 ns 1.85 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 689.82 ns 0.39 ns 0.31 ns 1.28
from array of 1 - create + subscribe + current_thread 1026.81 ns 3.42 ns 3.42 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2245.99 ns 112.50 ns 118.49 ns 0.95
defer from array of 1 - defer + create + subscribe + immediate 722.45 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2151.22 ns 59.23 ns 59.23 ns 1.00
interval - interval + take(3) + subscribe + current_thread 2984.35 ns 32.42 ns 32.40 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 30370.51 ns 28390.81 ns 27635.19 ns 1.03
from array of 1000 - create + as_blocking + subscribe + new_thread 45178.32 ns 53556.14 ns 50572.00 ns 1.06
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3579.35 ns 134.00 ns 140.57 ns 0.95

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1083.72 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 879.68 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1007.71 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 871.04 ns 0.31 ns 0.31 ns 1.01
immediate_just(1,2)+first()+subscribe 1242.02 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 900.39 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1130.53 ns 18.22 ns 17.91 ns 1.02
immediate_just(1,2,3)+element_at(1)+subscribe 834.41 ns 0.31 ns 0.31 ns 1.01

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 265.91 ns 1.54 ns 1.55 ns 1.00
current_thread scheduler create worker + schedule 395.14 ns 4.94 ns 4.32 ns 1.14
current_thread scheduler create worker + schedule + recursive schedule 821.06 ns 61.65 ns 60.48 ns 1.02

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 851.37 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 915.52 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2333.99 ns 149.88 ns 139.77 ns 1.07
immediate_just+buffer(2)+subscribe 1545.04 ns 13.89 ns 13.59 ns 1.02
immediate_just+window(2)+subscribe + subscsribe inner 2350.62 ns 1351.03 ns 1359.80 ns 0.99

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 832.30 ns - - 0.00
immediate_just+take_while(true)+subscribe 855.31 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2010.25 ns 0.32 ns 0.31 ns 1.04

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3388.98 ns 165.73 ns 160.14 ns 1.03
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3708.22 ns 160.94 ns 155.79 ns 1.03
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 137.38 ns 137.99 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3579.54 ns 524.86 ns 1254.32 ns 0.42
immediate_just(1) + zip(immediate_just(2)) + subscribe 2099.40 ns 213.43 ns 210.73 ns 1.01
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 3123.99 ns 228.33 ns 250.60 ns 0.91

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.55 ns 14.66 ns 14.68 ns 1.00
subscribe 100 observers to publish_subject 198033.40 ns 16495.20 ns 16449.17 ns 1.00
100 on_next to 100 observers to publish_subject 27342.87 ns 18661.60 ns 17198.60 ns 1.09

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1362.48 ns 13.27 ns 12.67 ns 1.05
basic sample with immediate scheduler 1373.63 ns 5.55 ns 5.24 ns 1.06

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 908.84 ns 0.33 ns 0.31 ns 1.06

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2063.36 ns 1000.03 ns 983.42 ns 1.02
create(on_error())+retry(1)+subscribe 596.23 ns 112.93 ns 109.48 ns 1.03

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 993.62 ns 0.72 ns 0.71 ns 1.01
Subscribe empty callbacks to empty observable via pipe operator 995.25 ns 0.71 ns 0.70 ns 1.02

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1981.49 ns 0.24 ns 0.23 ns 1.02
from array of 1 - create + subscribe + current_thread 2518.99 ns 33.78 ns 33.60 ns 1.01
concat_as_source of just(1 immediate) create + subscribe 5449.31 ns 316.00 ns 315.88 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 1941.23 ns 0.23 ns 0.23 ns 1.00
interval - interval + take(3) + subscribe + immediate 4864.67 ns 114.78 ns 114.56 ns 1.00
interval - interval + take(3) + subscribe + current_thread 5872.82 ns 97.97 ns 97.86 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 331781.20 ns 232793.00 ns 79561.79 ns 2.93
from array of 1000 - create + as_blocking + subscribe + new_thread 88495.00 ns 88081.33 ns 87249.42 ns 1.01
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 8102.29 ns 360.82 ns 359.45 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 2757.45 ns 0.23 ns 0.23 ns 1.00
immediate_just+filter(true)+subscribe 2039.07 ns 0.23 ns 0.23 ns 0.97
immediate_just(1,2)+skip(1)+subscribe 2876.46 ns 0.24 ns 0.23 ns 1.01
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2017.62 ns 0.46 ns 0.47 ns 0.97
immediate_just(1,2)+first()+subscribe 3077.02 ns 0.23 ns 0.23 ns 0.97
immediate_just(1,2)+last()+subscribe 2287.52 ns 0.23 ns 0.23 ns 0.97
immediate_just+take_last(1)+subscribe 2931.31 ns 0.23 ns 0.23 ns 0.97
immediate_just(1,2,3)+element_at(1)+subscribe 2067.61 ns 0.23 ns 0.23 ns 0.97

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 825.48 ns 0.91 ns 0.93 ns 0.98
current_thread scheduler create worker + schedule 1158.16 ns 33.18 ns 34.14 ns 0.97
current_thread scheduler create worker + schedule + recursive schedule 1916.45 ns 197.63 ns 202.92 ns 0.97

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2084.89 ns 4.20 ns 4.21 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 2320.60 ns 0.47 ns 0.47 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 5206.41 ns 373.21 ns 375.74 ns 0.99
immediate_just+buffer(2)+subscribe 2476.94 ns 63.65 ns 64.18 ns 0.99
immediate_just+window(2)+subscribe + subscsribe inner 5543.75 ns 2438.70 ns 2385.97 ns 1.02

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2068.17 ns - - 0.00
immediate_just+take_while(true)+subscribe 2083.37 ns 0.23 ns 0.23 ns 0.99

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 4811.58 ns 4.78 ns 4.90 ns 0.98

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 7228.19 ns 400.89 ns 417.88 ns 0.96
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 8315.48 ns 428.16 ns 407.91 ns 1.05
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 451.95 ns 448.18 ns 1.01
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 7984.30 ns 927.67 ns 1881.86 ns 0.49
immediate_just(1) + zip(immediate_just(2)) + subscribe 5079.56 ns 826.82 ns 822.09 ns 1.01
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 7204.94 ns 633.26 ns 674.38 ns 0.94

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 73.68 ns 48.79 ns 49.39 ns 0.99
subscribe 100 observers to publish_subject 334109.00 ns 38817.69 ns 40905.05 ns 0.95
100 on_next to 100 observers to publish_subject 49730.10 ns 22502.71 ns 19300.43 ns 1.17

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 2637.17 ns 66.61 ns 69.43 ns 0.96
basic sample with immediate scheduler 2631.93 ns 17.90 ns 18.44 ns 0.97

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2375.15 ns 0.23 ns 0.23 ns 0.99

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 6510.15 ns 4045.93 ns 4118.37 ns 0.98
create(on_error())+retry(1)+subscribe 1806.60 ns 278.54 ns 278.47 ns 1.00

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 276.16 ns 0.63 ns 0.63 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 272.57 ns 0.63 ns 0.63 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 576.98 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 794.52 ns 4.01 ns 4.01 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2324.78 ns 128.06 ns 128.61 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 777.22 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2195.23 ns 58.30 ns 58.30 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3152.25 ns 30.91 ns 30.88 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 29915.38 ns 27414.80 ns 29074.54 ns 0.94
from array of 1000 - create + as_blocking + subscribe + new_thread 40454.54 ns 38247.25 ns 37825.45 ns 1.01
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3653.67 ns 148.32 ns 148.28 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1144.76 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 847.77 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1091.23 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 890.93 ns 0.62 ns 0.31 ns 2.00
immediate_just(1,2)+first()+subscribe 1375.71 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 1072.80 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1211.63 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 885.72 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 282.77 ns 1.54 ns 1.54 ns 1.00
current_thread scheduler create worker + schedule 396.75 ns 4.05 ns 4.01 ns 1.01
current_thread scheduler create worker + schedule + recursive schedule 850.71 ns 54.96 ns 56.03 ns 0.98

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 841.64 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 965.69 ns 0.31 ns 0.62 ns 0.50
immediate_just+flat_map(immediate_just(v*2))+subscribe 2243.64 ns 140.13 ns 140.60 ns 1.00
immediate_just+buffer(2)+subscribe 1503.34 ns 13.58 ns 14.19 ns 0.96
immediate_just+window(2)+subscribe + subscsribe inner 2418.97 ns 912.85 ns 918.06 ns 0.99

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 832.30 ns - - 0.00
immediate_just+take_while(true)+subscribe 847.49 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2037.30 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3262.76 ns 157.75 ns 153.10 ns 1.03
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3727.85 ns 141.15 ns 137.52 ns 1.03
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 143.36 ns 143.00 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3394.70 ns 376.60 ns 830.65 ns 0.45
immediate_just(1) + zip(immediate_just(2)) + subscribe 2276.92 ns 203.29 ns 204.17 ns 1.00
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 3227.28 ns 223.99 ns 222.97 ns 1.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 52.67 ns 17.76 ns 17.91 ns 0.99
subscribe 100 observers to publish_subject 207406.00 ns 16225.97 ns 16076.98 ns 1.01
100 on_next to 100 observers to publish_subject 46656.44 ns 23607.41 ns 23499.42 ns 1.00

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1287.74 ns 11.42 ns 11.42 ns 1.00
basic sample with immediate scheduler 1291.40 ns 5.86 ns 6.18 ns 0.95

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 987.53 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2147.93 ns 1193.05 ns 1165.87 ns 1.02
create(on_error())+retry(1)+subscribe 653.22 ns 139.93 ns 140.12 ns 1.00

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 566.37 ns 1.85 ns 1.85 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 584.84 ns 1.85 ns 1.85 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1155.73 ns 5.24 ns 5.55 ns 0.94
from array of 1 - create + subscribe + current_thread 1433.33 ns 15.75 ns 15.75 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 3743.75 ns 172.95 ns 174.46 ns 0.99
defer from array of 1 - defer + create + subscribe + immediate 1186.93 ns 5.55 ns 5.55 ns 1.00
interval - interval + take(3) + subscribe + immediate 3092.82 ns 140.97 ns 140.85 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3459.16 ns 62.08 ns 60.24 ns 1.03
from array of 1 - create + as_blocking + subscribe + new_thread 121112.50 ns 114811.11 ns 114088.89 ns 1.01
from array of 1000 - create + as_blocking + subscribe + new_thread 130777.78 ns 132244.44 ns 130722.22 ns 1.01
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 5341.36 ns 216.93 ns 200.40 ns 1.08

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 2073.19 ns 19.73 ns 19.73 ns 1.00
immediate_just+filter(true)+subscribe 1335.48 ns 18.82 ns 18.81 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1721.74 ns 18.52 ns 18.51 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1373.59 ns 23.47 ns 23.45 ns 1.00
immediate_just(1,2)+first()+subscribe 2387.06 ns 17.29 ns 17.28 ns 1.00
immediate_just(1,2)+last()+subscribe 1464.02 ns 18.52 ns 18.51 ns 1.00
immediate_just+take_last(1)+subscribe 2025.83 ns 64.54 ns 65.03 ns 0.99
immediate_just(1,2,3)+element_at(1)+subscribe 1327.37 ns 21.92 ns 21.91 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 488.50 ns 4.32 ns 4.01 ns 1.08
current_thread scheduler create worker + schedule 664.11 ns 11.18 ns 11.61 ns 0.96
current_thread scheduler create worker + schedule + recursive schedule 1353.75 ns 103.92 ns 104.64 ns 0.99

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1308.85 ns 18.82 ns 18.81 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 1428.49 ns 21.29 ns 20.99 ns 1.01
immediate_just+flat_map(immediate_just(v*2))+subscribe 3494.06 ns 186.13 ns 179.90 ns 1.03
immediate_just+buffer(2)+subscribe 2375.67 ns 63.27 ns 65.59 ns 0.96
immediate_just+window(2)+subscribe + subscsribe inner 4043.30 ns 1292.44 ns 1342.91 ns 0.96

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1331.03 ns 17.57 ns 17.57 ns 1.00
immediate_just+take_while(true)+subscribe 1330.20 ns 18.81 ns 18.81 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3225.47 ns 11.11 ns 11.10 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5117.44 ns 226.22 ns 196.94 ns 1.15
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 5692.18 ns 191.89 ns 182.58 ns 1.05
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 187.18 ns 207.02 ns 0.90
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 5765.50 ns 434.44 ns 968.84 ns 0.45
immediate_just(1) + zip(immediate_just(2)) + subscribe 3580.89 ns 522.40 ns 522.18 ns 1.00
immediate_just(immediate_just(1), immediate_just(1)) + concat() + subscribe 4921.05 ns 327.16 ns 333.45 ns 0.98

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 36.62 ns 20.39 ns 20.11 ns 1.01
subscribe 100 observers to publish_subject 259600.00 ns 29530.56 ns 29612.82 ns 1.00
100 on_next to 100 observers to publish_subject 51705.00 ns 38730.00 ns 35737.93 ns 1.08

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1891.50 ns 95.70 ns 96.09 ns 1.00
basic sample with immediate scheduler 1884.45 ns 68.71 ns 68.33 ns 1.01

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1474.20 ns 19.44 ns 19.42 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1941.03 ns 368.08 ns 358.78 ns 1.03
create(on_error())+retry(1)+subscribe 1572.93 ns 138.06 ns 139.24 ns 0.99

Please sign in to comment.