diff --git a/include/quo-vadis-pthread.h b/include/quo-vadis-pthread.h index a21c1a6..a54399d 100644 --- a/include/quo-vadis-pthread.h +++ b/include/quo-vadis-pthread.h @@ -27,6 +27,21 @@ extern "C" { #endif +/** + * Mapping policies types. + */ +typedef enum qv_policy_s { + QV_POLICY_PACKED = 1, + QV_POLICY_COMPACT = 1, + QV_POLICY_CLOSE = 1, + QV_POLICY_SPREAD = 2, + QV_POLICY_DISTRIBUTE = 3, + QV_POLICY_ALTERNATE = 3, + QV_POLICY_CORESFIRST = 3, + QV_POLICY_SCATTER = 4, + QV_POLICY_CHOOSE = 5, +} qv_policy_t; + /** * Similar to pthread_create(3). */ @@ -66,6 +81,16 @@ qv_pthread_scopes_free( qv_scope_t **scopes ); +/** + * Fills color array used in qv_pthread_scope_split*. + */ +int +qv_pthread_colors_fill( + int *color_array, + int array_size, + qv_policy_t policy +); + #ifdef __cplusplus } #endif diff --git a/src/quo-vadis-pthread.cc b/src/quo-vadis-pthread.cc index 750a5c3..4ca1e59 100644 --- a/src/quo-vadis-pthread.cc +++ b/src/quo-vadis-pthread.cc @@ -1,5 +1,5 @@ /* -*- Mode: C++; c-basic-offset:4; indent-tabs-mode:nil -*- */ -/* +/*qv_policy_t * Copyright (c) 2022-2024 Triad National Security, LLC * All rights reserved. * @@ -116,7 +116,7 @@ qv_pthread_create( return ENOMEM; } return pthread_create( - thread, attr, qvi_pthread_group_s::call_first_from_pthread_create, cargs + thread, attr, qvi_pthread_group::call_first_from_pthread_create, cargs ); } @@ -135,6 +135,16 @@ qv_pthread_scopes_free( qvi_catch_and_return(); } +int +qv_pthread_colors_fill( + int *,//color_array, + int, // array_size, + qv_policy_t //policy +){ + //TODO(GM) implement + return QV_ERR_NOT_SUPPORTED; +} + /* * vim: ft=cpp ts=4 sts=4 sw=4 expandtab */ diff --git a/src/quo-vadis.cc b/src/quo-vadis.cc index 676858e..a10433a 100644 --- a/src/quo-vadis.cc +++ b/src/quo-vadis.cc @@ -179,6 +179,7 @@ qv_scope_split( // We use the sentinel value QV_HW_OBJ_LAST to differentiate between // calls from split() and split_at(). Since this call doesn't have a // hardware type argument, we use QV_HW_OBJ_LAST as the hardware type. + // return return scope->split(npieces, color, QV_HW_OBJ_LAST, subscope); } qvi_catch_and_return(); diff --git a/src/qvi-bbuff.cc b/src/qvi-bbuff.cc index 39104e2..7b4566f 100644 --- a/src/qvi-bbuff.cc +++ b/src/qvi-bbuff.cc @@ -112,6 +112,14 @@ qvi_bbuff_dup( return qvi_dup(src, buff); } +int +qvi_bbuff_copy( + const qvi_bbuff &src, + qvi_bbuff *buff +) { + return qvi_copy(src, buff); +} + void qvi_bbuff_delete( qvi_bbuff **buff diff --git a/src/qvi-bbuff.h b/src/qvi-bbuff.h index ffe103b..46980ea 100644 --- a/src/qvi-bbuff.h +++ b/src/qvi-bbuff.h @@ -73,11 +73,25 @@ qvi_bbuff_dup( qvi_bbuff **buff ); +int +qvi_bbuff_copy( + const qvi_bbuff &src, + qvi_bbuff *buff +); + void qvi_bbuff_delete( qvi_bbuff **buff ); +enum qvi_alloc_s { + ALLOC_SHARED = 0, + ALLOC_SHARED_GLOBAL, + ALLOC_PRIVATE, +}; + +typedef enum qvi_alloc_s qvi_alloc_type_t; + #endif /* diff --git a/src/qvi-common.h b/src/qvi-common.h index 17273a8..c35594c 100644 --- a/src/qvi-common.h +++ b/src/qvi-common.h @@ -48,6 +48,7 @@ #include #include #include +#include #include #include #include diff --git a/src/qvi-group-mpi.h b/src/qvi-group-mpi.h index e86db2c..3d3154c 100644 --- a/src/qvi-group-mpi.h +++ b/src/qvi-group-mpi.h @@ -20,6 +20,7 @@ #include "qvi-common.h" #include "qvi-group.h" #include "qvi-mpi.h" +#include "qvi-bbuff.h" struct qvi_group_mpi : public qvi_group { protected: @@ -84,7 +85,7 @@ struct qvi_group_mpi : public qvi_group { gather( qvi_bbuff *txbuff, int root, - bool *shared, + qvi_alloc_type_t *shared, qvi_bbuff ***rxbuffs ) { return qvi_mpi_group_gather_bbuffs( diff --git a/src/qvi-group-omp.h b/src/qvi-group-omp.h index 1abca33..a33b150 100644 --- a/src/qvi-group-omp.h +++ b/src/qvi-group-omp.h @@ -23,6 +23,7 @@ #include "qvi-common.h" #include "qvi-group.h" #include "qvi-omp.h" +#include "qvi-bbuff.h" struct qvi_group_omp : public qvi_group { private: @@ -90,7 +91,7 @@ struct qvi_group_omp : public qvi_group { gather( qvi_bbuff *txbuff, int root, - bool *shared, + qvi_alloc_type_t *shared, qvi_bbuff ***rxbuffs ) { return m_ompgroup->gather(txbuff, root, shared, rxbuffs); diff --git a/src/qvi-group-process.h b/src/qvi-group-process.h index 9fb8a96..e90fa29 100644 --- a/src/qvi-group-process.h +++ b/src/qvi-group-process.h @@ -17,6 +17,7 @@ #include "qvi-common.h" #include "qvi-group.h" #include "qvi-process.h" +#include "qvi-bbuff.h" struct qvi_group_process : public qvi_group { protected: @@ -85,7 +86,7 @@ struct qvi_group_process : public qvi_group { gather( qvi_bbuff *txbuff, int root, - bool *shared, + qvi_alloc_type_t *shared, qvi_bbuff ***rxbuffs ) { return qvi_process_group_gather_bbuffs( diff --git a/src/qvi-group-pthread.cc b/src/qvi-group-pthread.cc index 4a9e1a4..3dffc66 100644 --- a/src/qvi-group-pthread.cc +++ b/src/qvi-group-pthread.cc @@ -17,7 +17,7 @@ qvi_group_pthread::qvi_group_pthread( int group_size ) { - const int rc = qvi_new(&thgroup, group_size); + const int rc = qvi_new(&thgroup, group_size, 0); if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error(); } @@ -36,12 +36,21 @@ qvi_group_pthread::self( int qvi_group_pthread::split( - int, - int, - qvi_group ** + int color , + int key, + qvi_group ** child ) { - // TODO(skg) - return QV_ERR_NOT_SUPPORTED; + qvi_group_pthread *ichild = nullptr; + int rc = qvi_new(&ichild); + if (qvi_unlikely(rc != QV_SUCCESS)) goto out; + + rc = thgroup->split(color, key, &ichild->thgroup); +out: + if (qvi_unlikely(rc != QV_SUCCESS)) { + qvi_delete(&ichild); + } + *child = ichild; + return rc; } /* diff --git a/src/qvi-group-pthread.h b/src/qvi-group-pthread.h index f02517c..143284b 100644 --- a/src/qvi-group-pthread.h +++ b/src/qvi-group-pthread.h @@ -17,12 +17,13 @@ #include "qvi-common.h" #include "qvi-group.h" #include "qvi-pthread.h" +#include "qvi-bbuff.h" struct qvi_group_pthread : public qvi_group { /** Underlying group instance. */ qvi_pthread_group_t *thgroup = nullptr; /** Constructor. */ - qvi_group_pthread(void) = delete; + qvi_group_pthread(void) = default; /** Constructor. */ qvi_group_pthread( int group_size @@ -87,7 +88,7 @@ struct qvi_group_pthread : public qvi_group { gather( qvi_bbuff *txbuff, int root, - bool *shared, + qvi_alloc_type_t *shared, qvi_bbuff ***rxbuffs ) { return thgroup->gather( diff --git a/src/qvi-group.h b/src/qvi-group.h index ff9d0b6..ee31c40 100644 --- a/src/qvi-group.h +++ b/src/qvi-group.h @@ -19,6 +19,7 @@ #include "qvi-common.h" #include "qvi-utils.h" +#include "qvi-bbuff.h" /** Group ID type. */ using qvi_group_id_t = uint64_t; @@ -82,7 +83,7 @@ struct qvi_group : qvi_refc { gather( qvi_bbuff *txbuff, int root, - bool *shared, + qvi_alloc_type_t *shared, qvi_bbuff ***rxbuffs ) = 0; /** Scatters bbuffs from specified root. */ diff --git a/src/qvi-hwsplit.cc b/src/qvi-hwsplit.cc index 052376e..df72f9c 100644 --- a/src/qvi-hwsplit.cc +++ b/src/qvi-hwsplit.cc @@ -641,7 +641,9 @@ qvi_hwsplit_coll::scatter_values( } rc = group->scatter(txbuffs.data(), rootid, &rxbuff); - if (qvi_unlikely(rc != QV_SUCCESS)) goto out; + if (qvi_unlikely(rc != QV_SUCCESS)) { + goto out; + } *value = *(TYPE *)rxbuff->data(); out: @@ -692,7 +694,7 @@ qvi_hwsplit_coll::gather_values( return rc; } // Gather the values to the root. - bool shared = false; + qvi_alloc_type_t shared = ALLOC_PRIVATE; qvi_bbuff **bbuffs = nullptr; rc = group->gather(txbuff, rootid, &shared, &bbuffs); if (qvi_unlikely(rc != QV_SUCCESS)) goto out; @@ -705,14 +707,15 @@ qvi_hwsplit_coll::gather_values( } } out: - if (!shared || (shared && (group->rank() == rootid))) { + if ((ALLOC_PRIVATE == shared) || ((ALLOC_SHARED == shared) && (group->rank() == rootid))) { if (bbuffs) { - for (uint_t i = 0; i < group_size; ++i) { + for (uint_t i = 0; i < group_size; ++i) { qvi_bbuff_delete(&bbuffs[i]); } delete[] bbuffs; } } + qvi_bbuff_delete(&txbuff); if (qvi_unlikely(rc != QV_SUCCESS)) { // If something went wrong, just zero-initialize the values. @@ -733,7 +736,7 @@ qvi_hwsplit_coll::gather_hwpools( int rc = txpool->packinto(&txbuff); if (qvi_unlikely(rc != QV_SUCCESS)) return rc; // Gather the values to the root. - bool shared = false; + qvi_alloc_type_t shared = ALLOC_PRIVATE; qvi_bbuff **bbuffs = nullptr; rc = group->gather(&txbuff, rootid, &shared, &bbuffs); if (rc != QV_SUCCESS) goto out; @@ -749,7 +752,7 @@ qvi_hwsplit_coll::gather_hwpools( } } out: - if (!shared || (shared && (group->rank() == rootid))) { + if ((ALLOC_PRIVATE == shared) || ((ALLOC_SHARED == shared) && (group->rank() == rootid))) { if (bbuffs) { for (uint_t i = 0; i < group_size; ++i) { qvi_bbuff_delete(&bbuffs[i]); @@ -757,6 +760,7 @@ qvi_hwsplit_coll::gather_hwpools( delete[] bbuffs; } } + if (rc != QV_SUCCESS) { // If something went wrong, just zero-initialize the pools. rxpools = {}; diff --git a/src/qvi-mpi.cc b/src/qvi-mpi.cc index fa66e4e..a126430 100644 --- a/src/qvi-mpi.cc +++ b/src/qvi-mpi.cc @@ -365,7 +365,7 @@ qvi_mpi_group_gather_bbuffs( qvi_mpi_group_t *group, qvi_bbuff *txbuff, int root, - bool *shared_alloc, + qvi_alloc_type_t *shared_alloc, qvi_bbuff ***rxbuffs ) { const int send_count = (int)txbuff->size(); @@ -434,7 +434,7 @@ qvi_mpi_group_gather_bbuffs( bbuffs = nullptr; } *rxbuffs = bbuffs; - *shared_alloc = false; + *shared_alloc = ALLOC_PRIVATE; return rc; } diff --git a/src/qvi-mpi.h b/src/qvi-mpi.h index b131042..24a14e0 100644 --- a/src/qvi-mpi.h +++ b/src/qvi-mpi.h @@ -20,6 +20,7 @@ #include "qvi-common.h" #include "qvi-group.h" #include "quo-vadis-mpi.h" // IWYU pragma: keep +#include "qvi-bbuff.h" // Forward declarations. struct qvi_mpi_s; @@ -151,7 +152,7 @@ qvi_mpi_group_gather_bbuffs( qvi_mpi_group_t *group, qvi_bbuff *txbuff, int root, - bool *shared_alloc, + qvi_alloc_type_t *shared_alloc, qvi_bbuff ***rxbuffs ); diff --git a/src/qvi-omp.cc b/src/qvi-omp.cc index 763756b..3ea19a3 100644 --- a/src/qvi-omp.cc +++ b/src/qvi-omp.cc @@ -62,6 +62,15 @@ qvi_omp_group::barrier(void) { // TODO(skg) What should we do about barriers here? In particular, we need // to be careful about sub-groups, etc. + if (0 == m_rank){ + omp_set_num_threads(m_size); + } + #pragma omp barrier + + int level = omp_get_level(); + assert(level > 0); + int num = omp_get_ancestor_thread_num(level-1); + omp_set_num_threads(num); return QV_SUCCESS; } @@ -150,7 +159,7 @@ int qvi_omp_group::gather( qvi_bbuff *txbuff, int, - bool *shared_alloc, + qvi_alloc_type_t *shared_alloc, qvi_bbuff ***rxbuffs ) { qvi_bbuff **bbuffs = nullptr; @@ -171,20 +180,21 @@ qvi_omp_group::gather( bbuffs = nullptr; } *rxbuffs = bbuffs; - *shared_alloc = true; + *shared_alloc = ALLOC_SHARED; return rc; } int qvi_omp_group::scatter( qvi_bbuff **txbuffs, - int, + int, // rootid, qvi_bbuff **rxbuff ) { qvi_bbuff ***tmp = nullptr; #pragma omp single copyprivate(tmp) tmp = new qvi_bbuff**(); #pragma omp master + //#pragma omp masked filter(rootid) *tmp = txbuffs; #pragma omp barrier qvi_bbuff *inbuff = (*tmp)[m_rank]; diff --git a/src/qvi-omp.h b/src/qvi-omp.h index 857cc10..d2df85d 100644 --- a/src/qvi-omp.h +++ b/src/qvi-omp.h @@ -22,6 +22,7 @@ #include "qvi-common.h" #include "qvi-subgroup.h" +#include "qvi-bbuff.h" #if 0 /** @@ -104,7 +105,7 @@ struct qvi_omp_group { gather( qvi_bbuff *txbuff, int root, - bool *shared, + qvi_alloc_type_t*shared, qvi_bbuff ***rxbuffs ); diff --git a/src/qvi-process.cc b/src/qvi-process.cc index 60eab12..928cd8c 100644 --- a/src/qvi-process.cc +++ b/src/qvi-process.cc @@ -63,7 +63,7 @@ qvi_process_group_gather_bbuffs( qvi_process_group_t *group, qvi_bbuff *txbuff, int root, - bool *shared, + qvi_alloc_type_t *shared, qvi_bbuff ***rxbuffs ) { const int group_size = qvi_process_group_size(group); @@ -84,7 +84,7 @@ qvi_process_group_gather_bbuffs( bbuffs = nullptr; } *rxbuffs = bbuffs; - *shared = false; + *shared = ALLOC_PRIVATE; return rc; } diff --git a/src/qvi-process.h b/src/qvi-process.h index b0f091b..d72af8c 100644 --- a/src/qvi-process.h +++ b/src/qvi-process.h @@ -15,6 +15,7 @@ #define QVI_PROCESS_H #include "qvi-common.h" +#include "qvi-bbuff.h" // Forward declarations. struct qvi_process_group_s; @@ -68,7 +69,7 @@ qvi_process_group_gather_bbuffs( qvi_process_group_t *group, qvi_bbuff *txbuff, int root, - bool *shared, + qvi_alloc_type_t *shared, qvi_bbuff ***rxbuffs ); diff --git a/src/qvi-pthread.cc b/src/qvi-pthread.cc index 1a63474..f71da8c 100644 --- a/src/qvi-pthread.cc +++ b/src/qvi-pthread.cc @@ -12,19 +12,37 @@ */ #include "qvi-pthread.h" +#include "qvi-bbuff.h" #include "qvi-task.h" // IWYU pragma: keep #include "qvi-utils.h" -qvi_pthread_group_s::qvi_pthread_group_s( - int group_size +qvi_pthread_group::qvi_pthread_group( + int group_size, + int //rank_in_group //unused for now ) : m_size(group_size) { const int rc = pthread_barrier_init(&m_barrier, NULL, group_size); if (qvi_unlikely(rc != 0)) throw qvi_runtime_error(); + + //C++ flavor + /* + m_data_g.resize(m_size); + for (auto &p : m_data_g) { + qvi_new(&p); + } + */ + + m_data_g = new qvi_bbuff *[m_size](); + for(int i = 0 ; i < group_size ; i++){ + const int rc = qvi_bbuff_new(&m_data_g[i]); + if (qvi_unlikely(rc != 0)) throw qvi_runtime_error(); + } + m_data_s = new qvi_bbuff**(); + m_ckrs = new qvi_subgroup_color_key_rank[m_size](); } void * -qvi_pthread_group_s::call_first_from_pthread_create( +qvi_pthread_group::call_first_from_pthread_create( void *arg ) { const pid_t mytid = qvi_gettid(); @@ -74,38 +92,55 @@ qvi_pthread_group_s::call_first_from_pthread_create( return thread_routine(th_routine_argp); } -qvi_pthread_group_s::~qvi_pthread_group_s(void) +qvi_pthread_group::~qvi_pthread_group(void) { std::lock_guard guard(m_mutex); for (auto &tt : m_tid2task) { qvi_delete(&tt.second); } pthread_barrier_destroy(&m_barrier); + + if (m_data_g) { + for (int i = 0; i < m_size; ++i) { + qvi_bbuff_delete(&m_data_g[i]); + } + delete[] m_data_g; + } + + //C++ flavor + /* + for (auto &tt : m_data) { + qvi_delete(&tt); + } + */ + + delete m_data_s; + delete[] m_ckrs; } int -qvi_pthread_group_s::size(void) +qvi_pthread_group::size(void) { std::lock_guard guard(m_mutex); return m_size; } int -qvi_pthread_group_s::rank(void) +qvi_pthread_group::rank(void) { std::lock_guard guard(m_mutex); return m_tid2rank.at(qvi_gettid()); } qvi_task * -qvi_pthread_group_s::task(void) +qvi_pthread_group::task(void) { std::lock_guard guard(m_mutex); return m_tid2task.at(qvi_gettid()); } int -qvi_pthread_group_s::barrier(void) +qvi_pthread_group::barrier(void) { const int rc = pthread_barrier_wait(&(m_barrier)); if (qvi_unlikely((rc != 0) && (rc != PTHREAD_BARRIER_SERIAL_THREAD))) { @@ -115,34 +150,125 @@ qvi_pthread_group_s::barrier(void) } int -qvi_pthread_group_s::split( - int, - int, - qvi_pthread_group_s ** +qvi_pthread_group::m_subgroup_info( + int color, + int key, + qvi_subgroup_info *sginfo ) { - // TODO(skg) - return QV_ERR_NOT_SUPPORTED; + int rank = qvi_pthread_group::rank(); + int master_rank = 0; // Choosing 0 as master. + // Gather colors and keys from ALL threads. + m_ckrs[rank].color = color; + m_ckrs[rank].key = key; + m_ckrs[rank].rank = rank; + // Barrier to be sure that all threads have contributed their values. + pthread_barrier_wait(&m_barrier); + // Since these data are shared, only the master thread has to sort them. + // The same goes for calculating the number of distinct colors provided. + if(rank == master_rank){ + // Sort the color/key/rank array. First according to color, then by key, + // but in the same color realm. If color and key are identical, sort by + // the rank from given group. + std::sort(m_ckrs, m_ckrs + m_size, qvi_subgroup_color_key_rank::by_color); + std::sort(m_ckrs, m_ckrs + m_size, qvi_subgroup_color_key_rank::by_key); + std::sort(m_ckrs, m_ckrs + m_size, qvi_subgroup_color_key_rank::by_rank); + // Calculate the number of distinct colors provided. + std::set color_set; + for (int i = 0; i < m_size; ++i) { + color_set.insert(m_ckrs[i].color); + } + m_ckrs[rank].ncolors = color_set.size(); + } + //All threads wait for the number of colors to be computed. + pthread_barrier_wait(&m_barrier); + // The number of distinct colors is the number of subgroups. + sginfo->ngroups = m_ckrs[master_rank].ncolors; + // Compute my sub-group size and sub-group rank. + int group_rank = 0; + int group_size = 0; + for (int i = 0; i < m_size; ++i) { + if (color != m_ckrs[i].color) continue; + // Else we found the start of my color group. + const int current_color = m_ckrs[i].color; + for (int j = i; j < m_size && current_color == m_ckrs[j].color; ++j) { + if (m_ckrs[j].rank == rank) { + sginfo->rank = group_rank; + } + group_size++; + group_rank++; + } + sginfo->size = group_size; + break; + } + return QV_SUCCESS; } int -qvi_pthread_group_s::gather( - qvi_bbuff *, - int, - bool *, - qvi_bbuff *** +qvi_pthread_group::split( + int color, + int key, + qvi_pthread_group_t **child ) { - // TODO(skg) - return QV_ERR_NOT_SUPPORTED; + qvi_pthread_group_t *ichild = nullptr; + + qvi_subgroup_info sginfo; + int rc = m_subgroup_info(color, key, &sginfo); + if (qvi_likely(rc == QV_SUCCESS)) { + rc = qvi_new(&ichild, sginfo.size, sginfo.rank); + } + if (rc != QV_SUCCESS) { + qvi_delete(&ichild); + } + *child = ichild; + return rc; } int -qvi_pthread_group_s::scatter( - qvi_bbuff **, +qvi_pthread_group::gather( + qvi_bbuff *txbuff, int, - qvi_bbuff ** + qvi_alloc_type_t *shared_alloc, + qvi_bbuff ***rxbuffs +) { + int rank = qvi_pthread_group::rank(); + + const int rc = qvi_bbuff_copy(*txbuff, m_data_g[rank]); + // Need to ensure that all threads have contributed to m_data_g + pthread_barrier_wait(&m_barrier); + *shared_alloc = ALLOC_SHARED_GLOBAL; + + if (qvi_unlikely(rc != QV_SUCCESS)) { + *rxbuffs = nullptr; + return QV_ERR_INTERNAL; + } + + *rxbuffs = m_data_g; + return rc; +} + +int +qvi_pthread_group::scatter( + qvi_bbuff **txbuffs, + int rootid, + qvi_bbuff **rxbuff ) { - // TODO(skg) - return QV_ERR_NOT_SUPPORTED; + int rank = qvi_pthread_group::rank(); + + if(rootid == rank){ + *m_data_s = txbuffs; + } + pthread_barrier_wait(&m_barrier); + + qvi_bbuff *mybbuff = nullptr; + const int rc = qvi_bbuff_dup( *((*m_data_s)[rank]), &mybbuff); + pthread_barrier_wait(&m_barrier); + + if (qvi_unlikely(rc != QV_SUCCESS)) { + qvi_bbuff_delete(&mybbuff); + return QV_ERR_INTERNAL; + } + *rxbuff = mybbuff; + return rc; } /* diff --git a/src/qvi-pthread.h b/src/qvi-pthread.h index 06b9b77..ed1d39a 100644 --- a/src/qvi-pthread.h +++ b/src/qvi-pthread.h @@ -15,11 +15,13 @@ #define QVI_PTHREAD_H #include "qvi-common.h" +#include "qvi-subgroup.h" +#include "qvi-bbuff.h" typedef void *(*qvi_pthread_routine_fun_ptr_t)(void *); -struct qvi_pthread_group_s; -typedef struct qvi_pthread_group_s qvi_pthread_group_t; +struct qvi_pthread_group; +typedef struct qvi_pthread_group qvi_pthread_group_t; struct qvi_pthread_group_pthread_create_args_s { /** Thread group. */ @@ -40,7 +42,7 @@ struct qvi_pthread_group_pthread_create_args_s { , throutine_argp(throutine_argp_a) { } }; -struct qvi_pthread_group_s { +struct qvi_pthread_group { private: /** Group size. */ int m_size = 0; @@ -52,19 +54,36 @@ struct qvi_pthread_group_s { std::map m_tid2task; /** Used for mutexy things. */ std::mutex m_mutex; + /** Used for monitory things */ + std::condition_variable m_condition; /** Used for barrier things. */ pthread_barrier_t m_barrier; + /** Used for gather exchanges*/ + //C++ flavor + //std::vector m_data_g; + qvi_bbuff **m_data_g = nullptr; + /** Used for scatter exchanges*/ + qvi_bbuff ***m_data_s = nullptr; + /** Used for split */ + qvi_subgroup_color_key_rank *m_ckrs = nullptr; + int + m_subgroup_info( + int color, + int key, + qvi_subgroup_info *sginfo + ); public: /** Constructor. */ - qvi_pthread_group_s(void) = delete; + qvi_pthread_group(void) = delete; /** * Constructor. This is called by the parent process to construct the * maximum amount of infrastructure possible. The rest of group construction * has to be performed after pthread_create() time. See * call_first_from_pthread_create() for more details. */ - qvi_pthread_group_s( - int group_size + qvi_pthread_group( + int group_size, + int rank_in_group ); /** * This function shall be called by pthread_create() to finish group @@ -76,7 +95,7 @@ struct qvi_pthread_group_s { void *arg ); /** Destructor. */ - ~qvi_pthread_group_s(void); + ~qvi_pthread_group(void); qvi_task * task(void); @@ -94,14 +113,14 @@ struct qvi_pthread_group_s { split( int color, int key, - qvi_pthread_group_s **child + qvi_pthread_group_t **child ); int gather( qvi_bbuff *txbuff, int root, - bool *shared, + qvi_alloc_type_t *shared, qvi_bbuff ***rxbuffs ); @@ -112,7 +131,6 @@ struct qvi_pthread_group_s { qvi_bbuff **rxbuff ); }; -typedef struct qvi_pthread_group_s qvi_pthread_group_t; #endif diff --git a/src/qvi-scope.cc b/src/qvi-scope.cc index c346232..95cf0d3 100644 --- a/src/qvi-scope.cc +++ b/src/qvi-scope.cc @@ -237,10 +237,14 @@ qv_scope::split( ); rc = chwsplit.split(&colorp, &hwpool); if (rc != QV_SUCCESS) goto out; + // Split underlying group. Notice the use of colorp here. rc = m_group->split( colorp, m_group->rank(), &group ); + + assert(rc == QV_SUCCESS); + if (rc != QV_SUCCESS) goto out; // Create and initialize the new scope. rc = qvi_new(&ichild, group, hwpool); diff --git a/src/qvi-subgroup.h b/src/qvi-subgroup.h index 83641ee..001b854 100644 --- a/src/qvi-subgroup.h +++ b/src/qvi-subgroup.h @@ -40,9 +40,10 @@ struct qvi_subgroup_info { * sub-groups based on color, key, and rank. */ struct qvi_subgroup_color_key_rank { - int color = 0; - int key = 0; - int rank = 0; + int color = -1; + int key = -1; + int rank = -1; + int ncolors = 0; static bool by_color( diff --git a/src/qvi-utils.h b/src/qvi-utils.h index 6c257ca..e5e544d 100644 --- a/src/qvi-utils.h +++ b/src/qvi-utils.h @@ -95,6 +95,22 @@ qvi_dup( qvi_catch_and_return(); } +/** + * Simple wrapper that copies the provided instance. + */ +template +int +qvi_copy( + const T &t, + T *dup +) { + try { + *dup = t; + return QV_SUCCESS; + } + qvi_catch_and_return(); +} + /** * */ diff --git a/tests/test-pthread-split.c b/tests/test-pthread-split.c index 9e9be79..58b971b 100644 --- a/tests/test-pthread-split.c +++ b/tests/test-pthread-split.c @@ -33,6 +33,22 @@ thread_work( fprintf(stdout,"[%d] Thread running on %s\n", tid, binds); free(binds); + qv_scope_t *out_scope = NULL; + int rank = -1; + rc = qv_scope_group_rank(thargs->scope, &rank); + if (rc != QV_SUCCESS) { + ers = "qv_scope_group_rank failed"; + qvi_test_panic("%s (rc=%s)", ers, qv_strerr(rc)); + } + + fprintf(stdout,"=== [%d] Thread %i splitting in two pieces\n", tid, rank); + + rc = qv_scope_split(thargs->scope, 2, rank, &out_scope); + if (rc != QV_SUCCESS) { + ers = "qv_scope_split failed"; + qvi_test_panic("%s (rc=%s)", ers, qv_strerr(rc)); + } + return NULL; } @@ -81,7 +97,7 @@ main(void) } //test qv_pthread_scope_split - int npieces = ncores / 2; + int npieces = 2; //ncores / 2; int nthreads = ncores; fprintf(stdout,"[%d] ====== Testing thread_scope_split (number of threads : %i)\n", tid, nthreads); @@ -134,6 +150,16 @@ main(void) qvi_test_panic("%s (rc=%s)", ers, qv_strerr(rc)); } + + rc = qv_scope_free(mpi_scope); + if (rc != QV_SUCCESS) { + ers = "qv_scope_free() failed"; + qvi_test_panic("%s (rc=%s)", ers, qv_strerr(rc)); + } + + //MPI_Finalize(); + //exit(EXIT_SUCCESS); + //Test qv_pthread_scope_split_at nthreads = 2 * ncores;