// Tests for rxcpp subscriptions: a set of hidden ("[!hide]") performance /
// stress scenarios followed by unit tests for is_subscription, plain
// subscriptions, empty subscriptions and composite_subscription.

#include "../test.h"
#include "rxcpp/operators/rx-combine_latest.hpp"
#include "rxcpp/operators/rx-map.hpp"
#include "rxcpp/operators/rx-take.hpp"
#include "rxcpp/operators/rx-observe_on.hpp"
#include "rxcpp/operators/rx-publish.hpp"
#include "rxcpp/operators/rx-ref_count.hpp"

#include <sstream>

// Builds an observable whose on-subscribe function records every subscriber
// in a shared list and registers an unsubscribe action that removes it again.
// The observable is never subscribed to inside this scenario.
SCENARIO("observe subscription", "[!hide]"){
    GIVEN("observable of ints"){
        WHEN("subscribe"){
            // std::list keeps `it` valid while other subscribers come and go.
            auto observers = std::make_shared<std::list<rxcpp::subscriber<int>>>();

            auto observable = rxcpp::observable<>::create<int>([=](rxcpp::subscriber<int> out){
                auto it = observers->insert(observers->end(), out);
                it->add([=](){
                    observers->erase(it);
                });
            });
        }
    }
}

// Number of subscriptions made per timed run in the "for loop" scenarios.
static const int static_subscriptions = 10000;

// Times `static_subscriptions` subscriptions to just(1) piped through two
// map operators (int -> string -> int) and prints the throughput. The timed
// loop is scheduled on a current-thread worker and re-invokes itself through
// `self()` until `runs` reaches zero.
SCENARIO("for loop subscribes to map", "[!hide][for][just][subscribe][long][perf]"){
    const int& subscriptions = static_subscriptions;
    GIVEN("a for loop"){
        // NOTE(review): the label says 100K but static_subscriptions is 10000.
        WHEN("subscribe 100K times"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto sc = rxsc::make_current_thread();
            auto w = sc.create_worker();
            int runs = 10;

            auto loop = [&](const rxsc::schedulable& self) {
                int c = 0; // on_next calls observed in this run
                int n = 1;
                auto start = clock::now();
                for (int i = 0; i < subscriptions; i++) {
                    rx::observable<>::just(1)
                        .map([](int i) {
                            std::stringstream serializer;
                            serializer << i;
                            return serializer.str();
                        })
                        .map([](const std::string& s) {
                            int i;
                            std::stringstream(s) >> i;
                            return i;
                        })
                        .subscribe([&](int){
                            ++c;
                        });
                }
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                std::cout << "loop subscribe map : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;

                if (--runs > 0) {
                    self();
                }
            };

            w.schedule(loop);
        }
    }
}

// Same timing harness as the map scenario, but each subscription combines
// just(1) with just(2) through combine_latest.
SCENARIO("for loop subscribes to combine_latest", "[!hide][for][just][combine_latest][subscribe][long][perf]"){
    const int& subscriptions = static_subscriptions;
    GIVEN("a for loop"){
        // NOTE(review): the label says 100K but static_subscriptions is 10000.
        WHEN("subscribe 100K times"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto sc = rxsc::make_current_thread();
            auto w = sc.create_worker();
            int runs = 10;

            auto loop = [&](const rxsc::schedulable& self) {
                int c = 0; // on_next calls observed in this run
                int n = 1;
                auto start = clock::now();
                for (int i = 0; i < subscriptions; i++) {
                    rx::observable<>::just(1)
                        .combine_latest([](int i, int j) {
                            return i + j;
                        }, rx::observable<>::just(2))
                        .subscribe([&](int){
                            ++c;
                        });
                }
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                std::cout << "loop subscribe combine_latest : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;

                if (--runs > 0) {
                    self();
                }
            };

            w.schedule(loop);
        }
    }
}

