From e75c9496164ee2ed2d6d6fd861c4e0e1e05078c3 Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Tue, 16 Jan 2024 15:02:34 -0800 Subject: [PATCH 01/14] API: extend memory types and memory map --- src/ucc/api/ucc.h | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/ucc/api/ucc.h b/src/ucc/api/ucc.h index 9f8ee5f145..68b76a32eb 100644 --- a/src/ucc/api/ucc.h +++ b/src/ucc/api/ucc.h @@ -166,6 +166,7 @@ typedef enum ucc_memory_type { UCC_MEMORY_TYPE_CUDA_MANAGED, /*!< NVIDIA CUDA managed memory */ UCC_MEMORY_TYPE_ROCM, /*!< AMD ROCM memory */ UCC_MEMORY_TYPE_ROCM_MANAGED, /*!< AMD ROCM managed system memory */ + UCC_MEMORY_TYPE_DPU, /*!< DPU memory */ UCC_MEMORY_TYPE_LAST, UCC_MEMORY_TYPE_UNKNOWN = UCC_MEMORY_TYPE_LAST } ucc_memory_type_t; @@ -895,8 +896,13 @@ typedef ucc_oob_coll_t ucc_team_oob_coll_t; * @ingroup UCC_CONTEXT_DT */ typedef struct ucc_mem_map { - void * address; /*!< the address of a buffer to be attached to a UCC context */ - size_t len; /*!< the length of the buffer */ + void * address; /*!< the address of a buffer to be attached to + a UCC context */ + size_t len; /*!< the length of the buffer */ + ucc_memory_type_t mem_type; /*!< the memory type */ + void * resource; /*!< resource associated with the address. + examples of resources include memory + keys. */ } ucc_mem_map_t; /** @@ -1703,12 +1709,17 @@ typedef enum { Note, the status is not guaranteed to be global on all the processes participating in the collective.*/ - UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS = UCC_BIT(7) /*!< If set, both src + UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS = UCC_BIT(7), /*!< If set, both src and dst buffers reside in a memory mapped region. Useful for one-sided collectives. */ + UCC_COLL_ARGS_FLAG_MEM_MAP = UCC_BIT(8) /*!< If set, map both + src and dst buffers. + Useful for one-sided + collectives in message + passing programming models */ } ucc_coll_args_flags_t; /** @@ -1798,7 +1809,8 @@ enum ucc_coll_args_field { UCC_COLL_ARGS_FIELD_TAG = UCC_BIT(1), UCC_COLL_ARGS_FIELD_CB = UCC_BIT(2), UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER = UCC_BIT(3), - UCC_COLL_ARGS_FIELD_ACTIVE_SET = UCC_BIT(4) + UCC_COLL_ARGS_FIELD_ACTIVE_SET = UCC_BIT(4), + UCC_COLL_ARGS_FIELD_MEM_MAP = UCC_BIT(5) }; /** @@ -1868,6 +1880,11 @@ typedef struct ucc_coll_args { to 0. */ ucc_coll_callback_t cb; double timeout; /*!< Timeout in seconds */ + ucc_mem_map_params_t mem_map; /*!< Memory regions to be used + in during the collective + operation for one-sided + collectives. 
Not necessary + for two-sided collectives */ struct { uint64_t start; int64_t stride; From c9ebd0453f19f7fda4e1784382c113c36d4a29af Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Thu, 25 Jan 2024 14:26:57 -0800 Subject: [PATCH 02/14] TL/UCP: complete memory map extensions --- .../tl/ucp/alltoall/alltoall_onesided.c | 15 +- .../tl/ucp/alltoallv/alltoallv_onesided.c | 4 +- src/components/tl/ucp/tl_ucp.h | 78 ++++- src/components/tl/ucp/tl_ucp_coll.c | 293 ++++++++++++++++++ src/components/tl/ucp/tl_ucp_coll.h | 4 + src/components/tl/ucp/tl_ucp_context.c | 84 +++-- src/components/tl/ucp/tl_ucp_sendrecv.h | 168 ++++++---- src/components/tl/ucp/tl_ucp_team.c | 8 - src/ucc/api/ucc.h | 24 +- 9 files changed, 574 insertions(+), 104 deletions(-) diff --git a/src/components/tl/ucp/alltoall/alltoall_onesided.c b/src/components/tl/ucp/alltoall/alltoall_onesided.c index 856b392534..f84f98ca5e 100644 --- a/src/components/tl/ucp/alltoall/alltoall_onesided.c +++ b/src/components/tl/ucp/alltoall/alltoall_onesided.c @@ -9,6 +9,7 @@ #include "alltoall.h" #include "core/ucc_progress_queue.h" #include "utils/ucc_math.h" +#include "tl_ucp_coll.h" #include "tl_ucp_sendrecv.h" void ucc_tl_ucp_alltoall_onesided_progress(ucc_coll_task_t *ctask); @@ -23,24 +24,28 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_start(ucc_coll_task_t *ctask) ucc_rank_t grank = UCC_TL_TEAM_RANK(team); ucc_rank_t gsize = UCC_TL_TEAM_SIZE(team); ucc_rank_t start = (grank + 1) % gsize; - long * pSync = TASK_ARGS(task).global_work_buffer; + long *pSync = TASK_ARGS(task).global_work_buffer; + ucc_memory_type_t mtype = TASK_ARGS(task).src.info.mem_type; ucc_rank_t peer; ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); + ucc_tl_ucp_coll_dynamic_segments(&TASK_ARGS(task), task); + /* TODO: change when support for library-based work buffers is complete */ nelems = (nelems / gsize) * ucc_dt_size(TASK_ARGS(task).src.info.datatype); dest = dest + grank * nelems; UCPCHECK_GOTO(ucc_tl_ucp_put_nb((void *)(src + start * nelems), - (void *)dest, nelems, start, team, task), + (void *)dest, nelems, start, mtype, team, + task), task, out); UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, start, team), task, out); for (peer = (start + 1) % gsize; peer != start; peer = (peer + 1) % gsize) { UCPCHECK_GOTO(ucc_tl_ucp_put_nb((void *)(src + peer * nelems), - (void *)dest, nelems, peer, team, task), + (void *)dest, nelems, peer, mtype, team, + task), task, out); - UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task, - out); + UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task, out); } return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); diff --git a/src/components/tl/ucp/alltoallv/alltoallv_onesided.c b/src/components/tl/ucp/alltoallv/alltoallv_onesided.c index bb6fa14b3e..5c446a6941 100644 --- a/src/components/tl/ucp/alltoallv/alltoallv_onesided.c +++ b/src/components/tl/ucp/alltoallv/alltoallv_onesided.c @@ -24,10 +24,12 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask) ucc_aint_t *d_disp = TASK_ARGS(task).dst.info_v.displacements; size_t sdt_size = ucc_dt_size(TASK_ARGS(task).src.info_v.datatype); size_t rdt_size = ucc_dt_size(TASK_ARGS(task).dst.info_v.datatype); + ucc_memory_type_t mtype = TASK_ARGS(task).src.info_v.mem_type; ucc_rank_t peer; size_t sd_disp, dd_disp, data_size; ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); + ucc_tl_ucp_coll_dynamic_segments(&TASK_ARGS(task), task); /* perform a put to each member peer using the peer's index in the * destination displacement. 
*/ @@ -46,7 +48,7 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask) UCPCHECK_GOTO(ucc_tl_ucp_put_nb(PTR_OFFSET(src, sd_disp), PTR_OFFSET(dest, dd_disp), - data_size, peer, team, task), + data_size, peer, mtype, team, task), task, out); UCPCHECK_GOTO(ucc_tl_ucp_atomic_inc(pSync, peer, team), task, out); } diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index eac2303443..74b230edbe 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -96,10 +96,33 @@ typedef struct ucc_tl_ucp_lib { UCC_CLASS_DECLARE(ucc_tl_ucp_lib_t, const ucc_base_lib_params_t *, const ucc_base_config_t *); +/* dynamic segments stored in a flat buffer. An example with 4 segments on + * two PEs, with segments stored two at a time (collective with src/dst pair): + +rva/key => (rva, len, key size, key) tuple + ++-----------------------------+-----------------------------+ +| seg group 0 (seg 0 + seg 1) | seg group 1 (seg 2 + seg 3) | ++--------------+--------------+--------------+--------------+ +| rva/key pe 0 | rva/key pe 1 | rva/key pe 0 | rva/key pe 1 | ++--------------+--------------+--------------+--------------+ +*/ +typedef struct ucc_tl_ucp_dynamic_seg { + void *dyn_buff; /* flat buffer with rva, keys, etc. */ + size_t buff_size; + size_t *seg_groups; /* segment to segment group mapping */ + size_t *seg_group_start; /* offset of dyn_buff to start of seg group */ + size_t *seg_group_size; /* storage size of a seg group */ + size_t *starting_seg; /* starting seg for a seg group */ + size_t *num_seg_per_group; + size_t num_groups; +} ucc_tl_ucp_dynamic_seg_t; + typedef struct ucc_tl_ucp_remote_info { void * va_base; size_t len; void * mem_h; + void * packed_memh; void * packed_key; size_t packed_key_len; } ucc_tl_ucp_remote_info_t; @@ -120,9 +143,12 @@ typedef struct ucc_tl_ucp_context { ucc_tl_ucp_worker_t service_worker; uint32_t service_worker_throttling_count; ucc_mpool_t req_mp; - ucc_tl_ucp_remote_info_t * remote_info; + ucc_tl_ucp_remote_info_t *remote_info; + ucc_tl_ucp_remote_info_t *dynamic_remote_info; + ucc_tl_ucp_dynamic_seg_t dyn_seg; ucp_rkey_h * rkeys; uint64_t n_rinfo_segs; + uint64_t n_dynrinfo_segs; uint64_t ucp_memory_types; int topo_required; } ucc_tl_ucp_context_t; @@ -135,8 +161,6 @@ typedef struct ucc_tl_ucp_team { ucc_status_t status; uint32_t seq_num; ucc_tl_ucp_task_t *preconnect_task; - void * va_base[MAX_NR_SEGMENTS]; - size_t base_length[MAX_NR_SEGMENTS]; ucc_tl_ucp_worker_t * worker; ucc_tl_ucp_team_config_t cfg; const char * tuning_str; @@ -190,6 +214,13 @@ extern ucc_config_field_t ucc_tl_ucp_lib_config_table[]; #define UCC_TL_UCP_REMOTE_RKEY(_ctx, _rank, _seg) \ ((_ctx)->rkeys[_rank * _ctx->n_rinfo_segs + _seg]) +#define UCC_TL_UCP_DYN_REMOTE_RKEY(_ctx, _rank, _size, _seg) \ + ((_ctx)->rkeys[_size * _ctx->n_rinfo_segs + _rank * _ctx->n_dynrinfo_segs + _seg]) + +/* +#define UCC_TL_UCP_REMOTE_DYN_RVA(_ctx, _rank, _seg) \ + (PTR_OFFSET((_ctx)->dyn_seg.dyn_buff, (_ctx)->dyn_seg.seg_group_start[_seg] + (_ctx)->dyn_seg.seg_groups[_seg] * _rank)) +*/ extern ucs_memory_type_t ucc_memtype_to_ucs[UCC_MEMORY_TYPE_LAST+1]; void ucc_tl_ucp_pre_register_mem(ucc_tl_ucp_team_t *team, void *addr, @@ -198,4 +229,45 @@ void ucc_tl_ucp_pre_register_mem(ucc_tl_ucp_team_t *team, void *addr, ucc_status_t ucc_tl_ucp_ctx_remote_populate(ucc_tl_ucp_context_t *ctx, ucc_mem_map_params_t map, ucc_team_oob_coll_t oob); + +// FIXME convert to macro +static inline uint64_t UCC_TL_UCP_REMOTE_DYN_RVA(ucc_tl_ucp_context_t *ctx, + 
ucc_rank_t rank, + uint64_t seg) +{ + int seg_group_id = ctx->dyn_seg.seg_groups[seg]; + uint64_t *prva = PTR_OFFSET(ctx->dyn_seg.dyn_buff, ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + (seg - ctx->dyn_seg.starting_seg[seg]) * sizeof(uint64_t)); + return *prva;//[seg - ctx->dyn_seg.starting_seg[seg]]; +} + +// FIXME convert to macro +static inline uint64_t UCC_TL_UCP_REMOTE_DYN_LEN(ucc_tl_ucp_context_t *ctx, + ucc_rank_t rank, + uint64_t seg) +{ + int seg_group_id = ctx->dyn_seg.seg_groups[seg]; + uint64_t *plen = PTR_OFFSET(ctx->dyn_seg.dyn_buff, sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + (seg - ctx->dyn_seg.starting_seg[seg]) * sizeof(uint64_t)); + return *plen;//[seg - ctx->dyn_seg.starting_seg[seg]]; +} + +// FIXME convert to macro +static inline uint64_t UCC_TL_UCP_REMOTE_DYN_KEY_SIZE(ucc_tl_ucp_context_t *ctx, + ucc_rank_t rank, + uint64_t seg) +{ + int seg_group_id = ctx->dyn_seg.seg_groups[seg]; + uint64_t *pkey_size = PTR_OFFSET(ctx->dyn_seg.dyn_buff, 2 * sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + (seg - ctx->dyn_seg.starting_seg[seg]) * sizeof(uint64_t)); + return *pkey_size;//[seg - ctx->dyn_seg.starting_seg[seg]]; +} + +// FIXME convert to macro +static inline void * UCC_TL_UCP_REMOTE_DYN_KEY(ucc_tl_ucp_context_t *ctx, + ucc_rank_t rank, + size_t offset, + uint64_t seg) +{ + int seg_group_id = ctx->dyn_seg.seg_groups[seg]; + void *pkey = PTR_OFFSET(ctx->dyn_seg.dyn_buff, 3 * sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + offset); + return pkey; +} #endif diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index 3b4859b48f..12ae2d217a 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -155,6 +155,299 @@ ucc_status_t ucc_tl_ucp_coll_finalize(ucc_coll_task_t *coll_task) return UCC_OK; } +static void ucc_tl_ucp_pack_data(ucc_tl_ucp_context_t *ctx, int starting_index, + void *pack) +{ + uint64_t nsegs = ctx->n_dynrinfo_segs - starting_index; + uint64_t offset = 0; + size_t section_offset = sizeof(uint64_t) * nsegs; + void *keys; + uint64_t *rvas; + uint64_t *lens; + uint64_t *key_sizes; + int i; + + /* pack into one data object in following order: */ + /* rva, len, pack sizes, packed keys */ + rvas = pack; + lens = PTR_OFFSET(pack, section_offset); + key_sizes = PTR_OFFSET(pack, (section_offset * 2)); + keys = PTR_OFFSET(pack, (section_offset * 3)); + + for (i = 0; i < nsegs; i++) { + int index = i + starting_index; + rvas[i] = (uint64_t)ctx->dynamic_remote_info[index].va_base; + lens[i] = ctx->dynamic_remote_info[index].len; + key_sizes[i] = ctx->dynamic_remote_info[index].packed_key_len; + memcpy(PTR_OFFSET(keys, offset), + ctx->dynamic_remote_info[index].packed_key, + ctx->dynamic_remote_info[index].packed_key_len); + offset += ctx->dynamic_remote_info[index].packed_key_len; + } +} + +ucc_status_t ucc_tl_ucp_memmap_append_segment(ucc_tl_ucp_task_t *task, + ucc_mem_map_t *map, int segid) +{ + ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); + ucc_tl_ucp_context_t *tl_ctx = UCC_TL_UCP_TEAM_CTX(tl_team); + ucs_status_t ucs_status; + ucp_mem_map_params_t mmap_params; + ucp_mem_h mh; + + // 
map the memory + if (map->resource != NULL) { + mmap_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_EXPORTED_MEMH_BUFFER; + mmap_params.exported_memh_buffer = map->resource; + + ucs_status = ucp_mem_map(tl_ctx->worker.ucp_context, &mmap_params, &mh); + if (ucs_status == UCS_ERR_UNREACHABLE) { + tl_error(tl_ctx->super.super.lib, "exported memh is unsupported"); + return ucs_status_to_ucc_status(ucs_status); + } else if (ucs_status < UCS_OK) { + tl_error(tl_ctx->super.super.lib, + "ucp_mem_map failed with error code: %d", ucs_status); + return ucs_status_to_ucc_status(ucs_status); + } + /* generate rkeys / packed keys */ + + tl_ctx->dynamic_remote_info[segid].va_base = map->address; + tl_ctx->dynamic_remote_info[segid].len = map->len; + tl_ctx->dynamic_remote_info[segid].mem_h = mh; + tl_ctx->dynamic_remote_info[segid].packed_memh = map->resource; + ucs_status = + ucp_rkey_pack(tl_ctx->worker.ucp_context, mh, + &tl_ctx->dynamic_remote_info[segid].packed_key, + &tl_ctx->dynamic_remote_info[segid].packed_key_len); + if (UCS_OK != ucs_status) { + tl_error(tl_ctx->super.super.lib, + "failed to pack UCP key with error code: %d", ucs_status); + return ucs_status_to_ucc_status(ucs_status); + } + } else { + mmap_params.field_mask = + UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH; + mmap_params.address = map->address; + mmap_params.length = map->len; + + ucs_status = ucp_mem_map(tl_ctx->worker.ucp_context, &mmap_params, &mh); + if (ucs_status != UCS_OK) { + tl_error(UCC_TASK_LIB(task), "failure in ucp_mem_map %s", + ucs_status_string(ucs_status)); + return ucs_status_to_ucc_status(ucs_status); + } + tl_ctx->dynamic_remote_info[segid].va_base = map->address; + tl_ctx->dynamic_remote_info[segid].len = map->len; + tl_ctx->dynamic_remote_info[segid].mem_h = mh; + tl_ctx->dynamic_remote_info[segid].packed_memh = NULL; + ucs_status = + ucp_rkey_pack(tl_ctx->worker.ucp_context, mh, + &tl_ctx->dynamic_remote_info[segid].packed_key, + &tl_ctx->dynamic_remote_info[segid].packed_key_len); + if (UCS_OK != ucs_status) { + tl_error(tl_ctx->super.super.lib, + "failed to pack UCP key with error code: %d", ucs_status); + return ucs_status_to_ucc_status(ucs_status); + } + } + return UCC_OK; +} + +ucc_status_t ucc_tl_ucp_coll_dynamic_segments(ucc_coll_args_t *coll_args, + ucc_tl_ucp_task_t *task) +{ + ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); + ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team); + int i = 0; + ucc_status_t status; + + if (coll_args->mem_map.n_segments > 0) { + int starting_index = ctx->n_dynrinfo_segs; + size_t seg_pack_size = 0; + size_t *global_size = NULL; + size_t team_size = UCC_TL_TEAM_SIZE(tl_team); + ucc_team_t *core_team = UCC_TL_CORE_TEAM(UCC_TL_UCP_TASK_TEAM(task)); + ucc_subset_t subset = {.map = tl_team->ctx_map, + .myrank = core_team->rank}; + ucc_service_coll_req_t *scoll_req; + void *ex_buffer; + ptrdiff_t old_offset; + + /* increase dynamic remote info size */ + ctx->dynamic_remote_info = ucc_realloc( + ctx->dynamic_remote_info, + sizeof(ucc_tl_ucp_remote_info_t) * + (ctx->n_dynrinfo_segs + coll_args->mem_map.n_segments), + "dyn remote info"); + if (!ctx->dynamic_remote_info) { + tl_error(UCC_TASK_LIB(task), "Out of Memory"); + return UCC_ERR_NO_MEMORY; + } + + for (i = 0; i < coll_args->mem_map.n_segments; i++) { + /* map the buffer and populate the dynamic_remote_info segments */ + status = ucc_tl_ucp_memmap_append_segment( + task, &coll_args->mem_map.segments[i], starting_index + i); + if (status != UCC_OK) { + tl_error(UCC_TASK_LIB(task), 
"failed to memory map a segment"); + goto failed_memory_map; + } + seg_pack_size += + sizeof(uint64_t) * 3 + + ctx->dynamic_remote_info[starting_index + i].packed_key_len; + } + + global_size = ucc_calloc(core_team->size, sizeof(size_t)); + if (!global_size) { + tl_error(UCC_TASK_LIB(task), "Out of Memory"); + goto failed_memory_map; + } + + /* allgather on the new segments size */ + status = ucc_service_allgather(core_team, &seg_pack_size, global_size, + sizeof(uint64_t), subset, &scoll_req); + if (status < UCC_OK) { + tl_error(UCC_TASK_LIB(task), "failed to perform a service allgather"); + ucc_free(global_size); + goto failed_memory_map; + } + while (UCC_INPROGRESS == (status = ucc_service_coll_test(scoll_req))) { + } + if (status < UCC_OK) { + tl_error(UCC_TASK_LIB(task), "failed on the allgather"); + ucc_service_coll_finalize(scoll_req); + ucc_free(global_size); + goto failed_memory_map; + } + ucc_service_coll_finalize(scoll_req); + for (i = 0; i < core_team->size; i++) { + if (global_size[i] > seg_pack_size) { + seg_pack_size = global_size[i]; + } + } + ucc_free(global_size); + + /* pack the dynamic_remote_info segments */ + ctx->n_dynrinfo_segs += coll_args->mem_map.n_segments; + ex_buffer = ucc_malloc(seg_pack_size, "ex pack size"); + if (!ex_buffer) { + tl_error(UCC_TASK_LIB(task), "Out of Memory"); + status = UCC_ERR_NO_MEMORY; + goto failed_memory_map; + } + ucc_tl_ucp_pack_data(ctx, starting_index, ex_buffer); + + old_offset = ctx->dyn_seg.buff_size; + ctx->dyn_seg.buff_size += seg_pack_size * core_team->size; + ctx->dyn_seg.dyn_buff = ucc_realloc(ctx->dyn_seg.dyn_buff, + ctx->dyn_seg.buff_size, "dyn buff"); + if (!ctx->dyn_seg.dyn_buff) { + status = UCC_ERR_NO_MEMORY; + tl_error(UCC_TASK_LIB(task), "Out of Memory"); + goto failed_memory_map; + } + ctx->dyn_seg.seg_groups = ucc_realloc( + ctx->dyn_seg.seg_groups, sizeof(uint64_t) * ctx->n_dynrinfo_segs, + "n_dynrinfo_segs"); + if (!ctx->dyn_seg.seg_groups) { + status = UCC_ERR_NO_MEMORY; + tl_error(UCC_TASK_LIB(task), "Out of Memory"); + goto failed_memory_map; + } + ctx->dyn_seg.seg_group_start = ucc_realloc( + ctx->dyn_seg.seg_group_start, + sizeof(uint64_t) * ctx->n_dynrinfo_segs, "n_dynrinfo_segs"); + if (!ctx->dyn_seg.seg_group_start) { + status = UCC_ERR_NO_MEMORY; + tl_error(UCC_TASK_LIB(task), "Out of Memory"); + goto failed_memory_map; + } + ctx->dyn_seg.seg_group_size = ucc_realloc( + ctx->dyn_seg.seg_group_size, + sizeof(uint64_t) * ctx->dyn_seg.num_groups + 1, "n_dynrinfo_segs"); + if (!ctx->dyn_seg.seg_group_size) { + status = UCC_ERR_NO_MEMORY; + tl_error(UCC_TASK_LIB(task), "Out of Memory"); + goto failed_memory_map; + } + + ctx->dyn_seg.starting_seg = ucc_realloc( + ctx->dyn_seg.starting_seg, sizeof(uint64_t) * ctx->n_dynrinfo_segs, + "n_dynrinfo_segs"); + if (!ctx->dyn_seg.starting_seg) { + status = UCC_ERR_NO_MEMORY; + tl_error(UCC_TASK_LIB(task), "Out of Memory"); + goto failed_memory_map; + } + ctx->dyn_seg.num_seg_per_group = ucc_realloc( + ctx->dyn_seg.num_seg_per_group, + sizeof(uint64_t) * ctx->dyn_seg.num_groups + 1, "n_dynrinfo_segs"); + if (!ctx->dyn_seg.num_seg_per_group) { + status = UCC_ERR_NO_MEMORY; + tl_error(UCC_TASK_LIB(task), "Out of Memory"); + goto failed_memory_map; + } + + ctx->dyn_seg.num_groups += 1; + ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.num_groups - 1] = + coll_args->mem_map.n_segments; + ctx->dyn_seg.seg_group_size[ctx->dyn_seg.num_groups - 1] = + seg_pack_size; + if (starting_index == 0) { + for (i = starting_index; i < ctx->n_dynrinfo_segs; i++) { + ctx->dyn_seg.seg_groups[i] 
= 0; + ctx->dyn_seg.seg_group_start[i] = 0; + ctx->dyn_seg.starting_seg[i] = starting_index; + } + } else { + for (i = starting_index; i < ctx->n_dynrinfo_segs; i++) { + ctx->dyn_seg.seg_groups[i] = + ctx->dyn_seg.seg_groups[starting_index - 1] + 1; + ctx->dyn_seg.seg_group_start[i] = old_offset; + ctx->dyn_seg.starting_seg[i] = starting_index; + } + } + + /* allgather on the new segments (packed) */ + status = ucc_service_allgather( + core_team, ex_buffer, PTR_OFFSET(ctx->dyn_seg.dyn_buff, old_offset), + seg_pack_size, subset, &scoll_req); + if (status < UCC_OK) { + tl_error(UCC_TASK_LIB(task), "failed on the allgather"); + goto failed_memory_map; + } + while (UCC_INPROGRESS == (status = ucc_service_coll_test(scoll_req))) { + } + if (status < UCC_OK) { + tl_error(UCC_TASK_LIB(task), "failed on the allgather"); + ucc_service_coll_finalize(scoll_req); + goto failed_memory_map; + } + /* done with allgather */ + ucc_service_coll_finalize(scoll_req); + ctx->rkeys = ucc_realloc(ctx->rkeys, + team_size * sizeof(ucp_rkey_h) * + (ctx->n_rinfo_segs + ctx->n_dynrinfo_segs), + "rkeys"); + memset(PTR_OFFSET(ctx->rkeys, team_size * sizeof(ucp_rkey_h) * + (ctx->n_rinfo_segs + starting_index)), + 0, + team_size * sizeof(ucp_rkey_h) * coll_args->mem_map.n_segments); + ucc_free(ex_buffer); + } + return UCC_OK; +failed_memory_map: + for (i = 0; i < coll_args->mem_map.n_segments; i++) { + if (ctx->dynamic_remote_info[ctx->n_dynrinfo_segs + i].mem_h) { + ucp_mem_unmap(ctx->worker.ucp_context, ctx->dynamic_remote_info[ctx->n_dynrinfo_segs + i].mem_h); + } + if (ctx->dynamic_remote_info[ctx->n_dynrinfo_segs + i].packed_key) { + ucp_rkey_buffer_release(ctx->dynamic_remote_info[ctx->n_dynrinfo_segs + i].packed_key); + } + } + return status; +} + ucc_status_t ucc_tl_ucp_coll_init(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team, ucc_coll_task_t **task_h) diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index 0a8a340955..69944b7420 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -490,4 +490,8 @@ ucc_tl_ucp_get_radix_from_range(ucc_tl_ucp_team_t *team, } return radix; } + +ucc_status_t ucc_tl_ucp_coll_dynamic_segments(ucc_coll_args_t *coll_args, + ucc_tl_ucp_task_t *task); + #endif diff --git a/src/components/tl/ucp/tl_ucp_context.c b/src/components/tl/ucp/tl_ucp_context.c index 6da05132ba..77efbba97f 100644 --- a/src/components/tl/ucp/tl_ucp_context.c +++ b/src/components/tl/ucp/tl_ucp_context.c @@ -163,10 +163,8 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_context_t, ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES | UCP_PARAM_FIELD_TAG_SENDER_MASK | UCP_PARAM_FIELD_NAME; - ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_AM; - if (params->params.mask & UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS) { - ucp_params.features |= UCP_FEATURE_RMA | UCP_FEATURE_AMO64; - } + ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_AM | UCP_FEATURE_RMA | + UCP_FEATURE_AMO64 | UCP_FEATURE_EXPORTED_MEMH; ucp_params.tag_sender_mask = UCC_TL_UCP_TAG_SENDER_MASK; ucp_params.name = "UCC_UCP_CONTEXT"; @@ -247,9 +245,12 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_context_t, "failed to register progress function", err_thread_mode, UCC_ERR_NO_MESSAGE, self); - self->remote_info = NULL; - self->n_rinfo_segs = 0; - self->rkeys = NULL; + self->remote_info = NULL; + self->dynamic_remote_info = NULL; + self->n_rinfo_segs = 0; + self->n_dynrinfo_segs = 0; + self->rkeys = NULL; + memset(&self->dyn_seg, 0, sizeof(self->dyn_seg)); if (params->params.mask & 
UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS && params->params.mask & UCC_CONTEXT_PARAM_FIELD_OOB) { ucc_status = ucc_tl_ucp_ctx_remote_populate( @@ -338,6 +339,12 @@ ucc_status_t ucc_tl_ucp_rinfo_destroy(ucc_tl_ucp_context_t *ctx) ucp_rkey_destroy(UCC_TL_UCP_REMOTE_RKEY(ctx, i, j)); } } + for (j = 0; j < ctx->n_dynrinfo_segs; j++) { + if (UCC_TL_UCP_REMOTE_RKEY(ctx, i, ctx->n_rinfo_segs + j)) { + ucp_rkey_destroy( + UCC_TL_UCP_REMOTE_RKEY(ctx, i, ctx->n_rinfo_segs + j)); + } + } } for (i = 0; i < ctx->n_rinfo_segs; i++) { if (ctx->remote_info[i].mem_h) { @@ -347,10 +354,23 @@ ucc_status_t ucc_tl_ucp_rinfo_destroy(ucc_tl_ucp_context_t *ctx) ucp_rkey_buffer_release(ctx->remote_info[i].packed_key); } } + if (ctx->dynamic_remote_info) { + for (i = 0; i < ctx->n_dynrinfo_segs; i++) { + if (ctx->dynamic_remote_info[i].mem_h) { + ucp_mem_unmap(ctx->worker.ucp_context, + ctx->dynamic_remote_info[i].mem_h); + } + if (ctx->dynamic_remote_info[i].packed_key) { + ucp_rkey_buffer_release(ctx->dynamic_remote_info[i].packed_key); + } + } + ucc_free(ctx->dynamic_remote_info); + } ucc_free(ctx->remote_info); ucc_free(ctx->rkeys); - ctx->remote_info = NULL; - ctx->rkeys = NULL; + ctx->remote_info = NULL; + ctx->rkeys = NULL; + ctx->dynamic_remote_info = NULL; return UCC_OK; } @@ -477,22 +497,42 @@ ucc_status_t ucc_tl_ucp_ctx_remote_populate(ucc_tl_ucp_context_t * ctx, } for (i = 0; i < nsegs; i++) { - mmap_params.field_mask = - UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH; - mmap_params.address = map.segments[i].address; - mmap_params.length = map.segments[i].len; - - status = ucp_mem_map(ctx->worker.ucp_context, &mmap_params, &mh); - if (UCS_OK != status) { - tl_error(ctx->super.super.lib, - "ucp_mem_map failed with error code: %d", status); - ucc_status = ucs_status_to_ucc_status(status); - goto fail_mem_map; + if (map.segments[i].resource == NULL) { + mmap_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS | + UCP_MEM_MAP_PARAM_FIELD_LENGTH; + mmap_params.address = map.segments[i].address; + mmap_params.length = map.segments[i].len; + + status = ucp_mem_map(ctx->worker.ucp_context, &mmap_params, &mh); + if (UCS_OK != status) { + tl_error(ctx->super.super.lib, + "ucp_mem_map failed with error code: %d", status); + ucc_status = ucs_status_to_ucc_status(status); + goto fail_mem_map; + } + ctx->remote_info[i].packed_memh = NULL; + } else { + mmap_params.field_mask = + UCP_MEM_MAP_PARAM_FIELD_EXPORTED_MEMH_BUFFER; + mmap_params.exported_memh_buffer = map.segments[i].resource; + + status = ucp_mem_map(ctx->worker.ucp_context, &mmap_params, &mh); + if (status == UCS_ERR_UNREACHABLE) { + tl_error(ctx->super.super.lib, "exported memh is unsupported"); + ucc_status = ucs_status_to_ucc_status(status); + goto fail_mem_map; + } else if (status < UCS_OK) { + tl_error(ctx->super.super.lib, + "ucp_mem_map failed with error code: %d", status); + ucc_status = ucs_status_to_ucc_status(status); + goto fail_mem_map; + } + ctx->remote_info[i].packed_memh = map.segments[i].resource; } ctx->remote_info[i].mem_h = (void *)mh; status = ucp_rkey_pack(ctx->worker.ucp_context, mh, - &ctx->remote_info[i].packed_key, - &ctx->remote_info[i].packed_key_len); + &ctx->remote_info[i].packed_key, + &ctx->remote_info[i].packed_key_len); if (UCS_OK != status) { tl_error(ctx->super.super.lib, "failed to pack UCP key with error code: %d", status); diff --git a/src/components/tl/ucp/tl_ucp_sendrecv.h b/src/components/tl/ucp/tl_ucp_sendrecv.h index ab815bad71..70cdafa689 100644 --- a/src/components/tl/ucp/tl_ucp_sendrecv.h +++ 
b/src/components/tl/ucp/tl_ucp_sendrecv.h @@ -226,9 +226,9 @@ static inline ucc_status_t ucc_tl_ucp_send_nz(void *buffer, size_t msglen, } static inline ucc_status_t -ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, ucp_ep_h *ep, - ucc_rank_t peer, uint64_t *rva, ucp_rkey_h *rkey, - int *segment) +ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, size_t msglen, + ucp_ep_h *ep, ucc_rank_t peer, uint64_t *rva, + ucp_rkey_h *rkey, void **packed_memh, int *segment) { ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team); ptrdiff_t key_offset = 0; @@ -239,6 +239,7 @@ ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, ucp_ep_h *ep, void *keys; void *offset; ptrdiff_t base_offset; + int i; *segment = -1; core_rank = ucc_ep_map_eval(UCC_TL_TEAM_MAP(team), peer); @@ -252,33 +253,76 @@ ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, ucp_ep_h *ep, rvas = (uint64_t *)base_offset; key_sizes = PTR_OFFSET(base_offset, (section_offset * 2)); keys = PTR_OFFSET(base_offset, (section_offset * 3)); - - for (int i = 0; i < ctx->n_rinfo_segs; i++) { - uint64_t base = (uint64_t)team->va_base[i]; - uint64_t end = base + team->base_length[i]; - if ((uint64_t)va >= base && - (uint64_t)va < end) { + for (i = 0; i < ctx->n_rinfo_segs; i++) { + uint64_t base = (uint64_t)ctx->remote_info[i].va_base; + uint64_t end = base + ctx->remote_info[i].len; + if ((uint64_t)va >= base && (uint64_t)va < end) { *segment = i; - break; + *rva = rvas[i] + ((uint64_t)va - (uint64_t)base); + if (ucc_unlikely(NULL == + UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment))) { + ucs_status_t ucs_status = ucp_ep_rkey_unpack( + *ep, PTR_OFFSET(keys, key_offset), + &UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment)); + if (UCS_OK != ucs_status) { + return ucs_status_to_ucc_status(ucs_status); + } + } + *rkey = UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment); + *packed_memh = (ctx->remote_info[i].packed_memh) + ? 
ctx->remote_info[i].mem_h + : NULL; + return UCC_OK; } key_offset += key_sizes[i]; } - if (ucc_unlikely(0 > *segment)) { - tl_error(UCC_TL_TEAM_LIB(team), - "attempt to perform one-sided operation on non-registered memory %p", va); - return UCC_ERR_NOT_FOUND; - } - if (ucc_unlikely(NULL == UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment))) { - ucs_status_t ucs_status = - ucp_ep_rkey_unpack(*ep, PTR_OFFSET(keys, key_offset), - &UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment)); - if (UCS_OK != ucs_status) { - return ucs_status_to_ucc_status(ucs_status); + if (0 > *segment) { + key_offset = 0; + for (i = 0; i < ctx->n_dynrinfo_segs; i++) { + uint64_t base = (uint64_t)ctx->dynamic_remote_info[i].va_base; + uint64_t end = base + ctx->dynamic_remote_info[i].len; + uint64_t check_base = (uint64_t)va; + uint64_t check_end = check_base + msglen; + size_t num_keys = 0; + void *packed_key = NULL; + size_t team_size = UCC_TL_TEAM_SIZE(team); + if (check_base >= base && check_base < end && check_end <= end) { + *segment = i; + *rva = UCC_TL_UCP_REMOTE_DYN_RVA(ctx, peer, i); + num_keys = *segment - ctx->dyn_seg.starting_seg[*segment]; + for (int j = 0; j < num_keys; j++) { + key_offset += UCC_TL_UCP_REMOTE_DYN_KEY_SIZE( + ctx, peer, ctx->dyn_seg.starting_seg[*segment] + j); + } + packed_key = + UCC_TL_UCP_REMOTE_DYN_KEY(ctx, peer, key_offset, *segment); + /* dynamic segment keys should be placed AFTER + * the ctx's keys (i.e., num_static_segs + segment_number) */ + if (ucc_unlikely(NULL == UCC_TL_UCP_DYN_REMOTE_RKEY( + ctx, peer, team_size, *segment))) { + ucs_status_t ucs_status = + ucp_ep_rkey_unpack(*ep, packed_key, + &UCC_TL_UCP_DYN_REMOTE_RKEY( + ctx, peer, team_size, *segment)); + if (UCS_OK != ucs_status) { + return ucs_status_to_ucc_status(ucs_status); + } + } + *rkey = + UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, peer, team_size, *segment); + *packed_memh = (ctx->dynamic_remote_info[i].packed_memh) + ? 
ctx->dynamic_remote_info[i].mem_h + : NULL; + return UCC_OK; + } } } - *rkey = UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment); - *rva = rvas[*segment] + ((uint64_t)va - (uint64_t)team->va_base[*segment]); - return UCC_OK; + + tl_error( + UCC_TL_TEAM_LIB(team), + "attempt to perform one-sided operation on non-registered memory %p", + va); + return UCC_ERR_NOT_FOUND; } static inline ucc_status_t ucc_tl_ucp_flush(ucc_tl_ucp_team_t *team) @@ -322,13 +366,15 @@ static inline ucc_status_t ucc_tl_ucp_ep_flush(ucc_rank_t dest_group_rank, static inline ucc_status_t ucc_tl_ucp_put_nb(void *buffer, void *target, size_t msglen, ucc_rank_t dest_group_rank, + ucc_memory_type_t mtype, ucc_tl_ucp_team_t *team, ucc_tl_ucp_task_t *task) { - ucp_request_param_t req_param = {0}; - int segment = 0; - ucp_rkey_h rkey = NULL; - uint64_t rva = 0; + ucp_request_param_t req_param = {0}; + int segment = 0; + ucp_rkey_h rkey = NULL; + uint64_t rva = 0; + void *packed_memh = NULL; ucs_status_ptr_t ucp_status; ucc_status_t status; ucp_ep_h ep; @@ -338,19 +384,25 @@ static inline ucc_status_t ucc_tl_ucp_put_nb(void *buffer, void *target, return status; } - status = ucc_tl_ucp_resolve_p2p_by_va(team, target, &ep, dest_group_rank, - &rva, &rkey, &segment); + status = + ucc_tl_ucp_resolve_p2p_by_va(team, target, msglen, &ep, dest_group_rank, + &rva, &rkey, &packed_memh, &segment); if (ucc_unlikely(UCC_OK != status)) { return status; } - req_param.op_attr_mask = - UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA; - req_param.cb.send = ucc_tl_ucp_put_completion_cb; - req_param.user_data = (void *)task; + req_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | + UCP_OP_ATTR_FIELD_USER_DATA | + UCP_OP_ATTR_FIELD_MEMORY_TYPE; + req_param.cb.send = ucc_tl_ucp_put_completion_cb; + req_param.user_data = (void *)task; + req_param.memory_type = ucc_memtype_to_ucs[mtype]; + if (packed_memh) { + req_param.op_attr_mask |= UCP_OP_ATTR_FIELD_MEMH; + req_param.memh = packed_memh; + } ucp_status = ucp_put_nbx(ep, buffer, msglen, rva, rkey, &req_param); - task->onesided.put_posted++; if (UCS_OK != ucp_status) { if (UCS_PTR_IS_ERR(ucp_status)) { @@ -365,13 +417,15 @@ static inline ucc_status_t ucc_tl_ucp_put_nb(void *buffer, void *target, static inline ucc_status_t ucc_tl_ucp_get_nb(void *buffer, void *target, size_t msglen, ucc_rank_t dest_group_rank, + ucc_memory_type_t mtype, ucc_tl_ucp_team_t *team, ucc_tl_ucp_task_t *task) { - ucp_request_param_t req_param = {0}; - int segment = 0; - ucp_rkey_h rkey = NULL; - uint64_t rva = 0; + ucp_request_param_t req_param = {0}; + int segment = 0; + ucp_rkey_h rkey = NULL; + uint64_t rva = 0; + void *packed_memh = NULL; ucs_status_ptr_t ucp_status; ucc_status_t status; ucp_ep_h ep; @@ -381,19 +435,25 @@ static inline ucc_status_t ucc_tl_ucp_get_nb(void *buffer, void *target, return status; } - status = ucc_tl_ucp_resolve_p2p_by_va(team, target, &ep, dest_group_rank, - &rva, &rkey, &segment); + status = + ucc_tl_ucp_resolve_p2p_by_va(team, target, msglen, &ep, dest_group_rank, + &rva, &rkey, &packed_memh, &segment); if (ucc_unlikely(UCC_OK != status)) { return status; } - req_param.op_attr_mask = - UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA; - req_param.cb.send = ucc_tl_ucp_get_completion_cb; - req_param.user_data = (void *)task; + req_param.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | + UCP_OP_ATTR_FIELD_USER_DATA | + UCP_OP_ATTR_FIELD_MEMORY_TYPE; + req_param.cb.send = ucc_tl_ucp_get_completion_cb; + req_param.user_data = (void *)task; + req_param.memory_type = 
ucc_memtype_to_ucs[mtype]; + if (packed_memh) { + req_param.op_attr_mask |= UCP_OP_ATTR_FIELD_MEMH; + req_param.memh = packed_memh; + } ucp_status = ucp_get_nbx(ep, buffer, msglen, rva, rkey, &req_param); - task->onesided.get_posted++; if (UCS_OK != ucp_status) { if (UCS_PTR_IS_ERR(ucp_status)) { @@ -410,11 +470,12 @@ static inline ucc_status_t ucc_tl_ucp_atomic_inc(void * target, ucc_rank_t dest_group_rank, ucc_tl_ucp_team_t *team) { - ucp_request_param_t req_param = {0}; - int segment = 0; - uint64_t one = 1; - ucp_rkey_h rkey = NULL; - uint64_t rva = 0; + ucp_request_param_t req_param = {0}; + int segment = 0; + uint64_t one = 1; + ucp_rkey_h rkey = NULL; + uint64_t rva = 0; + void *packed_memh = NULL; ucs_status_ptr_t ucp_status; ucc_status_t status; ucp_ep_h ep; @@ -424,8 +485,9 @@ static inline ucc_status_t ucc_tl_ucp_atomic_inc(void * target, return status; } - status = ucc_tl_ucp_resolve_p2p_by_va(team, target, &ep, dest_group_rank, - &rva, &rkey, &segment); + status = ucc_tl_ucp_resolve_p2p_by_va(team, target, sizeof(uint64_t), &ep, + dest_group_rank, &rva, &rkey, + &packed_memh, &segment); if (ucc_unlikely(UCC_OK != status)) { return status; } diff --git a/src/components/tl/ucp/tl_ucp_team.c b/src/components/tl/ucp/tl_ucp_team.c index d7a8a8041c..cc4eb7ee71 100644 --- a/src/components/tl/ucp/tl_ucp_team.c +++ b/src/components/tl/ucp/tl_ucp_team.c @@ -168,7 +168,6 @@ ucc_status_t ucc_tl_ucp_team_create_test(ucc_base_team_t *tl_team) { ucc_tl_ucp_team_t * team = ucc_derived_of(tl_team, ucc_tl_ucp_team_t); ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team); - int i; ucc_status_t status; if (USE_SERVICE_WORKER(team)) { @@ -190,13 +189,6 @@ ucc_status_t ucc_tl_ucp_team_create_test(ucc_base_team_t *tl_team) } } - if (ctx->remote_info) { - for (i = 0; i < ctx->n_rinfo_segs; i++) { - team->va_base[i] = ctx->remote_info[i].va_base; - team->base_length[i] = ctx->remote_info[i].len; - } - } - tl_debug(tl_team->context->lib, "initialized tl team: %p", team); team->status = UCC_OK; return UCC_OK; diff --git a/src/ucc/api/ucc.h b/src/ucc/api/ucc.h index 68b76a32eb..76fbf0c70a 100644 --- a/src/ucc/api/ucc.h +++ b/src/ucc/api/ucc.h @@ -166,7 +166,6 @@ typedef enum ucc_memory_type { UCC_MEMORY_TYPE_CUDA_MANAGED, /*!< NVIDIA CUDA managed memory */ UCC_MEMORY_TYPE_ROCM, /*!< AMD ROCM memory */ UCC_MEMORY_TYPE_ROCM_MANAGED, /*!< AMD ROCM managed system memory */ - UCC_MEMORY_TYPE_DPU, /*!< DPU memory */ UCC_MEMORY_TYPE_LAST, UCC_MEMORY_TYPE_UNKNOWN = UCC_MEMORY_TYPE_LAST } ucc_memory_type_t; @@ -899,7 +898,6 @@ typedef struct ucc_mem_map { void * address; /*!< the address of a buffer to be attached to a UCC context */ size_t len; /*!< the length of the buffer */ - ucc_memory_type_t mem_type; /*!< the memory type */ void * resource; /*!< resource associated with the address. examples of resources include memory keys. */ @@ -1709,17 +1707,12 @@ typedef enum { Note, the status is not guaranteed to be global on all the processes participating in the collective.*/ - UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS = UCC_BIT(7), /*!< If set, both src + UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS = UCC_BIT(7) /*!< If set, both src and dst buffers reside in a memory mapped region. Useful for one-sided collectives. */ - UCC_COLL_ARGS_FLAG_MEM_MAP = UCC_BIT(8) /*!< If set, map both - src and dst buffers. 
- Useful for one-sided - collectives in message - passing programming models */ } ucc_coll_args_flags_t; /** @@ -1881,10 +1874,17 @@ typedef struct ucc_coll_args { ucc_coll_callback_t cb; double timeout; /*!< Timeout in seconds */ ucc_mem_map_params_t mem_map; /*!< Memory regions to be used - in during the collective - operation for one-sided - collectives. Not necessary - for two-sided collectives */ + for the current and/or + future one-sided collectives. + If set, the designated regions + will be mapped and information + exchanged with the team + associated with the collective + via an allgather operation. + It is recommended to use this + option sparingly due to the + increased overhead. Not necessary + for two-sided collectives. */ struct { uint64_t start; int64_t stride; From 20a13af85e8a80594d9ed59f97a76da3a98a3405 Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Thu, 15 Feb 2024 14:51:06 -0800 Subject: [PATCH 03/14] TEST: update gtest mem map usage --- test/gtest/common/test_ucc.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/gtest/common/test_ucc.cc b/test/gtest/common/test_ucc.cc index 40b51c1f56..2234ddb9e4 100644 --- a/test/gtest/common/test_ucc.cc +++ b/test/gtest/common/test_ucc.cc @@ -430,10 +430,10 @@ void proc_context_create(UccProcess_h proc, int id, ThreadAllgather *ta, bool is void proc_context_create_mem_params(UccProcess_h proc, int id, ThreadAllgather *ta) { + ucc_mem_map_t map[UCC_TEST_N_MEM_SEGMENTS] = {0}; ucc_status_t status; ucc_context_config_h ctx_config; std::stringstream err_msg; - ucc_mem_map_t map[UCC_TEST_N_MEM_SEGMENTS]; status = ucc_context_config_read(proc->lib_h, NULL, &ctx_config); if (status != UCC_OK) { From 15403c7a9e01d4b0b1f8e8449529db59428d0c16 Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Thu, 15 Feb 2024 15:35:02 -0800 Subject: [PATCH 04/14] TEST: update mpi test mem map usage --- test/gtest/common/test_ucc.cc | 2 +- test/mpi/test_mpi.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/gtest/common/test_ucc.cc b/test/gtest/common/test_ucc.cc index 2234ddb9e4..273489c2c8 100644 --- a/test/gtest/common/test_ucc.cc +++ b/test/gtest/common/test_ucc.cc @@ -430,7 +430,7 @@ void proc_context_create(UccProcess_h proc, int id, ThreadAllgather *ta, bool is void proc_context_create_mem_params(UccProcess_h proc, int id, ThreadAllgather *ta) { - ucc_mem_map_t map[UCC_TEST_N_MEM_SEGMENTS] = {0}; + ucc_mem_map_t map[UCC_TEST_N_MEM_SEGMENTS] = {}; ucc_status_t status; ucc_context_config_h ctx_config; std::stringstream err_msg; diff --git a/test/mpi/test_mpi.cc b/test/mpi/test_mpi.cc index 147ce1fd7d..c481968f8e 100644 --- a/test/mpi/test_mpi.cc +++ b/test/mpi/test_mpi.cc @@ -41,11 +41,11 @@ static ucc_status_t oob_allgather_free(void *req) UccTestMpi::UccTestMpi(int argc, char *argv[], ucc_thread_mode_t _tm, int is_local, bool with_onesided) { + ucc_mem_map_t segments[UCC_TEST_N_MEM_SEGMENTS] = {0}; ucc_lib_config_h lib_config; ucc_context_config_h ctx_config; int size, rank; char *prev_env; - ucc_mem_map_t segments[UCC_TEST_N_MEM_SEGMENTS]; MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Comm_rank(MPI_COMM_WORLD, &rank); From 16499407bd5ebbbdc93555751bdfd73429807fac Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Tue, 20 Feb 2024 09:47:53 -0800 Subject: [PATCH 05/14] API: clarify collective mem map destruction --- src/ucc/api/ucc.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ucc/api/ucc.h b/src/ucc/api/ucc.h index 76fbf0c70a..6d28276683 100644 --- a/src/ucc/api/ucc.h +++ 
b/src/ucc/api/ucc.h @@ -1881,6 +1881,8 @@ typedef struct ucc_coll_args { exchanged with the team associated with the collective via an allgather operation. + Memory is unmapped during + context destruction. It is recommended to use this option sparingly due to the increased overhead. Not necessary From 7a26248b53167b3389528b4647310ec0910cefe9 Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Tue, 20 Feb 2024 23:04:05 -0800 Subject: [PATCH 06/14] TL/UCP: convert dyn rva/key/etc to macros --- src/components/tl/ucp/tl_ucp.h | 73 ++++++++++++++-------------------- 1 file changed, 29 insertions(+), 44 deletions(-) diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index 74b230edbe..fc2d4acb07 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -217,10 +217,36 @@ extern ucc_config_field_t ucc_tl_ucp_lib_config_table[]; #define UCC_TL_UCP_DYN_REMOTE_RKEY(_ctx, _rank, _size, _seg) \ ((_ctx)->rkeys[_size * _ctx->n_rinfo_segs + _rank * _ctx->n_dynrinfo_segs + _seg]) -/* #define UCC_TL_UCP_REMOTE_DYN_RVA(_ctx, _rank, _seg) \ - (PTR_OFFSET((_ctx)->dyn_seg.dyn_buff, (_ctx)->dyn_seg.seg_group_start[_seg] + (_ctx)->dyn_seg.seg_groups[_seg] * _rank)) -*/ + *(uint64_t *)(PTR_OFFSET(_ctx->dyn_seg.dyn_buff, \ + _ctx->dyn_seg.seg_group_start[_seg] \ + + _ctx->dyn_seg.seg_group_size[_ctx->dyn_seg.seg_groups[_seg]] * _rank \ + + (_seg - _ctx->dyn_seg.starting_seg[_seg]) * sizeof(uint64_t))) + +#define UCC_TL_UCP_REMOTE_DYN_LEN(_ctx, _rank, _seg) \ + *(uint64_t *)(PTR_OFFSET(_ctx->dyn_seg.dyn_buff, \ + sizeof(uint64_t) \ + * _ctx->dyn_seg.num_seg_per_group[_ctx->dyn_seg.seg_groups[_seg]] \ + + _ctx->dyn_seg.seg_group_start[_seg] \ + + _ctx->dyn_seg.seg_group_size[_ctx->dyn_seg.seg_groups[_seg]] * _rank \ + + (_seg - _ctx->dyn_seg.starting_seg[_seg]) * sizeof(uint64_t))) + +#define UCC_TL_UCP_REMOTE_DYN_KEY_SIZE(_ctx, _rank, _seg) \ + *(uint64_t *)(PTR_OFFSET(_ctx->dyn_seg.dyn_buff, \ + 2 * sizeof(uint64_t) \ + * _ctx->dyn_seg.num_seg_per_group[_ctx->dyn_seg.seg_groups[_seg]] \ + + _ctx->dyn_seg.seg_group_start[_seg] \ + + _ctx->dyn_seg.seg_group_size[_ctx->dyn_seg.seg_groups[_seg]] * _rank \ + + (_seg - _ctx->dyn_seg.starting_seg[_seg]) * sizeof(uint64_t))) + +#define UCC_TL_UCP_REMOTE_DYN_KEY(_ctx, _rank, _offset, _seg) \ + (PTR_OFFSET(_ctx->dyn_seg.dyn_buff, \ + 3 * sizeof(uint64_t) \ + * _ctx->dyn_seg.num_seg_per_group[_ctx->dyn_seg.seg_groups[_seg]] \ + + _ctx->dyn_seg.seg_group_start[_seg] \ + + _ctx->dyn_seg.seg_group_size[_ctx->dyn_seg.seg_groups[_seg]] * _rank \ + + _offset)) + extern ucs_memory_type_t ucc_memtype_to_ucs[UCC_MEMORY_TYPE_LAST+1]; void ucc_tl_ucp_pre_register_mem(ucc_tl_ucp_team_t *team, void *addr, @@ -229,45 +255,4 @@ void ucc_tl_ucp_pre_register_mem(ucc_tl_ucp_team_t *team, void *addr, ucc_status_t ucc_tl_ucp_ctx_remote_populate(ucc_tl_ucp_context_t *ctx, ucc_mem_map_params_t map, ucc_team_oob_coll_t oob); - -// FIXME convert to macro -static inline uint64_t UCC_TL_UCP_REMOTE_DYN_RVA(ucc_tl_ucp_context_t *ctx, - ucc_rank_t rank, - uint64_t seg) -{ - int seg_group_id = ctx->dyn_seg.seg_groups[seg]; - uint64_t *prva = PTR_OFFSET(ctx->dyn_seg.dyn_buff, ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + (seg - ctx->dyn_seg.starting_seg[seg]) * sizeof(uint64_t)); - return *prva;//[seg - ctx->dyn_seg.starting_seg[seg]]; -} - -// FIXME convert to macro -static inline uint64_t UCC_TL_UCP_REMOTE_DYN_LEN(ucc_tl_ucp_context_t *ctx, - ucc_rank_t rank, - uint64_t seg) -{ - int seg_group_id = 
ctx->dyn_seg.seg_groups[seg]; - uint64_t *plen = PTR_OFFSET(ctx->dyn_seg.dyn_buff, sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + (seg - ctx->dyn_seg.starting_seg[seg]) * sizeof(uint64_t)); - return *plen;//[seg - ctx->dyn_seg.starting_seg[seg]]; -} - -// FIXME convert to macro -static inline uint64_t UCC_TL_UCP_REMOTE_DYN_KEY_SIZE(ucc_tl_ucp_context_t *ctx, - ucc_rank_t rank, - uint64_t seg) -{ - int seg_group_id = ctx->dyn_seg.seg_groups[seg]; - uint64_t *pkey_size = PTR_OFFSET(ctx->dyn_seg.dyn_buff, 2 * sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + (seg - ctx->dyn_seg.starting_seg[seg]) * sizeof(uint64_t)); - return *pkey_size;//[seg - ctx->dyn_seg.starting_seg[seg]]; -} - -// FIXME convert to macro -static inline void * UCC_TL_UCP_REMOTE_DYN_KEY(ucc_tl_ucp_context_t *ctx, - ucc_rank_t rank, - size_t offset, - uint64_t seg) -{ - int seg_group_id = ctx->dyn_seg.seg_groups[seg]; - void *pkey = PTR_OFFSET(ctx->dyn_seg.dyn_buff, 3 * sizeof(uint64_t) * ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.seg_groups[seg]] + ctx->dyn_seg.seg_group_start[seg] + ctx->dyn_seg.seg_group_size[seg_group_id] * rank + offset); - return pkey; -} #endif From 78ab3589f80e705cb04df988481353ccd2b94563 Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Wed, 21 Feb 2024 13:01:36 -0800 Subject: [PATCH 07/14] TL/UCP: force enablement of dynamic segments --- src/components/tl/ucp/tl_ucp.c | 5 +++++ src/components/tl/ucp/tl_ucp.h | 1 + src/components/tl/ucp/tl_ucp_coll.c | 3 ++- src/components/tl/ucp/tl_ucp_context.c | 8 ++++++-- 4 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c index 72586dc5de..038c3ba4ca 100644 --- a/src/components/tl/ucp/tl_ucp.c +++ b/src/components/tl/ucp/tl_ucp.c @@ -189,6 +189,11 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = { ucc_offsetof(ucc_tl_ucp_lib_config_t, use_reordering), UCC_CONFIG_TYPE_BOOL}, + {"USE_DYNAMIC_SEGMENTS", "n", + "Use dynamic segments in TL UCP for onesided collectives", + ucc_offsetof(ucc_tl_ucp_lib_config_t, use_dynamic_segments), + UCC_CONFIG_TYPE_BOOL}, + {NULL}}; static ucs_config_field_t ucc_tl_ucp_context_config_table[] = { diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index fc2d4acb07..67b3d73d4d 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -74,6 +74,7 @@ typedef struct ucc_tl_ucp_lib_config { uint32_t alltoallv_hybrid_pairwise_num_posts; ucc_ternary_auto_value_t use_topo; int use_reordering; + int use_dynamic_segments; } ucc_tl_ucp_lib_config_t; typedef struct ucc_tl_ucp_context_config { diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index 12ae2d217a..cf759fc3c4 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -257,11 +257,12 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segments(ucc_coll_args_t *coll_args, ucc_tl_ucp_task_t *task) { ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); + ucc_tl_ucp_lib_t *tl_lib = UCC_TL_UCP_TEAM_LIB(tl_team); ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team); int i = 0; ucc_status_t status; - if (coll_args->mem_map.n_segments > 0) { + if (tl_lib->cfg.use_dynamic_segments && coll_args->mem_map.n_segments > 0) { int starting_index = 
ctx->n_dynrinfo_segs; size_t seg_pack_size = 0; size_t *global_size = NULL; diff --git a/src/components/tl/ucp/tl_ucp_context.c b/src/components/tl/ucp/tl_ucp_context.c index 77efbba97f..c6ec78cf4e 100644 --- a/src/components/tl/ucp/tl_ucp_context.c +++ b/src/components/tl/ucp/tl_ucp_context.c @@ -163,8 +163,12 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_context_t, ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES | UCP_PARAM_FIELD_TAG_SENDER_MASK | UCP_PARAM_FIELD_NAME; - ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_AM | UCP_FEATURE_RMA | - UCP_FEATURE_AMO64 | UCP_FEATURE_EXPORTED_MEMH; + ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_AM; + if ((params->params.mask & UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS) || + lib->cfg.use_dynamic_segments) { + ucp_params.features |= UCP_FEATURE_RMA | UCP_FEATURE_AMO64 | + UCP_FEATURE_EXPORTED_MEMH; + } ucp_params.tag_sender_mask = UCC_TL_UCP_TAG_SENDER_MASK; ucp_params.name = "UCC_UCP_CONTEXT"; From cd9d1d23d2dead8bb89df2c25c5607df06d84041 Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Wed, 21 Feb 2024 23:28:12 -0800 Subject: [PATCH 08/14] TL/UCP: add xgvmi usage option --- src/components/tl/ucp/tl_ucp.c | 5 +++++ src/components/tl/ucp/tl_ucp.h | 1 + src/components/tl/ucp/tl_ucp_context.c | 10 ++++++---- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c index 038c3ba4ca..051f213f2e 100644 --- a/src/components/tl/ucp/tl_ucp.c +++ b/src/components/tl/ucp/tl_ucp.c @@ -194,6 +194,11 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = { ucc_offsetof(ucc_tl_ucp_lib_config_t, use_dynamic_segments), UCC_CONFIG_TYPE_BOOL}, + {"USE_XGVMI", "n", + "Use XGVMI for onesided collectives", + ucc_offsetof(ucc_tl_ucp_lib_config_t, use_xgvmi), + UCC_CONFIG_TYPE_BOOL}, + {NULL}}; static ucs_config_field_t ucc_tl_ucp_context_config_table[] = { diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index 67b3d73d4d..e8edef393f 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -75,6 +75,7 @@ typedef struct ucc_tl_ucp_lib_config { ucc_ternary_auto_value_t use_topo; int use_reordering; int use_dynamic_segments; + int use_xgvmi; } ucc_tl_ucp_lib_config_t; typedef struct ucc_tl_ucp_context_config { diff --git a/src/components/tl/ucp/tl_ucp_context.c b/src/components/tl/ucp/tl_ucp_context.c index c6ec78cf4e..4516814bcc 100644 --- a/src/components/tl/ucp/tl_ucp_context.c +++ b/src/components/tl/ucp/tl_ucp_context.c @@ -164,10 +164,12 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_context_t, ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES | UCP_PARAM_FIELD_TAG_SENDER_MASK | UCP_PARAM_FIELD_NAME; ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_AM; - if ((params->params.mask & UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS) || - lib->cfg.use_dynamic_segments) { - ucp_params.features |= UCP_FEATURE_RMA | UCP_FEATURE_AMO64 | - UCP_FEATURE_EXPORTED_MEMH; + if (((params->params.mask & UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS) || + lib->cfg.use_dynamic_segments)) { + ucp_params.features |= UCP_FEATURE_RMA | UCP_FEATURE_AMO64; + } + if (lib->cfg.use_xgvmi) { + ucp_params.features |= UCP_FEATURE_EXPORTED_MEMH; } ucp_params.tag_sender_mask = UCC_TL_UCP_TAG_SENDER_MASK; ucp_params.name = "UCC_UCP_CONTEXT"; From fef2a3798ceed60e4a7f311a9415d41a744cac15 Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Fri, 19 Apr 2024 16:33:05 -0700 Subject: [PATCH 09/14] REVIEW: address feedback --- src/components/tl/ucp/alltoall/alltoall.c | 11 +- .../tl/ucp/alltoall/alltoall_onesided.c | 8 +- 
.../tl/ucp/alltoallv/alltoallv_onesided.c | 15 +- src/components/tl/ucp/tl_ucp.c | 5 - src/components/tl/ucp/tl_ucp.h | 64 +-- src/components/tl/ucp/tl_ucp_coll.c | 383 +++++++++--------- src/components/tl/ucp/tl_ucp_coll.h | 8 +- src/components/tl/ucp/tl_ucp_context.c | 27 +- src/components/tl/ucp/tl_ucp_sendrecv.h | 120 +++--- src/ucc/api/ucc.h | 38 +- src/ucc/api/ucc_status.h | 1 + src/utils/ucc_status.c | 2 + 12 files changed, 322 insertions(+), 360 deletions(-) diff --git a/src/components/tl/ucp/alltoall/alltoall.c b/src/components/tl/ucp/alltoall/alltoall.c index 3803d96426..3409ac3aec 100644 --- a/src/components/tl/ucp/alltoall/alltoall.c +++ b/src/components/tl/ucp/alltoall/alltoall.c @@ -72,8 +72,8 @@ ucc_status_t ucc_tl_ucp_alltoall_pairwise_init(ucc_base_coll_args_t *coll_args, } ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args, - ucc_base_team_t *team, - ucc_coll_task_t **task_h) + ucc_base_team_t *team, + ucc_coll_task_t **task_h) { ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); ucc_tl_ucp_task_t *task; @@ -99,7 +99,12 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args, *task_h = &task->super; task->super.post = ucc_tl_ucp_alltoall_onesided_start; task->super.progress = ucc_tl_ucp_alltoall_onesided_progress; - status = UCC_OK; + + status = ucc_tl_ucp_coll_dynamic_segment_init(&coll_args->args, task); + if (UCC_OK != status) { + tl_error(UCC_TL_TEAM_LIB(tl_team), + "failed to initialize dynamic segments"); + } out: return status; } diff --git a/src/components/tl/ucp/alltoall/alltoall_onesided.c b/src/components/tl/ucp/alltoall/alltoall_onesided.c index f84f98ca5e..c8943e5b0b 100644 --- a/src/components/tl/ucp/alltoall/alltoall_onesided.c +++ b/src/components/tl/ucp/alltoall/alltoall_onesided.c @@ -27,9 +27,14 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_start(ucc_coll_task_t *ctask) long *pSync = TASK_ARGS(task).global_work_buffer; ucc_memory_type_t mtype = TASK_ARGS(task).src.info.mem_type; ucc_rank_t peer; + ucc_status_t status; ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); - ucc_tl_ucp_coll_dynamic_segments(&TASK_ARGS(task), task); + status = ucc_tl_ucp_coll_dynamic_segment_exchange(task); + if (UCC_OK != status) { + task->super.status = status; + goto out; + } /* TODO: change when support for library-based work buffers is complete */ nelems = (nelems / gsize) * ucc_dt_size(TASK_ARGS(task).src.info.datatype); @@ -66,4 +71,5 @@ void ucc_tl_ucp_alltoall_onesided_progress(ucc_coll_task_t *ctask) pSync[0] = 0; task->super.status = UCC_OK; + ucc_tl_ucp_coll_dynamic_segment_finalize(task); } diff --git a/src/components/tl/ucp/alltoallv/alltoallv_onesided.c b/src/components/tl/ucp/alltoallv/alltoallv_onesided.c index 5c446a6941..fed6cf84f3 100644 --- a/src/components/tl/ucp/alltoallv/alltoallv_onesided.c +++ b/src/components/tl/ucp/alltoallv/alltoallv_onesided.c @@ -26,10 +26,15 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask) size_t rdt_size = ucc_dt_size(TASK_ARGS(task).dst.info_v.datatype); ucc_memory_type_t mtype = TASK_ARGS(task).src.info_v.mem_type; ucc_rank_t peer; + ucc_status_t status; size_t sd_disp, dd_disp, data_size; ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); - ucc_tl_ucp_coll_dynamic_segments(&TASK_ARGS(task), task); + status = ucc_tl_ucp_coll_dynamic_segment_exchange(task); + if (UCC_OK != status) { + task->super.status = status; + goto out; + } /* perform a put to each member peer using the peer's index in the * destination displacement. 
*/ @@ -70,6 +75,7 @@ void ucc_tl_ucp_alltoallv_onesided_progress(ucc_coll_task_t *ctask) pSync[0] = 0; task->super.status = UCC_OK; + ucc_tl_ucp_coll_dynamic_segment_finalize(task); } ucc_status_t ucc_tl_ucp_alltoallv_onesided_init(ucc_base_coll_args_t *coll_args, @@ -100,7 +106,12 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_init(ucc_base_coll_args_t *coll_args, *task_h = &task->super; task->super.post = ucc_tl_ucp_alltoallv_onesided_start; task->super.progress = ucc_tl_ucp_alltoallv_onesided_progress; - status = UCC_OK; + + status = ucc_tl_ucp_coll_dynamic_segment_init(&coll_args->args, task); + if (UCC_OK != status) { + tl_error(UCC_TL_TEAM_LIB(tl_team), + "failed to initialize dynamic segments"); + } out: return status; } diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c index 051f213f2e..a7ca9862e2 100644 --- a/src/components/tl/ucp/tl_ucp.c +++ b/src/components/tl/ucp/tl_ucp.c @@ -189,11 +189,6 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = { ucc_offsetof(ucc_tl_ucp_lib_config_t, use_reordering), UCC_CONFIG_TYPE_BOOL}, - {"USE_DYNAMIC_SEGMENTS", "n", - "Use dynamic segments in TL UCP for onesided collectives", - ucc_offsetof(ucc_tl_ucp_lib_config_t, use_dynamic_segments), - UCC_CONFIG_TYPE_BOOL}, - {"USE_XGVMI", "n", "Use XGVMI for onesided collectives", ucc_offsetof(ucc_tl_ucp_lib_config_t, use_xgvmi), diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index e8edef393f..6a4f40ed25 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -74,7 +74,6 @@ typedef struct ucc_tl_ucp_lib_config { uint32_t alltoallv_hybrid_pairwise_num_posts; ucc_ternary_auto_value_t use_topo; int use_reordering; - int use_dynamic_segments; int use_xgvmi; } ucc_tl_ucp_lib_config_t; @@ -98,28 +97,6 @@ typedef struct ucc_tl_ucp_lib { UCC_CLASS_DECLARE(ucc_tl_ucp_lib_t, const ucc_base_lib_params_t *, const ucc_base_config_t *); -/* dynamic segments stored in a flat buffer. An example with 4 segments on - * two PEs, with segments stored two at a time (collective with src/dst pair): - -rva/key => (rva, len, key size, key) tuple - -+-----------------------------+-----------------------------+ -| seg group 0 (seg 0 + seg 1) | seg group 1 (seg 2 + seg 3) | -+--------------+--------------+--------------+--------------+ -| rva/key pe 0 | rva/key pe 1 | rva/key pe 0 | rva/key pe 1 | -+--------------+--------------+--------------+--------------+ -*/ -typedef struct ucc_tl_ucp_dynamic_seg { - void *dyn_buff; /* flat buffer with rva, keys, etc. 
*/ - size_t buff_size; - size_t *seg_groups; /* segment to segment group mapping */ - size_t *seg_group_start; /* offset of dyn_buff to start of seg group */ - size_t *seg_group_size; /* storage size of a seg group */ - size_t *starting_seg; /* starting seg for a seg group */ - size_t *num_seg_per_group; - size_t num_groups; -} ucc_tl_ucp_dynamic_seg_t; - typedef struct ucc_tl_ucp_remote_info { void * va_base; size_t len; @@ -146,13 +123,14 @@ typedef struct ucc_tl_ucp_context { uint32_t service_worker_throttling_count; ucc_mpool_t req_mp; ucc_tl_ucp_remote_info_t *remote_info; - ucc_tl_ucp_remote_info_t *dynamic_remote_info; - ucc_tl_ucp_dynamic_seg_t dyn_seg; ucp_rkey_h * rkeys; uint64_t n_rinfo_segs; - uint64_t n_dynrinfo_segs; uint64_t ucp_memory_types; int topo_required; + ucc_tl_ucp_remote_info_t *dynamic_remote_info; + void *dyn_seg_buf; + ucp_rkey_h *dyn_rkeys; + size_t n_dynrinfo_segs; } ucc_tl_ucp_context_t; UCC_CLASS_DECLARE(ucc_tl_ucp_context_t, const ucc_base_context_params_t *, const ucc_base_config_t *); @@ -216,38 +194,8 @@ extern ucc_config_field_t ucc_tl_ucp_lib_config_table[]; #define UCC_TL_UCP_REMOTE_RKEY(_ctx, _rank, _seg) \ ((_ctx)->rkeys[_rank * _ctx->n_rinfo_segs + _seg]) -#define UCC_TL_UCP_DYN_REMOTE_RKEY(_ctx, _rank, _size, _seg) \ - ((_ctx)->rkeys[_size * _ctx->n_rinfo_segs + _rank * _ctx->n_dynrinfo_segs + _seg]) - -#define UCC_TL_UCP_REMOTE_DYN_RVA(_ctx, _rank, _seg) \ - *(uint64_t *)(PTR_OFFSET(_ctx->dyn_seg.dyn_buff, \ - _ctx->dyn_seg.seg_group_start[_seg] \ - + _ctx->dyn_seg.seg_group_size[_ctx->dyn_seg.seg_groups[_seg]] * _rank \ - + (_seg - _ctx->dyn_seg.starting_seg[_seg]) * sizeof(uint64_t))) - -#define UCC_TL_UCP_REMOTE_DYN_LEN(_ctx, _rank, _seg) \ - *(uint64_t *)(PTR_OFFSET(_ctx->dyn_seg.dyn_buff, \ - sizeof(uint64_t) \ - * _ctx->dyn_seg.num_seg_per_group[_ctx->dyn_seg.seg_groups[_seg]] \ - + _ctx->dyn_seg.seg_group_start[_seg] \ - + _ctx->dyn_seg.seg_group_size[_ctx->dyn_seg.seg_groups[_seg]] * _rank \ - + (_seg - _ctx->dyn_seg.starting_seg[_seg]) * sizeof(uint64_t))) - -#define UCC_TL_UCP_REMOTE_DYN_KEY_SIZE(_ctx, _rank, _seg) \ - *(uint64_t *)(PTR_OFFSET(_ctx->dyn_seg.dyn_buff, \ - 2 * sizeof(uint64_t) \ - * _ctx->dyn_seg.num_seg_per_group[_ctx->dyn_seg.seg_groups[_seg]] \ - + _ctx->dyn_seg.seg_group_start[_seg] \ - + _ctx->dyn_seg.seg_group_size[_ctx->dyn_seg.seg_groups[_seg]] * _rank \ - + (_seg - _ctx->dyn_seg.starting_seg[_seg]) * sizeof(uint64_t))) - -#define UCC_TL_UCP_REMOTE_DYN_KEY(_ctx, _rank, _offset, _seg) \ - (PTR_OFFSET(_ctx->dyn_seg.dyn_buff, \ - 3 * sizeof(uint64_t) \ - * _ctx->dyn_seg.num_seg_per_group[_ctx->dyn_seg.seg_groups[_seg]] \ - + _ctx->dyn_seg.seg_group_start[_seg] \ - + _ctx->dyn_seg.seg_group_size[_ctx->dyn_seg.seg_groups[_seg]] * _rank \ - + _offset)) +#define UCC_TL_UCP_DYN_REMOTE_RKEY(_ctx, _rank, _seg) \ + ((_ctx)->dyn_rkeys[_rank * _ctx->n_dynrinfo_segs + _seg]) extern ucs_memory_type_t ucc_memtype_to_ucs[UCC_MEMORY_TYPE_LAST+1]; diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index cf759fc3c4..013ffbfeb4 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -155,10 +155,9 @@ ucc_status_t ucc_tl_ucp_coll_finalize(ucc_coll_task_t *coll_task) return UCC_OK; } -static void ucc_tl_ucp_pack_data(ucc_tl_ucp_context_t *ctx, int starting_index, - void *pack) +static void ucc_tl_ucp_pack_data(ucc_tl_ucp_context_t *ctx, void *pack) { - uint64_t nsegs = ctx->n_dynrinfo_segs - starting_index; + uint64_t nsegs = ctx->n_dynrinfo_segs; uint64_t offset 
= 0; size_t section_offset = sizeof(uint64_t) * nsegs; void *keys; @@ -175,19 +174,17 @@ static void ucc_tl_ucp_pack_data(ucc_tl_ucp_context_t *ctx, int starting_index, keys = PTR_OFFSET(pack, (section_offset * 3)); for (i = 0; i < nsegs; i++) { - int index = i + starting_index; - rvas[i] = (uint64_t)ctx->dynamic_remote_info[index].va_base; - lens[i] = ctx->dynamic_remote_info[index].len; - key_sizes[i] = ctx->dynamic_remote_info[index].packed_key_len; - memcpy(PTR_OFFSET(keys, offset), - ctx->dynamic_remote_info[index].packed_key, - ctx->dynamic_remote_info[index].packed_key_len); - offset += ctx->dynamic_remote_info[index].packed_key_len; + rvas[i] = (uint64_t)ctx->dynamic_remote_info[i].va_base; + lens[i] = ctx->dynamic_remote_info[i].len; + key_sizes[i] = ctx->dynamic_remote_info[i].packed_key_len; + memcpy(PTR_OFFSET(keys, offset), ctx->dynamic_remote_info[i].packed_key, + ctx->dynamic_remote_info[i].packed_key_len); + offset += ctx->dynamic_remote_info[i].packed_key_len; } } -ucc_status_t ucc_tl_ucp_memmap_append_segment(ucc_tl_ucp_task_t *task, - ucc_mem_map_t *map, int segid) +ucc_status_t ucc_tl_ucp_memmap_segment(ucc_tl_ucp_task_t *task, + ucc_mem_map_t *map, int segid) { ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); ucc_tl_ucp_context_t *tl_ctx = UCC_TL_UCP_TEAM_CTX(tl_team); @@ -195,130 +192,151 @@ ucc_status_t ucc_tl_ucp_memmap_append_segment(ucc_tl_ucp_task_t *task, ucp_mem_map_params_t mmap_params; ucp_mem_h mh; - // map the memory + /* map the memory */ if (map->resource != NULL) { mmap_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_EXPORTED_MEMH_BUFFER; - mmap_params.exported_memh_buffer = map->resource; - - ucs_status = ucp_mem_map(tl_ctx->worker.ucp_context, &mmap_params, &mh); - if (ucs_status == UCS_ERR_UNREACHABLE) { - tl_error(tl_ctx->super.super.lib, "exported memh is unsupported"); - return ucs_status_to_ucc_status(ucs_status); - } else if (ucs_status < UCS_OK) { - tl_error(tl_ctx->super.super.lib, - "ucp_mem_map failed with error code: %d", ucs_status); - return ucs_status_to_ucc_status(ucs_status); - } - /* generate rkeys / packed keys */ - - tl_ctx->dynamic_remote_info[segid].va_base = map->address; - tl_ctx->dynamic_remote_info[segid].len = map->len; - tl_ctx->dynamic_remote_info[segid].mem_h = mh; + mmap_params.exported_memh_buffer = map->resource; tl_ctx->dynamic_remote_info[segid].packed_memh = map->resource; - ucs_status = - ucp_rkey_pack(tl_ctx->worker.ucp_context, mh, - &tl_ctx->dynamic_remote_info[segid].packed_key, - &tl_ctx->dynamic_remote_info[segid].packed_key_len); - if (UCS_OK != ucs_status) { - tl_error(tl_ctx->super.super.lib, - "failed to pack UCP key with error code: %d", ucs_status); - return ucs_status_to_ucc_status(ucs_status); - } } else { mmap_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH; - mmap_params.address = map->address; - mmap_params.length = map->len; - - ucs_status = ucp_mem_map(tl_ctx->worker.ucp_context, &mmap_params, &mh); - if (ucs_status != UCS_OK) { - tl_error(UCC_TASK_LIB(task), "failure in ucp_mem_map %s", - ucs_status_string(ucs_status)); - return ucs_status_to_ucc_status(ucs_status); - } - tl_ctx->dynamic_remote_info[segid].va_base = map->address; - tl_ctx->dynamic_remote_info[segid].len = map->len; - tl_ctx->dynamic_remote_info[segid].mem_h = mh; + mmap_params.address = map->address; + mmap_params.length = map->len; tl_ctx->dynamic_remote_info[segid].packed_memh = NULL; - ucs_status = - ucp_rkey_pack(tl_ctx->worker.ucp_context, mh, - 
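For reference, the layout produced by ucc_tl_ucp_pack_data() above, reconstructed from the offsets in the code (a sketch, not additional behavior); the same section offsets are used on the read side when a peer's entry is unpacked.

/* packed block for nsegs dynamic segments,
 * section = nsegs * sizeof(uint64_t):
 *
 *   byte 0            : uint64_t rva[nsegs]       remote virtual addresses
 *   byte section      : uint64_t len[nsegs]       segment lengths
 *   byte section * 2  : uint64_t key_size[nsegs]  packed rkey sizes
 *   byte section * 3  : packed rkeys, concatenated; key i starts at
 *                       key_size[0] + ... + key_size[i-1]
 */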
&tl_ctx->dynamic_remote_info[segid].packed_key, - &tl_ctx->dynamic_remote_info[segid].packed_key_len); - if (UCS_OK != ucs_status) { - tl_error(tl_ctx->super.super.lib, - "failed to pack UCP key with error code: %d", ucs_status); - return ucs_status_to_ucc_status(ucs_status); + } + /* map exported memory handle */ + ucs_status = ucp_mem_map(tl_ctx->worker.ucp_context, &mmap_params, &mh); + if (ucs_status == UCS_ERR_UNREACHABLE) { + tl_error(tl_ctx->super.super.lib, "exported memh is unsupported"); + return UCC_ERR_MEM_MAP_FAILURE; + } else if (ucs_status < UCS_OK) { + tl_error(tl_ctx->super.super.lib, + "ucp_mem_map failed with error code: %d", ucs_status); + return UCC_ERR_MEM_MAP_FAILURE; + } + /* generate rkeys / packed keys */ + tl_ctx->dynamic_remote_info[segid].va_base = map->address; + tl_ctx->dynamic_remote_info[segid].len = map->len; + tl_ctx->dynamic_remote_info[segid].mem_h = mh; + ucs_status = + ucp_rkey_pack(tl_ctx->worker.ucp_context, mh, + &tl_ctx->dynamic_remote_info[segid].packed_key, + &tl_ctx->dynamic_remote_info[segid].packed_key_len); + if (UCS_OK != ucs_status) { + tl_error(tl_ctx->super.super.lib, + "failed to pack UCP key with error code: %d", ucs_status); + return ucs_status_to_ucc_status(ucs_status); + } + + return UCC_OK; +} + +ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, + ucc_tl_ucp_task_t *task) +{ + ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); + ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team); + int i = 0; + ucc_status_t status; + ucc_mem_map_t *maps = coll_args->mem_map.segments; + size_t n_segments = coll_args->mem_map.n_segments; + + if (n_segments == 0) { + maps = ucc_calloc(2, sizeof(ucc_mem_map_t)); + if (!maps) { + return UCC_ERR_NO_MEMORY; + } + + maps[0].address = coll_args->src.info.buffer; + maps[0].len = (coll_args->src.info.count / UCC_TL_TEAM_SIZE(tl_team)) * + ucc_dt_size(coll_args->src.info.datatype); + maps[0].resource = NULL; + + maps[1].address = coll_args->dst.info.buffer; + maps[1].len = (coll_args->dst.info.count / UCC_TL_TEAM_SIZE(tl_team)) * + ucc_dt_size(coll_args->dst.info.datatype); + maps[1].resource = NULL; + + n_segments = 2; + } + + ctx->dynamic_remote_info = + ucc_calloc(n_segments, sizeof(ucc_tl_ucp_remote_info_t), "dynamic remote info"); + /* map memory and fill in local segment information */ + for (i = 0; i < n_segments; i++) { + status = ucc_tl_ucp_memmap_segment(task, &maps[i], i); + if (status != UCC_OK) { + tl_error(UCC_TASK_LIB(task), "failed to memory map a segment"); + goto failed_memory_map; } + ++ctx->n_dynrinfo_segs; + } + if (coll_args->mem_map.n_segments == 0) { + free(maps); } return UCC_OK; +failed_memory_map: + for (i = 0; i < ctx->n_dynrinfo_segs; i++) { + if (ctx->dynamic_remote_info[i].mem_h) { + ucp_mem_unmap(ctx->worker.ucp_context, + ctx->dynamic_remote_info[i].mem_h); + } + if (ctx->dynamic_remote_info[i].packed_key) { + ucp_rkey_buffer_release(ctx->dynamic_remote_info[i].packed_key); + } + if (ctx->dynamic_remote_info[i].packed_memh) { + ucp_rkey_buffer_release(ctx->dynamic_remote_info[i].packed_memh); + } + } + ctx->n_dynrinfo_segs = 0; + if (coll_args->mem_map.n_segments == 0) { + free(maps); + } + return status; } -ucc_status_t ucc_tl_ucp_coll_dynamic_segments(ucc_coll_args_t *coll_args, - ucc_tl_ucp_task_t *task) +ucc_status_t ucc_tl_ucp_coll_dynamic_segment_exchange(ucc_tl_ucp_task_t *task) { - ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); - ucc_tl_ucp_lib_t *tl_lib = UCC_TL_UCP_TEAM_LIB(tl_team); - ucc_tl_ucp_context_t *ctx = 
UCC_TL_UCP_TEAM_CTX(tl_team); - int i = 0; + ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); + ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team); + int i = 0; + size_t seg_pack_size = 0; + uint64_t *global_size = NULL; + void *ex_buffer = NULL; ucc_status_t status; - if (tl_lib->cfg.use_dynamic_segments && coll_args->mem_map.n_segments > 0) { - int starting_index = ctx->n_dynrinfo_segs; - size_t seg_pack_size = 0; - size_t *global_size = NULL; - size_t team_size = UCC_TL_TEAM_SIZE(tl_team); + if (ctx->n_dynrinfo_segs) { + size_t team_size = UCC_TL_TEAM_SIZE(tl_team); ucc_team_t *core_team = UCC_TL_CORE_TEAM(UCC_TL_UCP_TASK_TEAM(task)); - ucc_subset_t subset = {.map = tl_team->ctx_map, - .myrank = core_team->rank}; + ucc_subset_t subset = {.map = tl_team->ctx_map, + .myrank = core_team->rank}; ucc_service_coll_req_t *scoll_req; - void *ex_buffer; - ptrdiff_t old_offset; - - /* increase dynamic remote info size */ - ctx->dynamic_remote_info = ucc_realloc( - ctx->dynamic_remote_info, - sizeof(ucc_tl_ucp_remote_info_t) * - (ctx->n_dynrinfo_segs + coll_args->mem_map.n_segments), - "dyn remote info"); - if (!ctx->dynamic_remote_info) { - tl_error(UCC_TASK_LIB(task), "Out of Memory"); - return UCC_ERR_NO_MEMORY; - } - for (i = 0; i < coll_args->mem_map.n_segments; i++) { - /* map the buffer and populate the dynamic_remote_info segments */ - status = ucc_tl_ucp_memmap_append_segment( - task, &coll_args->mem_map.segments[i], starting_index + i); - if (status != UCC_OK) { - tl_error(UCC_TASK_LIB(task), "failed to memory map a segment"); - goto failed_memory_map; - } - seg_pack_size += - sizeof(uint64_t) * 3 + - ctx->dynamic_remote_info[starting_index + i].packed_key_len; + for (i = 0; i < ctx->n_dynrinfo_segs; i++) { + seg_pack_size += sizeof(uint64_t) * 3 + + ctx->dynamic_remote_info[i].packed_key_len; } - global_size = ucc_calloc(core_team->size, sizeof(size_t)); + global_size = ucc_calloc(core_team->size, sizeof(uint64_t)); if (!global_size) { tl_error(UCC_TASK_LIB(task), "Out of Memory"); - goto failed_memory_map; + return UCC_ERR_NO_MEMORY; } /* allgather on the new segments size */ status = ucc_service_allgather(core_team, &seg_pack_size, global_size, sizeof(uint64_t), subset, &scoll_req); if (status < UCC_OK) { - tl_error(UCC_TASK_LIB(task), "failed to perform a service allgather"); - ucc_free(global_size); - goto failed_memory_map; + tl_error(UCC_TASK_LIB(task), + "failed to perform a service allgather"); + goto failed_size_exch; } while (UCC_INPROGRESS == (status = ucc_service_coll_test(scoll_req))) { } if (status < UCC_OK) { tl_error(UCC_TASK_LIB(task), "failed on the allgather"); ucc_service_coll_finalize(scoll_req); - ucc_free(global_size); - goto failed_memory_map; + goto failed_size_exch; } ucc_service_coll_finalize(scoll_req); for (i = 0; i < core_team->size; i++) { @@ -327,126 +345,105 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segments(ucc_coll_args_t *coll_args, } } ucc_free(global_size); + global_size = NULL; /* pack the dynamic_remote_info segments */ - ctx->n_dynrinfo_segs += coll_args->mem_map.n_segments; ex_buffer = ucc_malloc(seg_pack_size, "ex pack size"); if (!ex_buffer) { tl_error(UCC_TASK_LIB(task), "Out of Memory"); status = UCC_ERR_NO_MEMORY; - goto failed_memory_map; + goto failed_data_exch; } - ucc_tl_ucp_pack_data(ctx, starting_index, ex_buffer); + ucc_tl_ucp_pack_data(ctx, ex_buffer); - old_offset = ctx->dyn_seg.buff_size; - ctx->dyn_seg.buff_size += seg_pack_size * core_team->size; - ctx->dyn_seg.dyn_buff = ucc_realloc(ctx->dyn_seg.dyn_buff, - 
ctx->dyn_seg.buff_size, "dyn buff"); - if (!ctx->dyn_seg.dyn_buff) { + ctx->dyn_seg_buf = ucc_calloc(1, team_size * seg_pack_size, "dyn buff"); + if (!ctx->dyn_seg_buf) { status = UCC_ERR_NO_MEMORY; tl_error(UCC_TASK_LIB(task), "Out of Memory"); - goto failed_memory_map; - } - ctx->dyn_seg.seg_groups = ucc_realloc( - ctx->dyn_seg.seg_groups, sizeof(uint64_t) * ctx->n_dynrinfo_segs, - "n_dynrinfo_segs"); - if (!ctx->dyn_seg.seg_groups) { - status = UCC_ERR_NO_MEMORY; - tl_error(UCC_TASK_LIB(task), "Out of Memory"); - goto failed_memory_map; - } - ctx->dyn_seg.seg_group_start = ucc_realloc( - ctx->dyn_seg.seg_group_start, - sizeof(uint64_t) * ctx->n_dynrinfo_segs, "n_dynrinfo_segs"); - if (!ctx->dyn_seg.seg_group_start) { - status = UCC_ERR_NO_MEMORY; - tl_error(UCC_TASK_LIB(task), "Out of Memory"); - goto failed_memory_map; - } - ctx->dyn_seg.seg_group_size = ucc_realloc( - ctx->dyn_seg.seg_group_size, - sizeof(uint64_t) * ctx->dyn_seg.num_groups + 1, "n_dynrinfo_segs"); - if (!ctx->dyn_seg.seg_group_size) { - status = UCC_ERR_NO_MEMORY; - tl_error(UCC_TASK_LIB(task), "Out of Memory"); - goto failed_memory_map; - } - - ctx->dyn_seg.starting_seg = ucc_realloc( - ctx->dyn_seg.starting_seg, sizeof(uint64_t) * ctx->n_dynrinfo_segs, - "n_dynrinfo_segs"); - if (!ctx->dyn_seg.starting_seg) { - status = UCC_ERR_NO_MEMORY; - tl_error(UCC_TASK_LIB(task), "Out of Memory"); - goto failed_memory_map; - } - ctx->dyn_seg.num_seg_per_group = ucc_realloc( - ctx->dyn_seg.num_seg_per_group, - sizeof(uint64_t) * ctx->dyn_seg.num_groups + 1, "n_dynrinfo_segs"); - if (!ctx->dyn_seg.num_seg_per_group) { - status = UCC_ERR_NO_MEMORY; - tl_error(UCC_TASK_LIB(task), "Out of Memory"); - goto failed_memory_map; - } - - ctx->dyn_seg.num_groups += 1; - ctx->dyn_seg.num_seg_per_group[ctx->dyn_seg.num_groups - 1] = - coll_args->mem_map.n_segments; - ctx->dyn_seg.seg_group_size[ctx->dyn_seg.num_groups - 1] = - seg_pack_size; - if (starting_index == 0) { - for (i = starting_index; i < ctx->n_dynrinfo_segs; i++) { - ctx->dyn_seg.seg_groups[i] = 0; - ctx->dyn_seg.seg_group_start[i] = 0; - ctx->dyn_seg.starting_seg[i] = starting_index; - } - } else { - for (i = starting_index; i < ctx->n_dynrinfo_segs; i++) { - ctx->dyn_seg.seg_groups[i] = - ctx->dyn_seg.seg_groups[starting_index - 1] + 1; - ctx->dyn_seg.seg_group_start[i] = old_offset; - ctx->dyn_seg.starting_seg[i] = starting_index; - } + goto failed_data_exch; } /* allgather on the new segments (packed) */ - status = ucc_service_allgather( - core_team, ex_buffer, PTR_OFFSET(ctx->dyn_seg.dyn_buff, old_offset), - seg_pack_size, subset, &scoll_req); + status = ucc_service_allgather(core_team, ex_buffer, ctx->dyn_seg_buf, + seg_pack_size, subset, &scoll_req); if (status < UCC_OK) { tl_error(UCC_TASK_LIB(task), "failed on the allgather"); - goto failed_memory_map; + goto failed_data_exch; } while (UCC_INPROGRESS == (status = ucc_service_coll_test(scoll_req))) { } if (status < UCC_OK) { tl_error(UCC_TASK_LIB(task), "failed on the allgather"); ucc_service_coll_finalize(scoll_req); - goto failed_memory_map; + goto failed_data_exch; } /* done with allgather */ ucc_service_coll_finalize(scoll_req); - ctx->rkeys = ucc_realloc(ctx->rkeys, - team_size * sizeof(ucp_rkey_h) * - (ctx->n_rinfo_segs + ctx->n_dynrinfo_segs), - "rkeys"); - memset(PTR_OFFSET(ctx->rkeys, team_size * sizeof(ucp_rkey_h) * - (ctx->n_rinfo_segs + starting_index)), - 0, - team_size * sizeof(ucp_rkey_h) * coll_args->mem_map.n_segments); + ctx->dyn_rkeys = + ucc_calloc(1, team_size * sizeof(ucp_rkey_h) * 
ctx->n_dynrinfo_segs, + "dyn rkeys"); + if (!ctx->dyn_rkeys) { + tl_error(UCC_TASK_LIB(task), "failed to allocate space for keys"); + status = UCC_ERR_NO_MEMORY; + goto failed_data_exch; + } ucc_free(ex_buffer); } return UCC_OK; -failed_memory_map: - for (i = 0; i < coll_args->mem_map.n_segments; i++) { - if (ctx->dynamic_remote_info[ctx->n_dynrinfo_segs + i].mem_h) { - ucp_mem_unmap(ctx->worker.ucp_context, ctx->dynamic_remote_info[ctx->n_dynrinfo_segs + i].mem_h); +failed_data_exch: + if (ctx->dyn_seg_buf) { + ucc_free(ctx->dyn_seg_buf); + ctx->dyn_seg_buf = NULL; + } + if (ex_buffer) { + ucc_free(ex_buffer); + } +failed_size_exch: + if (!global_size) { + ucc_free(global_size); + } + return status; +} + +void ucc_tl_ucp_coll_dynamic_segment_finalize(ucc_tl_ucp_task_t *task) +{ + ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); + ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team); + int i = 0; + int j = 0; + /* free library resources, unmap user resources */ + if (ctx->dyn_seg_buf) { + /* unmap and release packed buffers */ + for (i = 0; i < ctx->n_dynrinfo_segs; i++) { + if (ctx->dynamic_remote_info[i].mem_h) { + ucp_mem_unmap(ctx->worker.ucp_context, + ctx->dynamic_remote_info[i].mem_h); + } + if (ctx->dynamic_remote_info[i].packed_key) { + ucp_rkey_buffer_release(ctx->dynamic_remote_info[i].packed_key); + } + if (ctx->dynamic_remote_info[i].packed_memh) { + ucp_rkey_buffer_release( + ctx->dynamic_remote_info[i].packed_memh); + } } - if (ctx->dynamic_remote_info[ctx->n_dynrinfo_segs + i].packed_key) { - ucp_rkey_buffer_release(ctx->dynamic_remote_info[ctx->n_dynrinfo_segs + i].packed_key); + /* destroy rkeys */ + for (i = 0; i < UCC_TL_TEAM_SIZE(tl_team); i++) { + for (j = 0; j < ctx->n_dynrinfo_segs; j++) { + if (UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, i, j)) { + ucp_rkey_destroy(UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, i, j)); + } + } } + free(ctx->dynamic_remote_info); + free(ctx->dyn_rkeys); + free(ctx->dyn_seg_buf); + + ctx->dynamic_remote_info = NULL; + ctx->dyn_rkeys = NULL; + ctx->dyn_seg_buf = NULL; + ctx->n_dynrinfo_segs = 0; } - return status; } ucc_status_t ucc_tl_ucp_coll_init(ucc_base_coll_args_t *coll_args, diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index 69944b7420..795d14e13b 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -491,7 +491,11 @@ ucc_tl_ucp_get_radix_from_range(ucc_tl_ucp_team_t *team, return radix; } -ucc_status_t ucc_tl_ucp_coll_dynamic_segments(ucc_coll_args_t *coll_args, - ucc_tl_ucp_task_t *task); +ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, + ucc_tl_ucp_task_t *task); + +ucc_status_t ucc_tl_ucp_coll_dynamic_segment_exchange(ucc_tl_ucp_task_t *task); + +void ucc_tl_ucp_coll_dynamic_segment_finalize(ucc_tl_ucp_task_t *task); #endif diff --git a/src/components/tl/ucp/tl_ucp_context.c b/src/components/tl/ucp/tl_ucp_context.c index 4516814bcc..7ad94560ab 100644 --- a/src/components/tl/ucp/tl_ucp_context.c +++ b/src/components/tl/ucp/tl_ucp_context.c @@ -161,18 +161,16 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_context_t, UCP_CHECK(ucp_config_read(prefix, NULL, &ucp_config), "failed to read ucp configuration", err_cfg_read, self); - ucp_params.field_mask = - UCP_PARAM_FIELD_FEATURES | UCP_PARAM_FIELD_TAG_SENDER_MASK | UCP_PARAM_FIELD_NAME; - ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_AM; - if (((params->params.mask & UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS) || - lib->cfg.use_dynamic_segments)) { - ucp_params.features |= UCP_FEATURE_RMA | 
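A compressed view of the exchange implemented above, kept here as an orientation sketch (derived from the function, not new behavior):

/* per-collective exchange over the team's service collectives:
 *   1. pack local (rva, len, key_size, key) tuples for the dynamic segments
 *   2. allgather the packed sizes and take the maximum, so every rank's
 *      block in the result buffer has the same stride
 *   3. allgather the packed blocks into ctx->dyn_seg_buf
 *   4. allocate ctx->dyn_rkeys; remote keys are unpacked lazily on first
 *      use per (peer, segment) by the send/recv path
 */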
UCP_FEATURE_AMO64; - } + ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES | + UCP_PARAM_FIELD_TAG_SENDER_MASK | + UCP_PARAM_FIELD_NAME; + ucp_params.features = + UCP_FEATURE_TAG | UCP_FEATURE_AM | UCP_FEATURE_RMA | UCP_FEATURE_AMO64; if (lib->cfg.use_xgvmi) { ucp_params.features |= UCP_FEATURE_EXPORTED_MEMH; } ucp_params.tag_sender_mask = UCC_TL_UCP_TAG_SENDER_MASK; - ucp_params.name = "UCC_UCP_CONTEXT"; + ucp_params.name = "UCC_UCP_CONTEXT"; if (params->estimated_num_ppn > 0) { ucp_params.field_mask |= UCP_PARAM_FIELD_ESTIMATED_NUM_PPN; @@ -253,10 +251,11 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_context_t, self->remote_info = NULL; self->dynamic_remote_info = NULL; + self->dyn_seg_buf = NULL; self->n_rinfo_segs = 0; self->n_dynrinfo_segs = 0; self->rkeys = NULL; - memset(&self->dyn_seg, 0, sizeof(self->dyn_seg)); + self->dyn_rkeys = NULL; if (params->params.mask & UCC_CONTEXT_PARAM_FIELD_MEM_PARAMS && params->params.mask & UCC_CONTEXT_PARAM_FIELD_OOB) { ucc_status = ucc_tl_ucp_ctx_remote_populate( @@ -345,12 +344,6 @@ ucc_status_t ucc_tl_ucp_rinfo_destroy(ucc_tl_ucp_context_t *ctx) ucp_rkey_destroy(UCC_TL_UCP_REMOTE_RKEY(ctx, i, j)); } } - for (j = 0; j < ctx->n_dynrinfo_segs; j++) { - if (UCC_TL_UCP_REMOTE_RKEY(ctx, i, ctx->n_rinfo_segs + j)) { - ucp_rkey_destroy( - UCC_TL_UCP_REMOTE_RKEY(ctx, i, ctx->n_rinfo_segs + j)); - } - } } for (i = 0; i < ctx->n_rinfo_segs; i++) { if (ctx->remote_info[i].mem_h) { @@ -366,9 +359,6 @@ ucc_status_t ucc_tl_ucp_rinfo_destroy(ucc_tl_ucp_context_t *ctx) ucp_mem_unmap(ctx->worker.ucp_context, ctx->dynamic_remote_info[i].mem_h); } - if (ctx->dynamic_remote_info[i].packed_key) { - ucp_rkey_buffer_release(ctx->dynamic_remote_info[i].packed_key); - } } ucc_free(ctx->dynamic_remote_info); } @@ -376,7 +366,6 @@ ucc_status_t ucc_tl_ucp_rinfo_destroy(ucc_tl_ucp_context_t *ctx) ucc_free(ctx->rkeys); ctx->remote_info = NULL; ctx->rkeys = NULL; - ctx->dynamic_remote_info = NULL; return UCC_OK; } diff --git a/src/components/tl/ucp/tl_ucp_sendrecv.h b/src/components/tl/ucp/tl_ucp_sendrecv.h index 70cdafa689..ff5216731f 100644 --- a/src/components/tl/ucp/tl_ucp_sendrecv.h +++ b/src/components/tl/ucp/tl_ucp_sendrecv.h @@ -225,6 +225,25 @@ static inline ucc_status_t ucc_tl_ucp_send_nz(void *buffer, size_t msglen, dest_group_rank, team, task); } +static inline int resolve_segment(const void *va, size_t *key_sizes, + ptrdiff_t *key_offset, size_t nr_segments, + ucc_tl_ucp_remote_info_t *rinfo) +{ + int i; + uint64_t base; + uint64_t end; + + for (i = 0; i < nr_segments; i++) { + base = (uint64_t)rinfo[i].va_base; + end = base + rinfo[i].len; + if ((uint64_t)va >= base && (uint64_t)va < end) { + return i; + } + *key_offset += key_sizes[i]; + } + return -1; +} + static inline ucc_status_t ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, size_t msglen, ucp_ep_h *ep, ucc_rank_t peer, uint64_t *rva, @@ -232,14 +251,13 @@ ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, size_t msglen, { ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team); ptrdiff_t key_offset = 0; - const size_t section_offset = sizeof(uint64_t) * ctx->n_rinfo_segs; + size_t section_offset = sizeof(uint64_t) * ctx->n_rinfo_segs; ucc_rank_t core_rank; uint64_t *rvas; uint64_t *key_sizes; void *keys; void *offset; ptrdiff_t base_offset; - int i; *segment = -1; core_rank = ucc_ep_map_eval(UCC_TL_TEAM_MAP(team), peer); @@ -253,69 +271,51 @@ ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, size_t msglen, rvas = (uint64_t *)base_offset; key_sizes = PTR_OFFSET(base_offset, 
(section_offset * 2)); keys = PTR_OFFSET(base_offset, (section_offset * 3)); - for (i = 0; i < ctx->n_rinfo_segs; i++) { - uint64_t base = (uint64_t)ctx->remote_info[i].va_base; - uint64_t end = base + ctx->remote_info[i].len; - if ((uint64_t)va >= base && (uint64_t)va < end) { - *segment = i; - *rva = rvas[i] + ((uint64_t)va - (uint64_t)base); - if (ucc_unlikely(NULL == - UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment))) { - ucs_status_t ucs_status = ucp_ep_rkey_unpack( - *ep, PTR_OFFSET(keys, key_offset), - &UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment)); - if (UCS_OK != ucs_status) { - return ucs_status_to_ucc_status(ucs_status); - } + *segment = resolve_segment(va, key_sizes, &key_offset, ctx->n_rinfo_segs, + ctx->remote_info); + if (*segment >= 0) { + *rva = rvas[*segment] + + ((uint64_t)va - (uint64_t)ctx->remote_info[*segment].va_base); + *packed_memh = (ctx->remote_info[*segment].packed_memh) + ? ctx->remote_info[*segment].mem_h + : NULL; + if (ucc_unlikely(NULL == UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment))) { + ucs_status_t ucs_status = ucp_ep_rkey_unpack( + *ep, PTR_OFFSET(keys, key_offset), + &UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment)); + if (UCS_OK != ucs_status) { + return ucs_status_to_ucc_status(ucs_status); } - *rkey = UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment); - *packed_memh = (ctx->remote_info[i].packed_memh) - ? ctx->remote_info[i].mem_h - : NULL; - return UCC_OK; } - key_offset += key_sizes[i]; + *rkey = UCC_TL_UCP_REMOTE_RKEY(ctx, peer, *segment); + return UCC_OK; } - if (0 > *segment) { - key_offset = 0; - for (i = 0; i < ctx->n_dynrinfo_segs; i++) { - uint64_t base = (uint64_t)ctx->dynamic_remote_info[i].va_base; - uint64_t end = base + ctx->dynamic_remote_info[i].len; - uint64_t check_base = (uint64_t)va; - uint64_t check_end = check_base + msglen; - size_t num_keys = 0; - void *packed_key = NULL; - size_t team_size = UCC_TL_TEAM_SIZE(team); - if (check_base >= base && check_base < end && check_end <= end) { - *segment = i; - *rva = UCC_TL_UCP_REMOTE_DYN_RVA(ctx, peer, i); - num_keys = *segment - ctx->dyn_seg.starting_seg[*segment]; - for (int j = 0; j < num_keys; j++) { - key_offset += UCC_TL_UCP_REMOTE_DYN_KEY_SIZE( - ctx, peer, ctx->dyn_seg.starting_seg[*segment] + j); - } - packed_key = - UCC_TL_UCP_REMOTE_DYN_KEY(ctx, peer, key_offset, *segment); - /* dynamic segment keys should be placed AFTER - * the ctx's keys (i.e., num_static_segs + segment_number) */ - if (ucc_unlikely(NULL == UCC_TL_UCP_DYN_REMOTE_RKEY( - ctx, peer, team_size, *segment))) { - ucs_status_t ucs_status = - ucp_ep_rkey_unpack(*ep, packed_key, - &UCC_TL_UCP_DYN_REMOTE_RKEY( - ctx, peer, team_size, *segment)); - if (UCS_OK != ucs_status) { - return ucs_status_to_ucc_status(ucs_status); - } - } - *rkey = - UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, peer, team_size, *segment); - *packed_memh = (ctx->dynamic_remote_info[i].packed_memh) - ? ctx->dynamic_remote_info[i].mem_h - : NULL; - return UCC_OK; + + section_offset = sizeof(uint64_t) * ctx->n_dynrinfo_segs; + base_offset = (ptrdiff_t)(ctx->dyn_seg_buf); + rvas = (uint64_t *)base_offset; + key_sizes = PTR_OFFSET(base_offset, (section_offset * 2)); + keys = PTR_OFFSET(base_offset, (section_offset * 3)); + *segment = resolve_segment(va, key_sizes, &key_offset, ctx->n_dynrinfo_segs, + ctx->dynamic_remote_info); + if (*segment >= 0) { + *rva = rvas[*segment] + + ((uint64_t)va - + (uint64_t)ctx->dynamic_remote_info[*segment].va_base); + *packed_memh = (ctx->dynamic_remote_info[*segment].packed_memh) + ? 
ctx->dynamic_remote_info[*segment].mem_h + : NULL; + if (ucc_unlikely(NULL == + UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, peer, *segment))) { + ucs_status_t ucs_status = ucp_ep_rkey_unpack( + *ep, PTR_OFFSET(keys, key_offset), + &UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, peer, *segment)); + if (UCS_OK != ucs_status) { + return ucs_status_to_ucc_status(ucs_status); } } + *rkey = UCC_TL_UCP_DYN_REMOTE_RKEY(ctx, peer, *segment); + return UCC_OK; } tl_error( diff --git a/src/ucc/api/ucc.h b/src/ucc/api/ucc.h index 6d28276683..49a21f7401 100644 --- a/src/ucc/api/ucc.h +++ b/src/ucc/api/ucc.h @@ -890,15 +890,25 @@ typedef struct ucc_oob_coll { typedef ucc_oob_coll_t ucc_context_oob_coll_t; typedef ucc_oob_coll_t ucc_team_oob_coll_t; +/** + * @ingroup UCC_CONTEXT_DT + */ +typedef enum { + UCC_MEM_MAP_TYPE_SEND_BUF, + UCC_MEM_MAP_TYPE_RECV_BUF, + UCC_MEM_MAP_TYPE_SEND_RECV_BUF, +} ucc_mem_map_usage_t; + /** * * @ingroup UCC_CONTEXT_DT */ typedef struct ucc_mem_map { - void * address; /*!< the address of a buffer to be attached to + void *address; /*!< the address of a buffer to be attached to a UCC context */ - size_t len; /*!< the length of the buffer */ - void * resource; /*!< resource associated with the address. + size_t len; /*!< the length of the buffer */ + ucc_mem_map_usage_t type; /*!< the usage type of buffer being mapped. */ + void *resource; /*!< resource associated with the address. examples of resources include memory keys. */ } ucc_mem_map_t; @@ -1873,25 +1883,19 @@ typedef struct ucc_coll_args { to 0. */ ucc_coll_callback_t cb; double timeout; /*!< Timeout in seconds */ - ucc_mem_map_params_t mem_map; /*!< Memory regions to be used - for the current and/or - future one-sided collectives. - If set, the designated regions - will be mapped and information - exchanged with the team - associated with the collective - via an allgather operation. - Memory is unmapped during - context destruction. - It is recommended to use this - option sparingly due to the - increased overhead. Not necessary - for two-sided collectives. */ struct { uint64_t start; int64_t stride; uint64_t size; } active_set; + ucc_mem_map_params_t mem_map; /*!< Memory regions to be used + for the current collective. + If set, the designated regions + will be mapped and information + exchanged. Memory is unmapped + at collective completion. Not + necessary for two-sided + collectives. 
*/ } ucc_coll_args_t; /** diff --git a/src/ucc/api/ucc_status.h b/src/ucc/api/ucc_status.h index 90d25b463b..49ab4e1d7e 100644 --- a/src/ucc/api/ucc_status.h +++ b/src/ucc/api/ucc_status.h @@ -40,6 +40,7 @@ typedef enum { UCC_ERR_NO_MESSAGE = -6, /*!< General purpose return code without specific error */ UCC_ERR_NOT_FOUND = -7, UCC_ERR_TIMED_OUT = -8, + UCC_ERR_MEM_MAP_FAILURE = -9, UCC_ERR_LAST = -100, } ucc_status_t; diff --git a/src/utils/ucc_status.c b/src/utils/ucc_status.c index 2e2285eb61..430d4a96d5 100644 --- a/src/utils/ucc_status.c +++ b/src/utils/ucc_status.c @@ -34,6 +34,8 @@ const char *ucc_status_string(ucc_status_t status) return "Not found"; case UCC_ERR_TIMED_OUT: return "Timeout expired"; + case UCC_ERR_MEM_MAP_FAILURE: + return "Failed to memory map address"; default: snprintf(error_str, sizeof(error_str) - 1, "Unknown error %d", status); return error_str; From f66dad775e438406dc6037021318de4bb396a4e5 Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Mon, 29 Jul 2024 13:34:59 -0700 Subject: [PATCH 10/14] TL/UCP: ensure backwards compatibility --- src/components/tl/ucp/tl_ucp_context.c | 53 +++++++++++--------------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/src/components/tl/ucp/tl_ucp_context.c b/src/components/tl/ucp/tl_ucp_context.c index 7ad94560ab..b5d9955e29 100644 --- a/src/components/tl/ucp/tl_ucp_context.c +++ b/src/components/tl/ucp/tl_ucp_context.c @@ -455,6 +455,8 @@ ucc_status_t ucc_tl_ucp_ctx_remote_populate(ucc_tl_ucp_context_t * ctx, ucc_mem_map_params_t map, ucc_context_oob_coll_t oob) { + ucc_tl_ucp_lib_t *lib = + ucc_derived_of(ctx->super.super.lib, ucc_tl_ucp_lib_t); uint32_t size = oob.n_oob_eps; uint64_t nsegs = map.n_segments; ucp_mem_map_params_t mmap_params; @@ -465,72 +467,61 @@ ucc_status_t ucc_tl_ucp_ctx_remote_populate(ucc_tl_ucp_context_t * ctx, if (size < 2) { tl_error( - ctx->super.super.lib, + lib, "oob.n_oob_eps set to incorrect value for remote exchange (%d)", size); return UCC_ERR_INVALID_PARAM; } if (nsegs > MAX_NR_SEGMENTS) { - tl_error(ctx->super.super.lib, "cannot map more than %d segments", - MAX_NR_SEGMENTS); + tl_error(lib, "cannot map more than %d segments", MAX_NR_SEGMENTS); return UCC_ERR_INVALID_PARAM; } - ctx->rkeys = - (ucp_rkey_h *)ucc_calloc(sizeof(ucp_rkey_h), nsegs * size, "ucp_ctx_rkeys"); + ctx->rkeys = (ucp_rkey_h *)ucc_calloc(sizeof(ucp_rkey_h), nsegs * size, + "ucp_ctx_rkeys"); if (NULL == ctx->rkeys) { - tl_error(ctx->super.super.lib, "failed to allocated %zu bytes", + tl_error(lib, "failed to allocated %zu bytes", sizeof(ucp_rkey_h) * nsegs * size); return UCC_ERR_NO_MEMORY; } ctx->remote_info = (ucc_tl_ucp_remote_info_t *)ucc_calloc( nsegs, sizeof(ucc_tl_ucp_remote_info_t), "ucp_remote_info"); if (NULL == ctx->remote_info) { - tl_error(ctx->super.super.lib, "failed to allocated %zu bytes", + tl_error(lib, "failed to allocated %zu bytes", sizeof(ucc_tl_ucp_remote_info_t) * nsegs); ucc_status = UCC_ERR_NO_MEMORY; goto fail_alloc_remote_segs; } for (i = 0; i < nsegs; i++) { - if (map.segments[i].resource == NULL) { + if (lib->cfg.use_xgvmi == 0 || + (lib->cfg.use_xgvmi == 1 && map.segments[i].resource == NULL)) { mmap_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH; mmap_params.address = map.segments[i].address; mmap_params.length = map.segments[i].len; - - status = ucp_mem_map(ctx->worker.ucp_context, &mmap_params, &mh); - if (UCS_OK != status) { - tl_error(ctx->super.super.lib, - "ucp_mem_map failed with error code: %d", status); - ucc_status = 
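Stepping back to the public API change above (ucc_mem_map_usage_t and the per-collective mem_map field), a hypothetical caller-side sketch; src_buf, dst_buf and msg_size are placeholders, and the fields outside this patch (mask, flags, coll_type, src/dst info) follow the existing ucc_coll_args_t API.

ucc_mem_map_t   seg[2];
ucc_coll_args_t args;

seg[0].address  = src_buf;
seg[0].len      = msg_size;
seg[0].type     = UCC_MEM_MAP_TYPE_SEND_BUF;
seg[0].resource = NULL;                 /* no pre-exported memory key */
seg[1].address  = dst_buf;
seg[1].len      = msg_size;
seg[1].type     = UCC_MEM_MAP_TYPE_RECV_BUF;
seg[1].resource = NULL;

args.mask               = UCC_COLL_ARGS_FIELD_FLAGS |
                          UCC_COLL_ARGS_FIELD_MEM_MAP |
                          UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER;
args.flags              = UCC_COLL_ARGS_FLAG_MEM_MAP;
args.coll_type          = UCC_COLL_TYPE_ALLTOALL;
args.mem_map.segments   = seg;
args.mem_map.n_segments = 2;
/* src.info / dst.info / global_work_buffer are set as for any one-sided
 * collective; the mapped regions are released when the collective
 * completes */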
ucs_status_to_ucc_status(status); - goto fail_mem_map; - } - ctx->remote_info[i].packed_memh = NULL; } else { mmap_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_EXPORTED_MEMH_BUFFER; mmap_params.exported_memh_buffer = map.segments[i].resource; - - status = ucp_mem_map(ctx->worker.ucp_context, &mmap_params, &mh); - if (status == UCS_ERR_UNREACHABLE) { - tl_error(ctx->super.super.lib, "exported memh is unsupported"); - ucc_status = ucs_status_to_ucc_status(status); - goto fail_mem_map; - } else if (status < UCS_OK) { - tl_error(ctx->super.super.lib, - "ucp_mem_map failed with error code: %d", status); - ucc_status = ucs_status_to_ucc_status(status); - goto fail_mem_map; - } + } + status = ucp_mem_map(ctx->worker.ucp_context, &mmap_params, &mh); + if (UCS_OK != status) { + tl_error(lib, "ucp_mem_map failed with error code: %d", status); + ucc_status = ucs_status_to_ucc_status(status); + goto fail_mem_map; + } + if (lib->cfg.use_xgvmi && map.segments[i].resource != NULL) { ctx->remote_info[i].packed_memh = map.segments[i].resource; + } else { + ctx->remote_info[i].packed_memh = NULL; } + ctx->remote_info[i].mem_h = (void *)mh; status = ucp_rkey_pack(ctx->worker.ucp_context, mh, &ctx->remote_info[i].packed_key, &ctx->remote_info[i].packed_key_len); if (UCS_OK != status) { - tl_error(ctx->super.super.lib, - "failed to pack UCP key with error code: %d", status); + tl_error(lib, "failed to pack UCP key with error code: %d", status); ucc_status = ucs_status_to_ucc_status(status); goto fail_mem_map; } From 2d98f076a309bbecd8cc151d8239a8cf2ff6e5b0 Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Wed, 31 Jul 2024 23:12:18 -0700 Subject: [PATCH 11/14] TL/UCP: fix dynamic segment lookup --- src/components/tl/ucp/tl_ucp.h | 1 + src/components/tl/ucp/tl_ucp_coll.c | 122 +++++++++++++++++------- src/components/tl/ucp/tl_ucp_sendrecv.h | 2 +- 3 files changed, 89 insertions(+), 36 deletions(-) diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index 6a4f40ed25..c3c8323272 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -130,6 +130,7 @@ typedef struct ucc_tl_ucp_context { ucc_tl_ucp_remote_info_t *dynamic_remote_info; void *dyn_seg_buf; ucp_rkey_h *dyn_rkeys; + size_t dyn_seg_size; size_t n_dynrinfo_segs; } ucc_tl_ucp_context_t; UCC_CLASS_DECLARE(ucc_tl_ucp_context_t, const ucc_base_context_params_t *, diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index 013ffbfeb4..553544cde6 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -234,45 +234,95 @@ ucc_status_t ucc_tl_ucp_memmap_segment(ucc_tl_ucp_task_t *task, ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, ucc_tl_ucp_task_t *task) { - ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); - ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team); - int i = 0; - ucc_status_t status; + ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); + ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team); + int i = 0; + uint64_t need_map = 0x7; ucc_mem_map_t *maps = coll_args->mem_map.segments; - size_t n_segments = coll_args->mem_map.n_segments; + ucc_mem_map_t *seg_maps = NULL; + size_t n_segments = 3; + ucc_status_t status; - if (n_segments == 0) { - maps = ucc_calloc(2, sizeof(ucc_mem_map_t)); - if (!maps) { - return UCC_ERR_NO_MEMORY; - } - maps[0].address = coll_args->src.info.buffer; - maps[0].len = (coll_args->src.info.count / UCC_TL_TEAM_SIZE(tl_team)) * - 
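The backwards-compatibility change above reduces to a small per-segment decision that may be easier to read in one place; the following sketch restates the code, it does not add behavior.

/* per segment in ucc_tl_ucp_ctx_remote_populate():
 *   use_xgvmi == 0                      -> map by ADDRESS/LENGTH,
 *                                          packed_memh = NULL
 *   use_xgvmi == 1 && resource == NULL  -> map by ADDRESS/LENGTH,
 *                                          packed_memh = NULL
 *   use_xgvmi == 1 && resource != NULL  -> map by EXPORTED_MEMH_BUFFER,
 *                                          packed_memh = resource
 */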
ucc_dt_size(coll_args->src.info.datatype); - maps[0].resource = NULL; + /* check if src, dst, global work in ctx mapped segments */ + for (i = 0; i < ctx->n_rinfo_segs && n_segments > 0; i++) { + uint64_t base = (uint64_t)ctx->remote_info[i].va_base; + uint64_t end = (uint64_t)(base + ctx->remote_info[i].len); + if ((uint64_t)coll_args->src.info.buffer >= base && + (uint64_t)coll_args->src.info.buffer < end) { + // found it + need_map ^= 1; + --n_segments; + } + if ((uint64_t)coll_args->dst.info.buffer >= base && + (uint64_t)coll_args->dst.info.buffer < end) { + // found it + need_map ^= 2; + --n_segments; + } - maps[1].address = coll_args->dst.info.buffer; - maps[1].len = (coll_args->dst.info.count / UCC_TL_TEAM_SIZE(tl_team)) * - ucc_dt_size(coll_args->dst.info.datatype); - maps[1].resource = NULL; + if ((uint64_t)coll_args->global_work_buffer >= base && + (uint64_t)coll_args->global_work_buffer < end) { + // found it + need_map ^= 4; + --n_segments; + } - n_segments = 2; + if (n_segments == 0) { + break; + } } - ctx->dynamic_remote_info = - ucc_calloc(n_segments, sizeof(ucc_tl_ucp_remote_info_t), "dynamic remote info"); - /* map memory and fill in local segment information */ - for (i = 0; i < n_segments; i++) { - status = ucc_tl_ucp_memmap_segment(task, &maps[i], i); - if (status != UCC_OK) { - tl_error(UCC_TASK_LIB(task), "failed to memory map a segment"); - goto failed_memory_map; + /* add any valid segments */ + if (n_segments > 0) { + int index = 0; + seg_maps = ucc_calloc(n_segments, sizeof(ucc_mem_map_t)); + if (!seg_maps) { + return UCC_ERR_NO_MEMORY; + } + + if (need_map & 0x1) { + seg_maps[index].address = coll_args->src.info.buffer; + seg_maps[index].len = (coll_args->src.info.count) * + ucc_dt_size(coll_args->src.info.datatype); + seg_maps[index++].resource = NULL; + } + if (need_map & 0x2) { + seg_maps[index].address = coll_args->dst.info.buffer; + seg_maps[index].len = (coll_args->dst.info.count) * + ucc_dt_size(coll_args->dst.info.datatype); + seg_maps[index++].resource = NULL; + } + if (need_map & 0x4) { + seg_maps[index].address = coll_args->global_work_buffer; + seg_maps[index].len = (ONESIDED_SYNC_SIZE + ONESIDED_REDUCE_SIZE) * sizeof(long); + seg_maps[index++].resource = NULL; } - ++ctx->n_dynrinfo_segs; } - if (coll_args->mem_map.n_segments == 0) { - free(maps); + + if (n_segments > 0) { + ctx->dynamic_remote_info = + ucc_calloc(n_segments, sizeof(ucc_tl_ucp_remote_info_t), "dynamic remote info"); + /* map memory and fill in local segment information */ + for (i = 0; i < n_segments; i++) { + status = ucc_tl_ucp_memmap_segment(task, &seg_maps[i], i); + if (status != UCC_OK) { + tl_error(UCC_TASK_LIB(task), "failed to memory map a segment"); + goto failed_memory_map; + } + ++ctx->n_dynrinfo_segs; + } + for (i = 0; i < coll_args->mem_map.n_segments; i++) { + status = ucc_tl_ucp_memmap_segment(task, &maps[i], i + n_segments); + if (status != UCC_OK) { + tl_error(UCC_TASK_LIB(task), "failed to memory map a segment"); + goto failed_memory_map; + } + ++ctx->n_dynrinfo_segs; + } + if (n_segments) { + free(seg_maps); + } } return UCC_OK; failed_memory_map: @@ -289,8 +339,8 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, } } ctx->n_dynrinfo_segs = 0; - if (coll_args->mem_map.n_segments == 0) { - free(maps); + if (n_segments) { + ucc_free(seg_maps); } return status; } @@ -387,6 +437,7 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_exchange(ucc_tl_ucp_task_t *task) status = UCC_ERR_NO_MEMORY; goto failed_data_exch; } + ctx->dyn_seg_size = 
seg_pack_size; ucc_free(ex_buffer); } return UCC_OK; @@ -435,13 +486,14 @@ void ucc_tl_ucp_coll_dynamic_segment_finalize(ucc_tl_ucp_task_t *task) } } } - free(ctx->dynamic_remote_info); - free(ctx->dyn_rkeys); - free(ctx->dyn_seg_buf); + ucc_free(ctx->dynamic_remote_info); + ucc_free(ctx->dyn_rkeys); + ucc_free(ctx->dyn_seg_buf); ctx->dynamic_remote_info = NULL; ctx->dyn_rkeys = NULL; ctx->dyn_seg_buf = NULL; + ctx->dyn_seg_size = 0; ctx->n_dynrinfo_segs = 0; } } diff --git a/src/components/tl/ucp/tl_ucp_sendrecv.h b/src/components/tl/ucp/tl_ucp_sendrecv.h index ff5216731f..1575f902a3 100644 --- a/src/components/tl/ucp/tl_ucp_sendrecv.h +++ b/src/components/tl/ucp/tl_ucp_sendrecv.h @@ -292,7 +292,7 @@ ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, size_t msglen, } section_offset = sizeof(uint64_t) * ctx->n_dynrinfo_segs; - base_offset = (ptrdiff_t)(ctx->dyn_seg_buf); + base_offset = (ptrdiff_t)(ctx->dyn_seg_buf + peer * ctx->dyn_seg_size); //PTR_OFFSET rvas = (uint64_t *)base_offset; key_sizes = PTR_OFFSET(base_offset, (section_offset * 2)); keys = PTR_OFFSET(base_offset, (section_offset * 3)); From 06aa1f3776f2abbb61d51dd1af2d80eb6067be1f Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Thu, 1 Aug 2024 13:19:03 -0700 Subject: [PATCH 12/14] REVIEW: address feedback --- src/components/tl/ucp/tl_ucp_coll.c | 16 ++++++++++++++-- src/components/tl/ucp/tl_ucp_coll.h | 2 +- src/components/tl/ucp/tl_ucp_sendrecv.h | 2 +- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index 553544cde6..69d2dfb555 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -278,6 +278,7 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, int index = 0; seg_maps = ucc_calloc(n_segments, sizeof(ucc_mem_map_t)); if (!seg_maps) { + tl_error(UCC_TL_UCP_TEAM_LIB(tl_team), "Out of Memory"); return UCC_ERR_NO_MEMORY; } @@ -303,6 +304,11 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, if (n_segments > 0) { ctx->dynamic_remote_info = ucc_calloc(n_segments, sizeof(ucc_tl_ucp_remote_info_t), "dynamic remote info"); + if (!ctx->dynamic_remote_info) { + tl_error(UCC_TL_UCP_TEAM_LIB(tl_team), "Out of Memory"); + status = UCC_ERR_NO_MEMORY; + goto failed_memory_map; + } /* map memory and fill in local segment information */ for (i = 0; i < n_segments; i++) { status = ucc_tl_ucp_memmap_segment(task, &seg_maps[i], i); @@ -456,19 +462,24 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_exchange(ucc_tl_ucp_task_t *task) return status; } -void ucc_tl_ucp_coll_dynamic_segment_finalize(ucc_tl_ucp_task_t *task) +ucc_status_t ucc_tl_ucp_coll_dynamic_segment_finalize(ucc_tl_ucp_task_t *task) { ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team); int i = 0; int j = 0; + ucs_status_t status; /* free library resources, unmap user resources */ if (ctx->dyn_seg_buf) { /* unmap and release packed buffers */ for (i = 0; i < ctx->n_dynrinfo_segs; i++) { if (ctx->dynamic_remote_info[i].mem_h) { - ucp_mem_unmap(ctx->worker.ucp_context, + status = ucp_mem_unmap(ctx->worker.ucp_context, ctx->dynamic_remote_info[i].mem_h); + if (UCS_OK != status) { + tl_error(UCC_TL_UCP_TEAM_LIB(tl_team), "Failed to unmap memory"); + return ucs_status_to_ucc_status(status); + } } if (ctx->dynamic_remote_info[i].packed_key) { ucp_rkey_buffer_release(ctx->dynamic_remote_info[i].packed_key); @@ -496,6 +507,7 
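The send/recv-side fix above indexes the allgathered buffer per peer; a sketch of that addressing, assuming every rank's block has the common (maximum) packed size stored in dyn_seg_size:

/* dyn_seg_buf after the exchange:
 *
 *   | rank 0 block | rank 1 block | ... | rank N-1 block |
 *     each block = [rva[]][len[]][key_size[]][packed rkeys], all of size
 *     ctx->dyn_seg_size
 *
 *   peer_block = PTR_OFFSET(ctx->dyn_seg_buf, peer * ctx->dyn_seg_size);
 *   rva        = ((uint64_t *)peer_block)[seg];
 *   rkey blob  = peer_block + 3 * n_dynrinfo_segs * sizeof(uint64_t)
 *                + key_size[0] + ... + key_size[seg - 1];
 */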
@@ void ucc_tl_ucp_coll_dynamic_segment_finalize(ucc_tl_ucp_task_t *task) ctx->dyn_seg_size = 0; ctx->n_dynrinfo_segs = 0; } + return UCC_OK; } ucc_status_t ucc_tl_ucp_coll_init(ucc_base_coll_args_t *coll_args, diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index 795d14e13b..c2063adaab 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -496,6 +496,6 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, ucc_status_t ucc_tl_ucp_coll_dynamic_segment_exchange(ucc_tl_ucp_task_t *task); -void ucc_tl_ucp_coll_dynamic_segment_finalize(ucc_tl_ucp_task_t *task); +ucc_status_t ucc_tl_ucp_coll_dynamic_segment_finalize(ucc_tl_ucp_task_t *task); #endif diff --git a/src/components/tl/ucp/tl_ucp_sendrecv.h b/src/components/tl/ucp/tl_ucp_sendrecv.h index 1575f902a3..26e5cf335c 100644 --- a/src/components/tl/ucp/tl_ucp_sendrecv.h +++ b/src/components/tl/ucp/tl_ucp_sendrecv.h @@ -292,7 +292,7 @@ ucc_tl_ucp_resolve_p2p_by_va(ucc_tl_ucp_team_t *team, void *va, size_t msglen, } section_offset = sizeof(uint64_t) * ctx->n_dynrinfo_segs; - base_offset = (ptrdiff_t)(ctx->dyn_seg_buf + peer * ctx->dyn_seg_size); //PTR_OFFSET + base_offset = (ptrdiff_t)PTR_OFFSET(ctx->dyn_seg_buf, peer * ctx->dyn_seg_size); rvas = (uint64_t *)base_offset; key_sizes = PTR_OFFSET(base_offset, (section_offset * 2)); keys = PTR_OFFSET(base_offset, (section_offset * 3)); From 0b81c8779d4b2bff0e6fd8164dcdfb143861ce45 Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Mon, 19 Aug 2024 15:52:29 -0700 Subject: [PATCH 13/14] REVIEW: address feedback --- .../tl/ucp/alltoall/alltoall_onesided.c | 3 +- src/components/tl/ucp/tl_ucp.h | 6 ++++ src/components/tl/ucp/tl_ucp_coll.c | 34 +++++++++---------- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/components/tl/ucp/alltoall/alltoall_onesided.c b/src/components/tl/ucp/alltoall/alltoall_onesided.c index c8943e5b0b..ca0fa7c257 100644 --- a/src/components/tl/ucp/alltoall/alltoall_onesided.c +++ b/src/components/tl/ucp/alltoall/alltoall_onesided.c @@ -70,6 +70,5 @@ void ucc_tl_ucp_alltoall_onesided_progress(ucc_coll_task_t *ctask) } pSync[0] = 0; - task->super.status = UCC_OK; - ucc_tl_ucp_coll_dynamic_segment_finalize(task); + task->super.status = ucc_tl_ucp_coll_dynamic_segment_finalize(task); } diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index c3c8323272..25da807f66 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -36,6 +36,12 @@ #define ONESIDED_SYNC_SIZE 1 #define ONESIDED_REDUCE_SIZE 4 +typedef enum { + UCC_TL_UCP_DYN_SEG_UPDATE_SRC = UCC_BIT(0), + UCC_TL_UCP_DYN_SEG_UPDATE_DST = UCC_BIT(1), + UCC_TL_UCP_DYN_SEG_UPDATE_GLOBAL = UCC_BIT(2), +} ucc_tl_ucp_dynamic_segment_update_mask_t; + typedef struct ucc_tl_ucp_iface { ucc_tl_iface_t super; } ucc_tl_ucp_iface_t; diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index 69d2dfb555..c93cc2be9b 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -237,13 +237,14 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team); int i = 0; - uint64_t need_map = 0x7; ucc_mem_map_t *maps = coll_args->mem_map.segments; ucc_mem_map_t *seg_maps = NULL; size_t n_segments = 3; + uint64_t need_map = UCC_TL_UCP_DYN_SEG_UPDATE_SRC | + 
UCC_TL_UCP_DYN_SEG_UPDATE_DST | + UCC_TL_UCP_DYN_SEG_UPDATE_GLOBAL; ucc_status_t status; - /* check if src, dst, global work in ctx mapped segments */ for (i = 0; i < ctx->n_rinfo_segs && n_segments > 0; i++) { uint64_t base = (uint64_t)ctx->remote_info[i].va_base; @@ -251,23 +252,21 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, if ((uint64_t)coll_args->src.info.buffer >= base && (uint64_t)coll_args->src.info.buffer < end) { // found it - need_map ^= 1; + need_map ^= UCC_TL_UCP_DYN_SEG_UPDATE_SRC; --n_segments; } if ((uint64_t)coll_args->dst.info.buffer >= base && (uint64_t)coll_args->dst.info.buffer < end) { // found it - need_map ^= 2; + need_map ^= UCC_TL_UCP_DYN_SEG_UPDATE_DST; --n_segments; } - if ((uint64_t)coll_args->global_work_buffer >= base && (uint64_t)coll_args->global_work_buffer < end) { // found it - need_map ^= 4; + need_map ^= UCC_TL_UCP_DYN_SEG_UPDATE_GLOBAL; --n_segments; } - if (n_segments == 0) { break; } @@ -282,19 +281,19 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, return UCC_ERR_NO_MEMORY; } - if (need_map & 0x1) { + if (need_map & UCC_TL_UCP_DYN_SEG_UPDATE_SRC) { seg_maps[index].address = coll_args->src.info.buffer; seg_maps[index].len = (coll_args->src.info.count) * ucc_dt_size(coll_args->src.info.datatype); seg_maps[index++].resource = NULL; } - if (need_map & 0x2) { + if (need_map & UCC_TL_UCP_DYN_SEG_UPDATE_DST) { seg_maps[index].address = coll_args->dst.info.buffer; seg_maps[index].len = (coll_args->dst.info.count) * ucc_dt_size(coll_args->dst.info.datatype); seg_maps[index++].resource = NULL; } - if (need_map & 0x4) { + if (need_map & UCC_TL_UCP_DYN_SEG_UPDATE_GLOBAL) { seg_maps[index].address = coll_args->global_work_buffer; seg_maps[index].len = (ONESIDED_SYNC_SIZE + ONESIDED_REDUCE_SIZE) * sizeof(long); seg_maps[index++].resource = NULL; @@ -464,10 +463,11 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_exchange(ucc_tl_ucp_task_t *task) ucc_status_t ucc_tl_ucp_coll_dynamic_segment_finalize(ucc_tl_ucp_task_t *task) { - ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); - ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team); - int i = 0; - int j = 0; + ucc_tl_ucp_team_t *tl_team = UCC_TL_UCP_TASK_TEAM(task); + ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team); + int i = 0; + int j = 0; + ucc_status_t ret_status = UCC_OK; ucs_status_t status; /* free library resources, unmap user resources */ if (ctx->dyn_seg_buf) { @@ -475,10 +475,10 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_finalize(ucc_tl_ucp_task_t *task) for (i = 0; i < ctx->n_dynrinfo_segs; i++) { if (ctx->dynamic_remote_info[i].mem_h) { status = ucp_mem_unmap(ctx->worker.ucp_context, - ctx->dynamic_remote_info[i].mem_h); + ctx->dynamic_remote_info[i].mem_h); if (UCS_OK != status) { tl_error(UCC_TL_UCP_TEAM_LIB(tl_team), "Failed to unmap memory"); - return ucs_status_to_ucc_status(status); + ret_status = ucs_status_to_ucc_status(status); } } if (ctx->dynamic_remote_info[i].packed_key) { @@ -507,7 +507,7 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_finalize(ucc_tl_ucp_task_t *task) ctx->dyn_seg_size = 0; ctx->n_dynrinfo_segs = 0; } - return UCC_OK; + return ret_status; } ucc_status_t ucc_tl_ucp_coll_init(ucc_base_coll_args_t *coll_args, From d64ea2cb9a2072396adfc512c25eda4a4b2f1b46 Mon Sep 17 00:00:00 2001 From: ferrol aderholdt Date: Wed, 4 Sep 2024 11:27:25 -0700 Subject: [PATCH 14/14] REVIEW: address feedback --- src/components/tl/ucp/tl_ucp_coll.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 
deletions(-) diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index c93cc2be9b..3f13cb4e2a 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -244,6 +244,7 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, UCC_TL_UCP_DYN_SEG_UPDATE_DST | UCC_TL_UCP_DYN_SEG_UPDATE_GLOBAL; ucc_status_t status; + ucs_status_t ucs_status; /* check if src, dst, global work in ctx mapped segments */ for (i = 0; i < ctx->n_rinfo_segs && n_segments > 0; i++) { @@ -333,8 +334,11 @@ ucc_status_t ucc_tl_ucp_coll_dynamic_segment_init(ucc_coll_args_t *coll_args, failed_memory_map: for (i = 0; i < ctx->n_dynrinfo_segs; i++) { if (ctx->dynamic_remote_info[i].mem_h) { - ucp_mem_unmap(ctx->worker.ucp_context, - ctx->dynamic_remote_info[i].mem_h); + ucs_status = ucp_mem_unmap(ctx->worker.ucp_context, + ctx->dynamic_remote_info[i].mem_h); + if (UCS_OK != ucs_status) { + tl_error(UCC_TL_UCP_TEAM_LIB(tl_team), "Failed to unmap memory"); + } } if (ctx->dynamic_remote_info[i].packed_key) { ucp_rkey_buffer_release(ctx->dynamic_remote_info[i].packed_key);