diff --git a/src/vt/collective/reduce/allreduce/rabenseifner.impl.h b/src/vt/collective/reduce/allreduce/rabenseifner.impl.h
index b01c3f4f73..02cb0b8db0 100644
--- a/src/vt/collective/reduce/allreduce/rabenseifner.impl.h
+++ b/src/vt/collective/reduce/allreduce/rabenseifner.impl.h
@@ -65,17 +65,20 @@ namespace vt::collective::reduce::allreduce {
 template <typename DataT, template <typename Arg> class Op, auto finalHandler>
 template <typename... Args>
 Rabenseifner<DataT, Op, finalHandler>::Rabenseifner(
-  detail::StrongVrtProxy proxy, detail::StrongGroup group, size_t num_elems,
+  detail::StrongVrtProxy proxy, detail::StrongGroup group,
+  size_t num_elems,
   Args&&... data)
   : Rabenseifner(group, std::forward<Args>(data)...) {
   collection_proxy_ = proxy.get();
   local_num_elems_ = num_elems;
   local_col_wait_count_++;
+
+  auto const is_ready = local_col_wait_count_ == local_num_elems_;
   vt_debug_print(
     terse, allreduce,
-    "Rabenseifner (this={}): proxy={:x} local_num_elems={} ID={} is_ready={}\n",
-    print_ptr(this), proxy.get(), local_num_elems_, id_,
-    local_col_wait_count_ == local_num_elems_);
+    "Rabenseifner (this={}): proxy={:x} proxy_={} local_num_elems={} ID={} is_ready={}\n",
+    print_ptr(this), proxy.get(), proxy_.getProxy(), local_num_elems_, id_ - 1, is_ready
+  );
 }

 template <typename DataT, template <typename Arg> class Op, auto finalHandler>
@@ -502,9 +505,9 @@ void Rabenseifner<DataT, Op, finalHandler>::scatterReduceIter(size_t id) {
     terse, allreduce,
     "Rabenseifner Scatter (Send step {} to {}): Starting with idx = {} and "
     "count "
-    "{} ID = {}\n",
+    "{} ID = {} proxy_={}\n",
     state.scatter_step_, actual_partner, state.s_index_[state.scatter_step_],
-    state.s_count_[state.scatter_step_], id
+    state.s_count_[state.scatter_step_], id, proxy_.getProxy()
   );

   proxy_[actual_partner]
diff --git a/src/vt/vrt/collection/manager.impl.h b/src/vt/vrt/collection/manager.impl.h
index 49d3b00821..d007e4338d 100644
--- a/src/vt/vrt/collection/manager.impl.h
+++ b/src/vt/vrt/collection/manager.impl.h
@@ -920,6 +920,7 @@ messaging::PendingSend CollectionManager::reduceLocal(
   using Reducer = collective::reduce::allreduce::Rabenseifner<
    CollectionAllreduceT, DataT, Op, f>;

+  // Incorrect! will yield same reducer for different Op/payload size/final handler etc.
   if (auto reducer = rabenseifner_reducers_.find(col_proxy);
       reducer == rabenseifner_reducers_.end()) {
     if (use_group) {
@@ -937,12 +938,16 @@ messaging::PendingSend CollectionManager::reduceLocal(
       auto cb = vt::theCB()->makeCallbackBcastProxy(proxy);
       obj->setFinalHandler(cb);
+
+      if(num_elms == 1){
+        obj->allreduce(obj->id_ - 1);
+      }
     }
   } else {
     if (use_group) {
       // theGroup()->allreduce(group, );
     } else {
-      auto obj_proxy = rabenseifner_reducers_.at(col_proxy);
+      auto obj_proxy = reducer->second; // rabenseifner_reducers_.at(col_proxy);
       auto typed_proxy =
         static_cast<objgroup::proxy::Proxy<Reducer>>(obj_proxy);
       auto* obj = typed_proxy[theContext()->getNode()].get();
diff --git a/src/vt/vrt/collection/reducable/reducable.impl.h b/src/vt/vrt/collection/reducable/reducable.impl.h
index 564a81827a..3d0b3576e5 100644
--- a/src/vt/vrt/collection/reducable/reducable.impl.h
+++ b/src/vt/vrt/collection/reducable/reducable.impl.h
@@ -41,8 +41,6 @@
 //@HEADER
 */

-#include "vt/collective/reduce/allreduce/rabenseifner_msg.h"
-#include 
 #if !defined INCLUDED_VT_VRT_COLLECTION_REDUCABLE_REDUCABLE_IMPL_H
 #define INCLUDED_VT_VRT_COLLECTION_REDUCABLE_REDUCABLE_IMPL_H

diff --git a/tests/perf/allreduce.cc b/tests/perf/allreduce.cc
index 00d8579873..6607a28c63 100644
--- a/tests/perf/allreduce.cc
+++ b/tests/perf/allreduce.cc
@@ -281,49 +281,65 @@ VT_PERF_TEST(MyTest, test_allreduce_group_rabenseifner) {
 }

 struct Hello : vt::Collection<Hello, vt::Index1D> {
-  Hello() = default;
-  void FInalHan(std::vector<int32_t> result) {
-    std::string result_s = "";
-    for(auto val : result){
-      result_s.append(fmt::format("{} ", val));
-    }
-    fmt::print(
-      "[{}]: Allreduce handler (Values=[{}]), idx={}\n",
-      theContext()->getNode(), result_s, getIndex().x()
-    );
+  Hello() {
+    for (auto const payload_size : payloadSizes) {
+      timer_names_[payload_size] = fmt::format("Collection {}", payload_size);
+    }
   }
-  void Handler() {
-    auto proxy = this->getCollectionProxy();
-
-    std::vector<int32_t> payload(100, getIndex().x());
-    proxy.allreduce_h<&Hello::FInalHan, collective::PlusOp>(std::move(payload));
+  void finalHan(std::vector<int32_t> result) {
+    // std::string result_s = "";
+    // for(auto val : result){
+    //   result_s.append(fmt::format("{} ", val));
+    // }
+    // fmt::print(
+    //   "[{}]: Allreduce handler (Values=[{}]), idx={}\n",
+    //   theContext()->getNode(), result_s, getIndex().x()
+    // );
     col_send_done_ = true;
+    parent_->StopTimer(timer_names_.at(result.size()));
+  }
+
+  void handler(size_t payload_size) {
+    auto proxy = this->getCollectionProxy();
+
+    std::vector<int32_t> payload(payload_size, getIndex().x());
+    parent_->StartTimer(timer_names_.at(payload_size));
+    proxy.allreduce_h<&Hello::finalHan, collective::PlusOp>(std::move(payload));
   }

   bool col_send_done_ = false;
+  std::unordered_map<size_t, std::string> timer_names_ = {};
+  MyTest* parent_ = {};
 };

 VT_PERF_TEST(MyTest, test_allreduce_collection_rabenseifner) {
-  auto range = vt::Index1D(int32_t{num_nodes_ * 2});
+  auto range = vt::Index1D(int32_t{num_nodes_});

   auto proxy = vt::makeCollection<Hello>("test_collection_send")
                  .bounds(range)
                  .bulkInsert()
                  .wait();
-
   auto const thisNode = vt::theContext()->getNode();
   auto const nextNode = (thisNode + 1) % num_nodes_;

   theCollective()->barrier();

-  proxy.broadcastCollective<&Hello::Handler>();
+  auto const num_elms_per_node = 1;
+  auto const elm = thisNode * num_elms_per_node;

-  // We run 1 coll elem per node, so it should be ok
-  // theSched()->runSchedulerWhile([&] { return !(elm->col_send_done_); });
-  //elm->col_send_done_ = false;
+  proxy[elm].tryGetLocalPtr()->parent_ = this;
+  proxy.broadcastCollective<&Hello::handler>(payloadSizes.front());
+  theSched()->runSchedulerWhile(
+    [&] { return !proxy[elm].tryGetLocalPtr()->col_send_done_; });
+  // for (auto payload_size : payloadSizes) {
+  //   proxy.broadcastCollective<&Hello::handler>(payload_size);
+  //   // We run 1 coll elem per node, so it should be ok
+  //   theSched()->runSchedulerWhile([&] { return !proxy[elm].tryGetLocalPtr()->col_send_done_; });
+  //   proxy[elm].tryGetLocalPtr()->col_send_done_ = false;
+  // }
 }

 VT_PERF_TEST_MAIN()
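
Review note: the "// Incorrect!" comment added in CollectionManager::reduceLocal flags a real problem with the cache above. rabenseifner_reducers_ is keyed on col_proxy alone, so two allreduces over the same collection that differ in reduction op, payload size, or final handler all resolve to the same Reducer object group. A minimal sketch of a composite key follows, assuming the cached value stays an opaque objgroup proxy; ReducerKey, ReducerKeyHash, and the choice of fields are hypothetical illustrations, not part of this patch or of vt's API.

#include <cstddef>
#include <cstdint>
#include <functional>
#include <tuple>
#include <unordered_map>

// Hypothetical composite key (not part of this patch): one reducer per
// (collection, op, payload size, final handler) combination, instead of
// keying rabenseifner_reducers_ on the collection proxy alone.
struct ReducerKey {
  uint64_t col_proxy;       // the collection's proxy bits
  size_t op_hash;           // e.g. typeid(Op<DataT>).hash_code()
  size_t payload_size;      // number of elements in the reduced payload
  size_t final_handler_id;  // identity of the registered final handler

  bool operator==(ReducerKey const& rhs) const {
    return std::tie(col_proxy, op_hash, payload_size, final_handler_id) ==
      std::tie(
        rhs.col_proxy, rhs.op_hash, rhs.payload_size, rhs.final_handler_id);
  }
};

struct ReducerKeyHash {
  size_t operator()(ReducerKey const& k) const {
    // Standard hash-combine over the four components.
    size_t h = std::hash<uint64_t>{}(k.col_proxy);
    for (size_t v : {k.op_hash, k.payload_size, k.final_handler_id}) {
      h ^= std::hash<size_t>{}(v) + 0x9e3779b9 + (h << 6) + (h >> 2);
    }
    return h;
  }
};

// The cache would then be declared along these lines:
// std::unordered_map<ReducerKey, ObjGroupProxyType, ReducerKeyHash>
//   rabenseifner_reducers_;

The lookup in reduceLocal would then build the key from col_proxy, the Op/f template arguments, and num_elms before calling find/emplace, so each distinct allreduce configuration gets its own reducer instead of colliding on the first one created.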
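A second note, on the perf test: the commented-out loop at the end of test_allreduce_collection_rabenseifner sketches the intended final shape of the benchmark, one timed allreduce per entry in payloadSizes with col_send_done_ reset between iterations; the single broadcastCollective call with payloadSizes.front() stands in for it. Uncommented, under the same assumptions the test already makes (one element per node, so the local pointer is always valid), it would read:

for (auto payload_size : payloadSizes) {
  proxy.broadcastCollective<&Hello::handler>(payload_size);
  // One collection element per node, so polling the local element suffices.
  theSched()->runSchedulerWhile(
    [&] { return !proxy[elm].tryGetLocalPtr()->col_send_done_; });
  proxy[elm].tryGetLocalPtr()->col_send_done_ = false;
}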