// Runs three range(...).take(values) pipelines through
// publish_synchronized(es).ref_count(), with a checking subscriber lifted in
// before and after, then spins until all values and completions have arrived.
// The process aborts if any pipeline errors or completes with the wrong count.
SCENARIO("synchronized range debug", "[!hide][subscribe][range][synchronize_debug][synchronize][long][perf]"){
    GIVEN("range"){
        WHEN("synchronized"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto sc = rxsc::make_current_thread();
            auto w = sc.create_worker();

            auto es = rx::synchronize_event_loop();

            const int values = 10000;

            int runs = 10;

            auto loop = [&](const rxsc::schedulable& self) {
                std::atomic<int> c(0); // completions observed by the final observers
                int n = 1;

                // Wraps `dest` in a subscriber that counts on_next calls and
                // aborts unless exactly `values` items were seen before
                // completion. The tuple holds <completed, count, dest>.
                auto liftrequirecompletion = [&](rx::subscriber<int> dest){
                    auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest));
                    // On unsubscribe: the stream must have completed with the full count.
                    std::get<2>(*completionstate).add([=](){
                        if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) {
                            abort();
                        }
                    });
                    // VS2013 deduction issue requires dynamic (type-forgetting)
                    return rx::make_subscriber<int>(
                        std::get<2>(*completionstate),
                        [=](int n){
                            ++std::get<1>(*completionstate);
                            std::get<2>(*completionstate).on_next(n);
                        },
                        [=](rxu::error_ptr){
                            abort();
                        },
                        [=](){
                            if (std::get<1>(*completionstate) != values) {
                                abort();
                            }
                            std::get<0>(*completionstate) = true;
                            std::get<2>(*completionstate).on_completed();
                        }).as_dynamic();
                };

                auto start = clock::now();
                auto ew = es.create_coordinator().get_worker();
                std::atomic<int> v(0); // on_next calls observed by the final observers
                auto s0 = rxs::range(1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .publish_synchronized(es)
                    .ref_count()
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                            [&](int){
                                ++v;
                            },
                            [&](){
                                ++c;
                            }));
                auto s1 = rxs::range(values + 1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .publish_synchronized(es)
                    .ref_count()
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                            [&](int){
                                ++v;
                            },
                            [&](){
                                ++c;
                            }));
                auto s2 = rxs::range((values * 2) + 1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .publish_synchronized(es)
                    .ref_count()
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                            [&](int){
                                ++v;
                            },
                            [&](){
                                ++c;
                            }));
                // Busy-wait until every value and all three completions arrived.
                while(v != values * 3 || c != 3);
                s0.unsubscribe();
                s1.unsubscribe();
                s2.unsubscribe();
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                std::cout << "range synchronized : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;

                if (--runs > 0) {
                    self();
                }
            };

            w.schedule(loop);
        }
    }
}

// Same harness as "synchronized range debug", but the three pipelines hop
// through observe_on(es) instead of publish_synchronized(es).ref_count().
SCENARIO("observe_on range debug", "[!hide][subscribe][range][observe_on_debug][observe_on][long][perf]"){
    GIVEN("range"){
        WHEN("observed on"){
            using namespace std::chrono;
            typedef steady_clock clock;

            auto sc = rxsc::make_current_thread();
            auto w = sc.create_worker();

            auto es = rx::observe_on_event_loop();

            const int values = 10000;

            int runs = 10;

            auto loop = [&](const rxsc::schedulable& self) {
                std::atomic<int> c(0); // completions observed by the final observers
                int n = 1;

                // Wraps `dest` in a subscriber that counts on_next calls and
                // aborts unless exactly `values` items were seen before
                // completion. The tuple holds <completed, count, dest>.
                auto liftrequirecompletion = [&](rx::subscriber<int> dest){
                    auto completionstate = std::make_shared<std::tuple<bool, long, rx::subscriber<int>>>(false, 0, std::move(dest));
                    // On unsubscribe: the stream must have completed with the full count.
                    std::get<2>(*completionstate).add([=](){
                        if (std::get<1>(*completionstate) != values || !std::get<0>(*completionstate)) {
                            abort();
                        }
                    });
                    // VS2013 deduction issue requires dynamic (type-forgetting)
                    return rx::make_subscriber<int>(
                        std::get<2>(*completionstate),
                        [=](int n){
                            ++std::get<1>(*completionstate);
                            std::get<2>(*completionstate).on_next(n);
                        },
                        [=](rxu::error_ptr){
                            abort();
                        },
                        [=](){
                            if (std::get<1>(*completionstate) != values) {
                                abort();
                            }
                            std::get<0>(*completionstate) = true;
                            std::get<2>(*completionstate).on_completed();
                        }).as_dynamic();
                };

                auto start = clock::now();
                auto ew = es.create_coordinator().get_worker();
                std::atomic<int> v(0); // on_next calls observed by the final observers
                auto s0 = rxs::range(1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .observe_on(es)
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                            [&](int){
                                ++v;
                            },
                            [&](){
                                ++c;
                            }));
                auto s1 = rxs::range(values + 1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .observe_on(es)
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                            [&](int){
                                ++v;
                            },
                            [&](){
                                ++c;
                            }));
                auto s2 = rxs::range((values * 2) + 1, es)
                    .take(values)
                    .lift<int>(liftrequirecompletion)
                    .as_dynamic()
                    .observe_on(es)
                    .lift<int>(liftrequirecompletion)
                    .subscribe(
                        rx::make_observer_dynamic<int>(
                            [&](int){
                                ++v;
                            },
                            [&](){
                                ++c;
                            }));
                // Busy-wait until every value and all three completions arrived.
                while(v != values * 3 || c != 3);
                s0.unsubscribe();
                s1.unsubscribe();
                s2.unsubscribe();
                auto finish = clock::now();
                auto msElapsed = duration_cast<milliseconds>(finish-start);
                std::cout << "range observe_on : " << n << " subscribed, " << v << " on_next calls, " << msElapsed.count() << "ms elapsed, " << v / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;

                if (--runs > 0) {
                    self();
                }
            };

            w.schedule(loop);
        }
    }
}

