diff --git a/src/vt/collective/reduce/allreduce/helpers.h b/src/vt/collective/reduce/allreduce/helpers.h index 5dad947dfe..d2b196862d 100644 --- a/src/vt/collective/reduce/allreduce/helpers.h +++ b/src/vt/collective/reduce/allreduce/helpers.h @@ -58,6 +58,30 @@ using remove_cvref = std::remove_cv_t>; namespace vt::collective::reduce::allreduce { +template +struct function_traits; // General template declaration. + +// Specialization for function pointers. +template +struct function_traits { + using return_type = Ret; + static constexpr std::size_t arity = sizeof...(Args); + using args_tuple = std::tuple; + + template + using arg_type = typename std::tuple_element>::type; +}; + +template +struct function_traits { + using return_type = Ret; + static constexpr std::size_t arity = sizeof...(Args); + using args_tuple = std::tuple; + + template + using arg_type = typename std::tuple_element>::type; +}; + // Primary template template struct ShouldUseView { diff --git a/src/vt/collective/reduce/allreduce/rabenseifner.h b/src/vt/collective/reduce/allreduce/rabenseifner.h index 2f6d698ae3..1780e7fc72 100644 --- a/src/vt/collective/reduce/allreduce/rabenseifner.h +++ b/src/vt/collective/reduce/allreduce/rabenseifner.h @@ -41,6 +41,7 @@ //@HEADER */ +#include "vt/configs/types/types_type.h" #if !defined INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_H #define INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_H @@ -87,11 +88,16 @@ struct Rabenseifner { static constexpr bool KokkosPaylod = ShouldUseView_v; template - Rabenseifner(GroupType group, Args&&... args); + Rabenseifner(detail::StrongVrtProxy proxy, Args&&... args); + + template + Rabenseifner(detail::StrongGroup group, Args&&... args); template Rabenseifner(vt::objgroup::proxy::Proxy proxy, Args&&... args); + template + void localReduce(IdxT idx); /** * \brief Initialize the allreduce algorithm. 
* @@ -265,25 +271,36 @@ struct Rabenseifner { vt::objgroup::proxy::Proxy proxy_ = {}; vt::objgroup::proxy::Proxy parent_proxy_ = {}; + VirtualProxyType collection_proxy_ = {}; size_t id_ = 0; std::unordered_map states_ = {}; - /// Only used when non-default group is beign used + /// Sorted list of Nodes that take part in allreduce std::vector nodes_ = {}; NodeType num_nodes_ = {}; + + /// Represents an index inside nodes_ NodeType this_node_ = {}; bool is_even_ = false; + + /// Num steps for each scatter/gather phase int32_t num_steps_ = {}; + + /// 2^num_steps_ int32_t nprocs_pof2_ = {}; int32_t nprocs_rem_ = {}; + /// For non-power-of-2 number of nodes this represents whether current Node + /// is excluded (has value of -1) from computation NodeType vrt_node_ = {}; + bool is_part_of_adjustment_group_ = false; + static inline const std::string name_ = "Rabenseifner"; - static inline const ReducerType type_ = ReducerType::Rabenseifner; + static inline constexpr ReducerType type_ = ReducerType::Rabenseifner; }; } // namespace vt::collective::reduce::allreduce diff --git a/src/vt/collective/reduce/allreduce/rabenseifner.impl.h b/src/vt/collective/reduce/allreduce/rabenseifner.impl.h index 2ed68dcd92..fbdc7d77a7 100644 --- a/src/vt/collective/reduce/allreduce/rabenseifner.impl.h +++ b/src/vt/collective/reduce/allreduce/rabenseifner.impl.h @@ -41,8 +41,6 @@ //@HEADER */ -#include "vt/configs/debug/debug_print.h" -#include #if !defined INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_IMPL_H #define INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_IMPL_H @@ -56,16 +54,30 @@ #include "vt/configs/types/types_sentinels.h" #include "vt/registry/auto/auto_registry.h" #include "vt/utils/fntraits/fntraits.h" +#include "vt/configs/debug/debug_print.h" +#include #include namespace vt::collective::reduce::allreduce { +template class Op, auto finalHandler> +template +Rabenseifner::Rabenseifner(detail::StrongVrtProxy proxy, Args&&...
data){ + vt_debug_print(terse, allreduce, "Rabenseifner: proxy={:x} \n", proxy.get()); +} + +template class Op, auto finalHandler> +template +void Rabenseifner::localReduce(IdxT idx){ + vt_debug_print(terse, allreduce, "Rabenseifner: idx={} \n", idx); +} + template class Op, auto finalHandler> template Rabenseifner::Rabenseifner( - GroupType group, Args&&... data) - : nodes_(theGroup()->GetGroupNodes(group)), + detail::StrongGroup group, Args&&... data) + : nodes_(theGroup()->GetGroupNodes(group.get())), num_nodes_(nodes_.size()), this_node_(theContext()->getNode()), num_steps_(static_cast(log2(num_nodes_))), @@ -77,8 +89,8 @@ Rabenseifner::Rabenseifner( for(auto& node : nodes_){ nodes_info += fmt::format("{} ", node); } - auto const is_default_group = group == default_group; - auto const is_part_of_allreduce = (not is_default_group and theGroup()->inGroup(group)) or is_default_group; + auto const is_default_group = group.get() == default_group; + auto const is_part_of_allreduce = (not is_default_group and theGroup()->inGroup(group.get())) or is_default_group; vt_debug_print( terse, allreduce, @@ -87,7 +99,7 @@ Rabenseifner::Rabenseifner( is_default_group, is_part_of_allreduce, num_nodes_, nodes_info ); - if (not is_default_group and theGroup()->inGroup(group)) { + if (not is_default_group and theGroup()->inGroup(group.get())) { // vtAssert(theGroup()->inGroup(group), fmt::format("This node is not part of group {:x}!", group)); auto it = std::find(nodes_.begin(), nodes_.end(), theContext()->getNode()); diff --git a/src/vt/collective/reduce/allreduce/rabenseifner_group.h b/src/vt/collective/reduce/allreduce/rabenseifner_group.h index ce5409106c..ff25ee0297 100644 --- a/src/vt/collective/reduce/allreduce/rabenseifner_group.h +++ b/src/vt/collective/reduce/allreduce/rabenseifner_group.h @@ -77,19 +77,6 @@ struct StateHolder : StateHolderBase { // } }; -template -struct function_traits; // General template declaration. - -// Specialization for function pointers. 
-template -struct function_traits { - using return_type = Ret; - static constexpr std::size_t arity = sizeof...(Args); - using args_tuple = std::tuple; - - template - using arg_type = typename std::tuple_element>::type; -}; /** * \struct Rabenseifner diff --git a/src/vt/collective/reduce/allreduce/rabenseifner_msg.h b/src/vt/collective/reduce/allreduce/rabenseifner_msg.h index 9bf2653d7b..a5e53aa692 100644 --- a/src/vt/collective/reduce/allreduce/rabenseifner_msg.h +++ b/src/vt/collective/reduce/allreduce/rabenseifner_msg.h @@ -41,11 +41,13 @@ //@HEADER */ + #if !defined INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_MSG_H #define INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_MSG_H #include "vt/config.h" #include "vt/messaging/active.h" - +#include "vt/configs/debug/debug_print.h" +#include "vt/collective/reduce/operators/default_msg.h" namespace vt::collective::reduce::allreduce { template @@ -86,6 +88,59 @@ struct RabenseifnerMsg : Message { s | step_; } +struct NoCombine {}; + +template +struct IsTuple : std::false_type {}; +template +struct IsTuple> : std::true_type {}; + + template + static void combine(MsgT* m1, MsgT* m2) { + Op()(m1->getVal(), m2->getConstVal()); + } + + template + static void FinalHandler(ReduceTMsg* msg) { + // using MsgT = ReduceTMsg; + vt_debug_print( + terse, reduce, + "FinalHandler: reduce root: ptr={}\n", print_ptr(msg) + ); + // if (msg->isRoot()) { + // vt_debug_print( + // terse, reduce, + // "FinalHandler::ROOT: reduce root: ptr={}\n", print_ptr(msg) + // ); + // if (msg->hasValidCallback()) { + // envelopeUnlockForForwarding(msg->env); + // if (msg->isParamCallback()) { + // if constexpr (IsTuple::value) { + // msg->getParamCallback().sendTuple(std::move(msg->getVal())); + // } + // } else { + // // We need to force the type to the more specific one here + // auto cb = msg->getMsgCallback(); + // auto typed_cb = reinterpret_cast*>(&cb); + // typed_cb->sendMsg(msg); + // } + // } else if (msg->root_handler_ != 
uninitialized_handler) { + // auto_registry::getAutoHandler(msg->root_handler_)->dispatch(msg, nullptr); + // } + // } else { + // MsgT* fst_msg = msg; + // MsgT* cur_msg = msg->template getNext(); + // vt_debug_print( + // terse, reduce, + // "FinalHandler::leaf: fst ptr={}\n", print_ptr(fst_msg) + // ); + // while (cur_msg != nullptr) { + // RabenseifnerMsg::combine(fst_msg, cur_msg); + // cur_msg = cur_msg->template getNext(); + // } + // } + } + const Scalar* val_ = {}; size_t size_ = {}; size_t id_ = {}; diff --git a/src/vt/group/group_manager.impl.h b/src/vt/group/group_manager.impl.h index 409cf68421..68b52def56 100644 --- a/src/vt/group/group_manager.impl.h +++ b/src/vt/group/group_manager.impl.h @@ -170,7 +170,11 @@ void GroupManager::allreduce(GroupType group, Args&&... args) { using Reducer = collective::reduce::allreduce::Rabenseifner; // TODO; Save the proxy so it can be deleted afterwards - auto proxy = theObjGroup()->makeCollective("reducer", group, std::forward(args)...); + auto proxy = theObjGroup()->makeCollective( + "reducer", collective::reduce::detail::StrongGroup{group}, + std::forward(args)... + ); + if (iter->second->is_in_group) { auto const this_node = theContext()->getNode(); auto id = proxy[this_node].get()->id_ - 1; diff --git a/src/vt/vrt/collection/manager.h b/src/vt/vrt/collection/manager.h index 9ef6540206..21166066e9 100644 --- a/src/vt/vrt/collection/manager.h +++ b/src/vt/vrt/collection/manager.h @@ -41,6 +41,8 @@ //@HEADER */ +#include "vt/configs/types/types_type.h" +#include #if !defined INCLUDED_VT_VRT_COLLECTION_MANAGER_H #define INCLUDED_VT_VRT_COLLECTION_MANAGER_H @@ -743,6 +745,11 @@ struct CollectionManager bool instrument ); + template class Op, typename ...Args> + messaging::PendingSend reduceLocal( + CollectionProxyWrapType const& proxy, Args &&... 
args + ); + /** * \brief Reduce over a collection * @@ -1766,6 +1773,10 @@ struct CollectionManager VirtualIDType next_rooted_id_ = 0; TypelessHolder typeless_holder_; std::unordered_map reduce_stamp_; + + // Allreduce stuff, probably should be moved elsewhere + std::unordered_map rabenseifner_reducers_; + std::unordered_map waiting_count_ = {}; }; }}} /* end namespace vt::vrt::collection */ diff --git a/src/vt/vrt/collection/manager.impl.h b/src/vt/vrt/collection/manager.impl.h index a5b82f12f5..cdce21e620 100644 --- a/src/vt/vrt/collection/manager.impl.h +++ b/src/vt/vrt/collection/manager.impl.h @@ -41,6 +41,7 @@ //@HEADER */ +#include "vt/collective/reduce/scoping/strong_types.h" #if !defined INCLUDED_VT_VRT_COLLECTION_MANAGER_IMPL_H #define INCLUDED_VT_VRT_COLLECTION_MANAGER_IMPL_H @@ -184,7 +185,7 @@ GroupType CollectionManager::createGroupCollection( } vt_debug_print( - normal, allreduce, + normal, vrt_coll, "group finished construction: proxy={:x}, new_group={:x}, use_group={}, " "ready={}, root={}, is_group_default={}\n", proxy, new_group, elm_holder->useGroup(), elm_holder->groupReady(), @@ -870,6 +871,71 @@ messaging::PendingSend CollectionManager::broadcastMsgUntypedHandler( } } +template < + auto f, typename ColT, template class Op, typename... Args> +messaging::PendingSend CollectionManager::reduceLocal( + CollectionProxyWrapType const& proxy, Args&&... 
args) { + using namespace collective::reduce::allreduce; + using DataT = typename function_traits::template arg_type<0>; + + using Reducer = collective::reduce::allreduce::Rabenseifner; + + using IndexT = typename ColT::IndexType; + + // Get the current running index context + IndexT idx = *queryIndexContext(); + auto const col_proxy = proxy.getProxy(); + auto elm_holder = findElmHolder(col_proxy); + std::size_t num_elms = elm_holder->numElements(); + + auto const group_ready = elm_holder->groupReady(); + auto const send_group = elm_holder->useGroup(); + auto const group = elm_holder->group(); + bool const use_group = group_ready && send_group; + + // First time here + if (waiting_count_[col_proxy] == 0) { + if (use_group) { + // theGroup()->allreduce(group, ); + } else { + auto obj_proxy = theObjGroup()->makeCollective( + "reducer", collective::reduce::detail::StrongVrtProxy{col_proxy}, + std::forward(args)... + ); + + rabenseifner_reducers_[col_proxy] = obj_proxy.getProxy(); + obj_proxy[theContext()->getNode()].get()->proxy_ = obj_proxy; + + obj_proxy[theContext()->getNode()].get()->localReduce(idx); + } + }else{ + if (use_group) { + // theGroup()->allreduce(group, ); + } else { + auto obj_proxy = rabenseifner_reducers_.at(col_proxy); + auto typed_proxy = static_cast>(obj_proxy); + typed_proxy[theContext()->getNode()].get()->localReduce(idx); + } + } + + waiting_count_[col_proxy]++; + bool is_ready = waiting_count_[col_proxy] == num_elms; + vt_debug_print( + terse, allreduce, "reduceLocal: idx={} num_elms={} is_ready={}\n", idx, + num_elms, is_ready); + if (is_ready) { + if (use_group) { + // theGroup()->allreduce(group, ); + } else { + auto obj_proxy = rabenseifner_reducers_[col_proxy]; + auto typed_proxy = static_cast>(obj_proxy); + typed_proxy[theContext()->getNode()].get()->localReduce(idx); + } + } + + return messaging::PendingSend{nullptr}; +} + template *f> messaging::PendingSend CollectionManager::reduceMsgExpr( CollectionProxyWrapType const& proxy, diff 
--git a/src/vt/vrt/collection/reducable/reducable.h b/src/vt/vrt/collection/reducable/reducable.h index b748cddbbb..dcaf7c0650 100644 --- a/src/vt/vrt/collection/reducable/reducable.h +++ b/src/vt/vrt/collection/reducable/reducable.h @@ -85,6 +85,15 @@ struct Reducable : BaseProxyT { Args&&... args ) const; + template < + auto f, + template class Op = collective::NoneOp, + typename... Args + > + messaging::PendingSend allreduce_h( + Args&&... args + ) const; + /** * \brief Reduce back to a point target. Performs a reduction using operator * `Op` followed by a send to `f` with the result. diff --git a/src/vt/vrt/collection/reducable/reducable.impl.h b/src/vt/vrt/collection/reducable/reducable.impl.h index 39afc6fca6..564a81827a 100644 --- a/src/vt/vrt/collection/reducable/reducable.impl.h +++ b/src/vt/vrt/collection/reducable/reducable.impl.h @@ -41,6 +41,8 @@ //@HEADER */ +#include "vt/collective/reduce/allreduce/rabenseifner_msg.h" +#include #if !defined INCLUDED_VT_VRT_COLLECTION_REDUCABLE_REDUCABLE_IMPL_H #define INCLUDED_VT_VRT_COLLECTION_REDUCABLE_REDUCABLE_IMPL_H @@ -80,6 +82,16 @@ messaging::PendingSend Reducable::allreduce( >(proxy, msg.get(), stamp, root_node); } +template +template class Op, typename... Args> +messaging::PendingSend Reducable::allreduce_h( + Args&&... args +) const { + auto const proxy = this->getProxy(); + return theCollection()->reduceLocal( + proxy, std::forward(args)...); +} + template template class Op, typename Target, typename... 
Args> messaging::PendingSend Reducable::reduce( diff --git a/tests/perf/allreduce.cc b/tests/perf/allreduce.cc index b0603d4384..9166056f2d 100644 --- a/tests/perf/allreduce.cc +++ b/tests/perf/allreduce.cc @@ -211,53 +211,53 @@ VT_PERF_TEST(MyTestKokkos, test_allreduce_rabenseifner_kokkos) { } #endif // MAGISTRATE_KOKKOS_ENABLED -VT_PERF_TEST(MyTest, test_allreduce_recursive_doubling) { - auto proxy = vt::theObjGroup()->makeCollective>( - "test_allreduce_recursive_doubling", this, "Recursive doubling vector" - ); +// VT_PERF_TEST(MyTest, test_allreduce_recursive_doubling) { +// auto proxy = vt::theObjGroup()->makeCollective>( +// "test_allreduce_recursive_doubling", this, "Recursive doubling vector" +// ); - using DataT = decltype(data); - using Reducer = collective::reduce::allreduce::RecursiveDoubling< - DataT, collective::PlusOp, &NodeObj::handlerVec>; +// using DataT = decltype(data); +// using Reducer = collective::reduce::allreduce::RecursiveDoubling< +// DataT, collective::PlusOp, &NodeObj::handlerVec>; - for (auto payload_size : payloadSizes) { - data.resize(payload_size, theContext()->getNode() + 1); +// for (auto payload_size : payloadSizes) { +// data.resize(payload_size, theContext()->getNode() + 1); - theCollective()->barrier(); - auto* obj_ptr = proxy[my_node_].get(); - StartTimer(obj_ptr->timer_names_.at(payload_size)); - theObjGroup()->allreduce(proxy, data); +// theCollective()->barrier(); +// auto* obj_ptr = proxy[my_node_].get(); +// StartTimer(obj_ptr->timer_names_.at(payload_size)); +// theObjGroup()->allreduce(proxy, data); - // theSched()->runSchedulerWhile( - // [obj_ptr] { return !obj_ptr->allreduce_done_; }); - obj_ptr->allreduce_done_ = false; - } -} +// // theSched()->runSchedulerWhile( +// // [obj_ptr] { return !obj_ptr->allreduce_done_; }); +// obj_ptr->allreduce_done_ = false; +// } +// } #if MAGISTRATE_KOKKOS_ENABLED -VT_PERF_TEST(MyTestKokkos, test_allreduce_recursive_doubling_kokkos) { - auto proxy = 
vt::theObjGroup()->makeCollective>( - "test_allreduce_rabenseifner", this, "Recursive doubling view" - ); - - using DataT = decltype(view); - using Reducer = collective::reduce::allreduce::RecursiveDoubling< - DataT, collective::PlusOp, - &NodeObj::handlerView - >; - - for (auto payload_size : payloadSizes) { - view = Kokkos::View("view", payload_size); - - theCollective()->barrier(); - auto* obj_ptr = proxy[my_node_].get(); - StartTimer(obj_ptr->timer_names_.at(payload_size)); - theObjGroup()->allreduce(proxy, view); - - // theSched()->runSchedulerWhile([obj_ptr] { return !obj_ptr->allreduce_done_; }); - obj_ptr->allreduce_done_ = false; - } -} +// VT_PERF_TEST(MyTestKokkos, test_allreduce_recursive_doubling_kokkos) { +// auto proxy = vt::theObjGroup()->makeCollective>( +// "test_allreduce_rabenseifner", this, "Recursive doubling view" +// ); + +// using DataT = decltype(view); +// using Reducer = collective::reduce::allreduce::RecursiveDoubling< +// DataT, collective::PlusOp, +// &NodeObj::handlerView +// >; + +// for (auto payload_size : payloadSizes) { +// view = Kokkos::View("view", payload_size); + +// theCollective()->barrier(); +// auto* obj_ptr = proxy[my_node_].get(); +// StartTimer(obj_ptr->timer_names_.at(payload_size)); +// theObjGroup()->allreduce(proxy, view); + +// // theSched()->runSchedulerWhile([obj_ptr] { return !obj_ptr->allreduce_done_; }); +// obj_ptr->allreduce_done_ = false; +// } +// } #endif // MAGISTRATE_KOKKOS_ENABLED void allreduce_group_han(std::vector vec){ @@ -274,17 +274,22 @@ VT_PERF_TEST(MyTest, test_allreduce_group_rabenseifner) { struct Hello : vt::Collection { Hello() = default; - + void FInalHan(NodeType result) { + fmt::print("Allreduce result is {} \n", result); + } void AllredHandler(NodeType result) { fmt::print("Allreduce result is {} \n", result); + + auto proxy = this->getCollectionProxy(); + proxy.allreduce_h<&Hello::FInalHan, collective::PlusOp>(theContext()->getNode()); } void Handler() { auto proxy = 
this->getCollectionProxy(); fmt::print("[{}] Hello from idx={} \n", theContext()->getNode(), getIndex()); - proxy.allreduce<&Hello::AllredHandler, collective::PlusOp>(theContext()->getNode()); - + // proxy.reduce<&Hello::AllredHandler, collective::PlusOp>(theContext()->getNode(), theContext()->getNode()); + proxy.allreduce_h<&Hello::FInalHan, collective::PlusOp>(theContext()->getNode()); col_send_done_ = true; } @@ -307,6 +312,7 @@ VT_PERF_TEST(MyTest, test_allreduce_collection_rabenseifner) { theCollective()->barrier(); proxy.broadcastCollective<&Hello::Handler>(); + // proxy.allreduce<&Hello::AllredHandler, collective::PlusOp>(theContext()->getNode()); // We run 1 coll elem per node, so it should be ok