Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement global-to-global dash::copy based on #659 #660

Open
wants to merge 3 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 195 additions & 37 deletions dash/include/dash/algorithm/Copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,19 @@ namespace internal {

template<typename InputValueType, typename OutputValueType>
struct local_copy_chunk {
const InputValueType *src;
OutputValueType *dest;
const size_t size;
using nonconst_input_value_type = typename std::remove_const<InputValueType>::type;
using nonconst_output_value_type = typename std::remove_const<OutputValueType>::type;

const nonconst_input_value_type *src;
nonconst_output_value_type *dest;
const size_t size;
};

template<typename InputValueType, typename OutputValueType>
void do_local_copies(
std::vector<local_copy_chunk<InputValueType, OutputValueType>>& chunks)
using local_copy_chunks = std::vector<local_copy_chunk<InputValueType, OutputValueType>>;

template<typename InputValueType, typename OutputValueType>
void do_local_copies(local_copy_chunks<InputValueType, OutputValueType>& chunks)
{
for (auto& chunk : chunks) {
std::copy(chunk.src, chunk.src + chunk.size, chunk.dest);
Expand Down Expand Up @@ -136,10 +141,13 @@ template <
typename ValueType,
typename GlobInputIt >
ValueType * copy_impl(
GlobInputIt begin,
GlobInputIt end,
ValueType * out_first,
std::vector<dart_handle_t> * handles)
GlobInputIt begin,
GlobInputIt end,
ValueType * out_first,
std::vector<dart_handle_t> * handles,
local_copy_chunks<
typename GlobInputIt::value_type,
ValueType> & local_chunks)
{
DASH_LOG_TRACE("dash::internal::copy_impl() global -> local",
"in_first:", begin.pos(),
Expand All @@ -166,8 +174,6 @@ ValueType * copy_impl(

ContiguousRangeSet<GlobInputIt> range_set{begin, end};

std::vector<local_copy_chunk<input_value_type, output_value_type>> local_chunks;

//
// Copy elements from every unit:
//
Expand Down Expand Up @@ -211,8 +217,6 @@ ValueType * copy_impl(
num_elem_copied += num_copy_elem;
}

do_local_copies(local_chunks);

DASH_ASSERT_EQ(num_elem_copied, num_elem_total,
"Failed to find all contiguous subranges in range");

Expand All @@ -233,10 +237,13 @@ template <
typename ValueType,
typename GlobOutputIt >
GlobOutputIt copy_impl(
ValueType * begin,
ValueType * end,
GlobOutputIt out_first,
std::vector<dart_handle_t> * handles)
ValueType * begin,
ValueType * end,
GlobOutputIt out_first,
std::vector<dart_handle_t> * handles,
local_copy_chunks<
ValueType,
typename GlobOutputIt::value_type> & local_chunks)
{
DASH_LOG_TRACE("dash::copy_impl() local -> global",
"in_first:", begin,
Expand Down Expand Up @@ -266,8 +273,6 @@ GlobOutputIt copy_impl(

ContiguousRangeSet<GlobOutputIt> range_set{out_first, out_last};

std::vector<local_copy_chunk<input_value_type, output_value_type>> local_chunks;

auto in_first = begin;

//
Expand Down Expand Up @@ -312,8 +317,6 @@ GlobOutputIt copy_impl(
num_elem_copied += num_copy_elem;
}

do_local_copies(local_chunks);

DASH_ASSERT_EQ(num_elem_copied, num_elem_total,
"Failed to find all contiguous subranges in range");

Expand Down Expand Up @@ -350,9 +353,10 @@ dash::Future<ValueType *> copy_async(
}

auto handles = std::make_shared<std::vector<dart_handle_t>>();

auto out_last = dash::internal::copy_impl(in_first, in_last,
out_first, handles.get());
dash::internal::local_copy_chunks<typename GlobInputIt::value_type, ValueType> local_chunks;
auto out_last = dash::internal::copy_impl(in_first, in_last, out_first,
handles.get(), local_chunks);
dash::internal::do_local_copies(local_chunks);

if (handles->empty()) {
DASH_LOG_TRACE("dash::copy_async", "all transfers completed");
Expand Down Expand Up @@ -434,24 +438,29 @@ ValueType * copy(
}

ValueType *out_last;
dash::internal::local_copy_chunks<typename GlobInputIt::value_type, ValueType> local_chunks;
if (UseHandles) {
std::vector<dart_handle_t> handles;
out_last = dash::internal::copy_impl(in_first,
in_last,
out_first,
&handles);
&handles,
local_chunks);
if (!handles.empty()) {
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,",
"num_handles: ", handles.size());
dart_waitall_local(handles.data(), handles.size());
}
dash::internal::do_local_copies(local_chunks);

} else {
out_last = dash::internal::copy_impl(in_first,
in_last,
out_first,
nullptr);
nullptr,
local_chunks);
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete");
dash::internal::do_local_copies(local_chunks);
dart_flush_local_all(in_first.dart_gptr());
}

Expand Down Expand Up @@ -484,10 +493,13 @@ dash::Future<GlobOutputIt> copy_async(
}

auto handles = std::make_shared<std::vector<dart_handle_t>>();
dash::internal::local_copy_chunks<ValueType, typename GlobOutputIt::value_type> local_chunks;
auto out_last = dash::internal::copy_impl(in_first,
in_last,
out_first,
handles.get());
handles.get(),
local_chunks);
dash::internal::do_local_copies(local_chunks);

if (handles->empty()) {
return dash::Future<GlobOutputIt>(out_last);
Expand Down Expand Up @@ -556,12 +568,15 @@ GlobOutputIt copy(
DASH_LOG_TRACE("dash::copy()", "blocking, local to global");
// handles to wait on at the end
GlobOutputIt out_last;
dash::internal::local_copy_chunks<ValueType, typename GlobOutputIt::value_type> local_chunks;
if (UseHandles) {
std::vector<dart_handle_t> handles;
out_last = dash::internal::copy_impl(in_first,
in_last,
out_first,
&handles);
&handles,
local_chunks);
dash::internal::do_local_copies(local_chunks);

if (!handles.empty()) {
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,",
Expand All @@ -572,8 +587,10 @@ GlobOutputIt copy(
out_last = dash::internal::copy_impl(in_first,
in_last,
out_first,
nullptr);
nullptr,
local_chunks);
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete");
dash::internal::do_local_copies(local_chunks);
dart_flush_all(out_first.dart_gptr());
}
return out_last;
Expand Down Expand Up @@ -629,25 +646,166 @@ copy_async(
}
#endif

struct ActiveDestination{};
struct ActiveSource{};

/**
* Specialization of \c dash::copy as global-to-global blocking copy
* operation.
*
* \ingroup DashAlgorithms
*/
template <typename ValueType, class GlobInputIt, class GlobOutputIt>
template <
class GlobInputIt,
class GlobOutputIt,
bool UseHandles = false>
GlobOutputIt copy(
GlobInputIt /*in_first*/,
GlobInputIt /*in_last*/,
GlobOutputIt /*out_first*/)
GlobInputIt in_first,
GlobInputIt in_last,
GlobOutputIt out_first,
ActiveDestination /*unused*/)
{
DASH_LOG_TRACE("dash::copy()", "blocking, global to global, active destination");

using size_type = typename GlobInputIt::size_type;
using input_value_type = typename GlobInputIt::value_type;
using output_value_type = typename GlobOutputIt::value_type;

size_type num_elem_total = dash::distance(in_first, in_last);
if (num_elem_total <= 0) {
DASH_LOG_TRACE("dash::copy", "input range empty");
return out_first;
}

auto g_out_first = out_first;
auto g_out_last = g_out_first + num_elem_total;

internal::ContiguousRangeSet<GlobOutputIt> range_set{g_out_first, g_out_last};

const auto & out_team = out_first.team();
out_team.barrier();

std::vector<dart_handle_t> handles;
std::vector<dart_handle_t>* handles_arg = UseHandles ? &handles : nullptr;

dash::internal::local_copy_chunks<input_value_type, output_value_type> local_chunks;

size_type num_elem_processed = 0;

for (auto range : range_set) {

auto cur_out_first = range.first;
auto num_copy_elem = range.second;

DASH_ASSERT_GT(num_copy_elem, 0,
"Number of elements to copy is 0");

// handle local data only
if (cur_out_first.is_local()) {
auto dest_ptr = cur_out_first.local();
auto src_ptr = in_first + num_elem_processed;
internal::copy_impl(src_ptr,
src_ptr + num_copy_elem,
dest_ptr,
handles_arg,
local_chunks);
}
num_elem_processed += num_copy_elem;
}

dash::internal::do_local_copies(local_chunks);

if (!handles.empty()) {
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,",
"num_handles: ", handles.size());
dart_waitall_local(handles.data(), handles.size());
} else if (!UseHandles) {
dart_flush_local_all(in_first.dart_gptr());
}
out_team.barrier();

DASH_ASSERT_EQ(num_elem_processed, num_elem_total,
"Failed to find all contiguous subranges in range");

return g_out_last;
}

/**
* Specialization of \c dash::copy as global-to-global blocking copy
* operation.
*
* \ingroup DashAlgorithms
*/
template <
class GlobInputIt,
class GlobOutputIt,
bool UseHandles = false>
GlobOutputIt copy(
GlobInputIt in_first,
GlobInputIt in_last,
GlobOutputIt out_first,
ActiveSource /*unused*/)
{
DASH_LOG_TRACE("dash::copy()", "blocking, global to global");

// TODO:
// - Implement adapter for local-to-global dash::copy here
// - Return if global input range has no local sub-range
using size_type = typename GlobInputIt::size_type;
using input_value_type = typename GlobInputIt::value_type;
using output_value_type = typename GlobOutputIt::value_type;

size_type num_elem_total = dash::distance(in_first, in_last);
if (num_elem_total <= 0) {
DASH_LOG_TRACE("dash::copy", "input range empty");
return out_first;
}

internal::ContiguousRangeSet<GlobOutputIt> range_set{in_first, in_last};

const auto & in_team = in_first.team();
in_team.barrier();

std::vector<dart_handle_t> handles;
std::vector<dart_handle_t>* handles_arg = UseHandles ? &handles : nullptr;

dash::internal::local_copy_chunks<input_value_type, output_value_type> local_chunks;

size_type num_elem_processed = 0;

for (auto range : range_set) {

auto cur_in_first = range.first;
auto num_copy_elem = range.second;

DASH_ASSERT_GT(num_copy_elem, 0,
"Number of elements to copy is 0");

// handle local data only
if (cur_in_first.is_local()) {
auto src_ptr = cur_in_first.local();
auto dest_ptr = out_first + num_elem_processed;
internal::copy_impl(src_ptr,
src_ptr + num_copy_elem,
dest_ptr,
handles_arg,
local_chunks);
}
num_elem_processed += num_copy_elem;
}

internal::do_local_copies(local_chunks);

if (!handles.empty()) {
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,",
"num_handles: ", handles.size());
dart_waitall(handles.data(), handles.size());
} else if (!UseHandles) {
dart_flush_all(out_first.dart_gptr());
}
in_team.barrier();

DASH_ASSERT_EQ(num_elem_processed, num_elem_total,
"Failed to find all contiguous subranges in range");

return GlobOutputIt();
return out_first + num_elem_total;
}

#endif // DOXYGEN
Expand Down
Loading