// is_subscription must report true for the types returned by
// make_subscription() and for composite_subscription.
SCENARIO("subscription traits", "[subscription][traits]"){
    GIVEN("given some subscription types"){
        auto es = rx::make_subscription();
        rx::composite_subscription cs;
        WHEN("tested"){
            THEN("is_subscription value is true for empty subscription"){
                REQUIRE(rx::is_subscription<decltype(es)>::value);
            }
            THEN("is_subscription value is true for composite_subscription"){
                REQUIRE(rx::is_subscription<decltype(cs)>::value);
            }
        }
    }
}

// is_subscription must report false for a lambda, int, void* and void.
SCENARIO("non-subscription traits", "[subscription][traits]"){
    GIVEN("given some non-subscription types"){
        auto l = [](){};
        int i = 0;
        void* v = nullptr;
        WHEN("tested"){
            // Each local is touched in its THEN so it counts as used.
            THEN("is_subscription value is false for lambda"){
                l();
                REQUIRE(!rx::is_subscription<decltype(l)>::value);
            }
            THEN("is_subscription value is false for int"){
                i = 0;
                REQUIRE(!rx::is_subscription<int>::value);
            }
            THEN("is_subscription value is false for void*"){
                v = nullptr;
                REQUIRE(!rx::is_subscription<void*>::value);
            }
            THEN("is_subscription value is false for void"){
                REQUIRE(!rx::is_subscription<void>::value);
            }
        }
    }
}

// A subscription built from a callable starts subscribed and runs the
// callable exactly once, however many times unsubscribe() is called.
SCENARIO("subscription static", "[subscription]"){
    GIVEN("given a subscription"){
        int i=0;
        auto s = rx::make_subscription([&i](){++i;});
        WHEN("not used"){
            THEN("is subscribed"){
                REQUIRE(s.is_subscribed());
            }
            THEN("i is 0"){
                REQUIRE(i == 0);
            }
        }
        WHEN("used"){
            THEN("is not subscribed when unsubscribed once"){
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
            THEN("is not subscribed when unsubscribed twice"){
                s.unsubscribe();
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
            THEN("i is 1 when unsubscribed once"){
                s.unsubscribe();
                REQUIRE(i == 1);
            }
            THEN("i is 1 when unsubscribed twice"){
                s.unsubscribe();
                s.unsubscribe();
                REQUIRE(i == 1);
            }
        }
    }
}

// A subscription built with no arguments is expected to report
// not-subscribed both before and after unsubscribe().
SCENARIO("subscription empty", "[subscription]"){
    GIVEN("given an empty subscription"){
        auto s = rx::make_subscription();
        WHEN("not used"){
            THEN("is not subscribed"){
                REQUIRE(!s.is_subscribed());
            }
        }
        WHEN("used"){
            THEN("is not subscribed when unsubscribed once"){
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
            THEN("is not subscribed when unsubscribed twice"){
                s.unsubscribe();
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
        }
    }
}

// A composite_subscription holding one empty subscription and two counting
// actions: unsubscribing runs each counting action exactly once (i == 2),
// even when unsubscribe() is called twice.
SCENARIO("subscription composite", "[subscription]"){
    GIVEN("given a subscription"){
        int i=0;
        rx::composite_subscription s;
        s.add(rx::make_subscription());
        s.add(rx::make_subscription([&i](){++i;}));
        s.add([&i](){++i;});
        WHEN("not used"){
            THEN("is subscribed"){
                REQUIRE(s.is_subscribed());
            }
            THEN("i is 0"){
                REQUIRE(i == 0);
            }
        }
        WHEN("used"){
            THEN("is not subscribed when unsubscribed once"){
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
            THEN("is not subscribed when unsubscribed twice"){
                s.unsubscribe();
                s.unsubscribe();
                REQUIRE(!s.is_subscribed());
            }
            THEN("i is 2 when unsubscribed once"){
                s.unsubscribe();
                REQUIRE(i == 2);
            }
            THEN("i is 2 when unsubscribed twice"){
                s.unsubscribe();
                s.unsubscribe();
                REQUIRE(i == 2);
            }
        }
    }
}