Commit 27d1929: #2281: Working perf test with collection

JacobDomagala committed Sep 2, 2024 (1 parent: 81b4d00)

Showing 4 changed files with 52 additions and 30 deletions.
15 changes: 9 additions & 6 deletions src/vt/collective/reduce/allreduce/rabenseifner.impl.h

@@ -65,17 +65,20 @@ namespace vt::collective::reduce::allreduce {
 template <typename Type, typename DataT, template <typename Arg> class Op, auto finalHandler>
 template <typename... Args>
 Rabenseifner<Type, DataT, Op, finalHandler>::Rabenseifner(
-  detail::StrongVrtProxy proxy, detail::StrongGroup group, size_t num_elems,
+  detail::StrongVrtProxy proxy, detail::StrongGroup group,
+  size_t num_elems,
   Args&&... data)
   : Rabenseifner<Type, DataT, Op, finalHandler>(group, std::forward<Args>(data)...) {
   collection_proxy_ = proxy.get();
   local_num_elems_ = num_elems;
   local_col_wait_count_++;
+
+  auto const is_ready = local_col_wait_count_ == local_num_elems_;
   vt_debug_print(
     terse, allreduce,
-    "Rabenseifner (this={}): proxy={:x} local_num_elems={} ID={} is_ready={}\n",
-    print_ptr(this), proxy.get(), local_num_elems_, id_,
-    local_col_wait_count_ == local_num_elems_);
+    "Rabenseifner (this={}): proxy={:x} proxy_={} local_num_elems={} ID={} is_ready={}\n",
+    print_ptr(this), proxy.get(), proxy_.getProxy(), local_num_elems_, id_ - 1, is_ready
+  );
 }
 
 template <typename Type, typename DataT, template <typename Arg> class Op, auto finalHandler>
@@ -502,9 +505,9 @@ void Rabenseifner<Type, DataT, Op, finalHandler>::scatterReduceIter(size_t id) {
     terse, allreduce,
     "Rabenseifner Scatter (Send step {} to {}): Starting with idx = {} and "
     "count "
-    "{} ID = {}\n",
+    "{} ID = {} proxy_={}\n",
     state.scatter_step_, actual_partner, state.s_index_[state.scatter_step_],
-    state.s_count_[state.scatter_step_], id
+    state.s_count_[state.scatter_step_], id, proxy_.getProxy()
   );
 
   proxy_[actual_partner]
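The constructor change above tracks how many local collection elements have checked in (`local_col_wait_count_`) against the number expected on this node (`local_num_elems_`), and surfaces the comparison as an explicit `is_ready` flag for the debug print. A minimal standalone sketch of that readiness-count pattern, using hypothetical names rather than VT's actual types:

```cpp
#include <cstddef>

// Hypothetical stand-in for the per-node reducer state: each local
// collection element that contributes to the allreduce bumps a counter,
// and the reduction is ready once all expected local elements arrived.
struct LocalReadiness {
  std::size_t expected;     // plays the role of local_num_elems_
  std::size_t arrived = 0;  // plays the role of local_col_wait_count_

  // Returns true exactly when the last expected local element checks in,
  // mirroring the is_ready flag computed in the constructor above.
  bool contribute() { return ++arrived == expected; }
};
```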
7 changes: 6 additions & 1 deletion src/vt/vrt/collection/manager.impl.h

@@ -920,6 +920,7 @@ messaging::PendingSend CollectionManager::reduceLocal(
   using Reducer = collective::reduce::allreduce::Rabenseifner<
     CollectionAllreduceT, DataT, Op, f>;
 
+  // Incorrect! will yield same reducer for different Op/payload size/final handler etc.
   if (auto reducer = rabenseifner_reducers_.find(col_proxy);
       reducer == rabenseifner_reducers_.end()) {
     if (use_group) {
@@ -937,12 +938,16 @@ messaging::PendingSend CollectionManager::reduceLocal(
 
       auto cb = vt::theCB()->makeCallbackBcastProxy<f>(proxy);
       obj->setFinalHandler(cb);
+
+      if(num_elms == 1){
+        obj->allreduce(obj->id_ - 1);
+      }
     }
   } else {
     if (use_group) {
       // theGroup()->allreduce<f, Op>(group, );
     } else {
-      auto obj_proxy = rabenseifner_reducers_.at(col_proxy);
+      auto obj_proxy = reducer->second; // rabenseifner_reducers_.at(col_proxy);
       auto typed_proxy =
         static_cast<vt::objgroup::proxy::Proxy<Reducer>>(obj_proxy);
       auto* obj = typed_proxy[theContext()->getNode()].get();
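The `// Incorrect!` note added above flags a real limitation: `rabenseifner_reducers_` is keyed by `col_proxy` alone, so two allreduces over the same collection that differ in `Op`, payload size, or final handler would be handed the same reducer. One possible direction, sketched here with invented names (`ReducerKey` is not VT API), is a composite map key:

```cpp
#include <cstddef>
#include <cstdint>
#include <tuple>
#include <typeindex>

// Invented composite key: distinguishes reducers by collection proxy plus
// the reduction Op, payload size, and final handler, rather than by the
// collection proxy alone.
struct ReducerKey {
  uint64_t col_proxy;        // the current sole key
  std::type_index op_type;   // identifies Op, e.g. std::type_index(typeid(Op<DataT>))
  std::size_t payload_size;  // separates different payload sizes
  uintptr_t handler_id;      // identifies the final handler f

  bool operator<(ReducerKey const& rhs) const {
    return std::tie(col_proxy, op_type, payload_size, handler_id) <
           std::tie(rhs.col_proxy, rhs.op_type, rhs.payload_size, rhs.handler_id);
  }
};
// e.g. std::map<ReducerKey, ObjGroupProxyType> rabenseifner_reducers_;
```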
2 changes: 0 additions & 2 deletions src/vt/vrt/collection/reducable/reducable.impl.h

@@ -41,8 +41,6 @@
 //@HEADER
 */
 
-#include "vt/collective/reduce/allreduce/rabenseifner_msg.h"
-#include <utility>
 #if !defined INCLUDED_VT_VRT_COLLECTION_REDUCABLE_REDUCABLE_IMPL_H
 #define INCLUDED_VT_VRT_COLLECTION_REDUCABLE_REDUCABLE_IMPL_H
58 changes: 37 additions & 21 deletions tests/perf/allreduce.cc

@@ -281,49 +281,65 @@ VT_PERF_TEST(MyTest, test_allreduce_group_rabenseifner) {
 }
 
 struct Hello : vt::Collection<Hello, vt::Index1D> {
-  Hello() = default;
-  void FInalHan(std::vector<int32_t> result) {
-    std::string result_s = "";
-    for(auto val : result){
-      result_s.append(fmt::format("{} ", val));
-    }
-    fmt::print(
-      "[{}]: Allreduce handler (Values=[{}]), idx={}\n",
-      theContext()->getNode(), result_s, getIndex().x()
-    );
+  Hello() {
+    for (auto const payload_size : payloadSizes) {
+      timer_names_[payload_size] = fmt::format("Collection {}", payload_size);
+    }
+  }
 
-  void Handler() {
-    auto proxy = this->getCollectionProxy();
-
-    std::vector<int32_t> payload(100, getIndex().x());
-    proxy.allreduce_h<&Hello::FInalHan, collective::PlusOp>(std::move(payload));
+  void finalHan(std::vector<int32_t> result) {
+    // std::string result_s = "";
+    // for(auto val : result){
+    //   result_s.append(fmt::format("{} ", val));
+    // }
+    // fmt::print(
+    //   "[{}]: Allreduce handler (Values=[{}]), idx={}\n",
+    //   theContext()->getNode(), result_s, getIndex().x()
+    // );
 
     col_send_done_ = true;
+    parent_->StopTimer(timer_names_.at(result.size()));
   }
 
+  void handler(size_t payload_size) {
+    auto proxy = this->getCollectionProxy();
+
+    std::vector<int32_t> payload(payload_size, getIndex().x());
+    parent_->StartTimer(timer_names_.at(payload_size));
+    proxy.allreduce_h<&Hello::finalHan, collective::PlusOp>(std::move(payload));
+  }
 
   bool col_send_done_ = false;
+  std::unordered_map<size_t, std::string> timer_names_= {};
+  MyTest* parent_ = {};
 };
 
 VT_PERF_TEST(MyTest, test_allreduce_collection_rabenseifner) {
-  auto range = vt::Index1D(int32_t{num_nodes_ * 2});
+  auto range = vt::Index1D(int32_t{num_nodes_});
   auto proxy = vt::makeCollection<Hello>("test_collection_send")
                  .bounds(range)
                  .bulkInsert()
                  .wait();
 
 
   auto const thisNode = vt::theContext()->getNode();
-  auto const nextNode = (thisNode + 1) % num_nodes_;
 
   theCollective()->barrier();
 
-  proxy.broadcastCollective<&Hello::Handler>();
+  auto const num_elms_per_node = 1;
+  auto const elm = thisNode * num_elms_per_node;
 
-  // We run 1 coll elem per node, so it should be ok
-  // theSched()->runSchedulerWhile([&] { return !(elm->col_send_done_); });
-  //elm->col_send_done_ = false;
+  proxy[elm].tryGetLocalPtr()->parent_ = this;
+  proxy.broadcastCollective<&Hello::handler>(payloadSizes.front());
+  theSched()->runSchedulerWhile(
+    [&] { return !proxy[elm].tryGetLocalPtr()->col_send_done_; });
+  // for (auto payload_size : payloadSizes) {
+  //   proxy.broadcastCollective<&Hello::handler>(payload_size);
 
+  //   // We run 1 coll elem per node, so it should be ok
+  //   theSched()->runSchedulerWhile([&] { return !proxy[elm].tryGetLocalPtr()->col_send_done_; });
+  //   proxy[elm].tryGetLocalPtr()->col_send_done_ = false;
+  // }
 }
 
 VT_PERF_TEST_MAIN()
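The rewritten test wires timing as a handshake between the two handlers: `handler` starts a per-payload-size timer before issuing `allreduce_h`, and `finalHan` stops the matching timer by looking it up with `result.size()`, which works because the reduced vector has the same length as each contributed payload. A self-contained sketch of that lookup idea (names here are illustrative, not the test's API):

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Illustrative timer registry keyed by payload size, mirroring timer_names_.
std::unordered_map<std::size_t, std::string> timer_names = {
  {64, "Collection 64"}, {4096, "Collection 4096"}};

void onAllreduceDone(std::vector<int32_t> const& result) {
  // The reduced vector's length equals the contributed payload size, so
  // result.size() recovers which timer this completion belongs to.
  auto const& name = timer_names.at(result.size());
  (void)name;  // a real handler would call StopTimer(name) here
}
```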
