Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Net CQs #101

Draft
wants to merge 9 commits into
base: mhba
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 37 additions & 25 deletions src/team_lib/mhba/xccl_mhba_collective.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <xccl_mhba_collective.h>
#include "utils/utils.h"

#define TMP_TRANSPOSE_PREALLOC 256
#define TMP_TRANSPOSE_PREALLOC 256 //todo check size

xccl_status_t xccl_mhba_collective_init_base(xccl_coll_op_args_t *coll_args,
xccl_mhba_coll_req_t **request,
Expand Down Expand Up @@ -68,22 +68,24 @@ static xccl_status_t xccl_mhba_reg_fanin_start(xccl_coll_task_t *task)

xccl_mhba_debug("register memory buffers");

request->send_bf_mr = ibv_reg_mr(
team->node.shared_pd, (void *)request->args.buffer_info.src_buffer,
request->args.buffer_info.len * team->size, sr_mem_access_flags);
if (!request->send_bf_mr) {
ucs_rcache_region_t* send_ptr;
ucs_rcache_region_t* recv_ptr;
if(UCS_OK != ucs_rcache_get(team->context->rcache, (void *)request->args.buffer_info.src_buffer,
request->args.buffer_info.len * team->size,
PROT_READ,&sr_mem_access_flags, &send_ptr)) {
xccl_mhba_error("Failed to register send_bf memory (errno=%d)", errno);
return XCCL_ERR_NO_RESOURCE;
}
request->receive_bf_mr = ibv_reg_mr(
team->node.shared_pd, (void *)request->args.buffer_info.dst_buffer,
request->args.buffer_info.len * team->size, dr_mem_access_flags);
if (!request->receive_bf_mr) {
xccl_mhba_error("Failed to register receive_bf memory (errno=%d)",
errno);
ibv_dereg_mr(request->send_bf_mr);
request->send_rcache_region_p = xccl_rcache_ucs_get_reg_data(send_ptr);

if(UCS_OK != ucs_rcache_get(team->context->rcache, (void *)request->args.buffer_info.dst_buffer,
request->args.buffer_info.len * team->size,
PROT_WRITE,&dr_mem_access_flags,&recv_ptr)) {
xccl_mhba_error("Failed to register receive_bf memory");
ucs_rcache_region_put(team->context->rcache,request->send_rcache_region_p->region);
return XCCL_ERR_NO_RESOURCE;
}
request->recv_rcache_region_p = xccl_rcache_ucs_get_reg_data(recv_ptr);

xccl_mhba_debug("fanin start");
/* start task if completion event received */
Expand Down Expand Up @@ -247,7 +249,7 @@ static inline xccl_status_t send_block_data(struct ibv_qp *qp,

static inline xccl_status_t send_atomic(struct ibv_qp *qp, uint64_t remote_addr,
uint32_t rkey, xccl_mhba_team_t *team,
xccl_mhba_coll_req_t *request)
xccl_mhba_coll_req_t *request, int flags)
{
struct ibv_send_wr *bad_wr;
struct ibv_sge list = {
Expand All @@ -261,7 +263,7 @@ static inline xccl_status_t send_atomic(struct ibv_qp *qp, uint64_t remote_addr,
.sg_list = &list,
.num_sge = 1,
.opcode = IBV_WR_ATOMIC_FETCH_AND_ADD,
.send_flags = IBV_SEND_SIGNALED,
.send_flags = flags,
.wr.atomic.remote_addr = remote_addr,
.wr.atomic.rkey = rkey,
.wr.atomic.compare_add = 1ULL,
Expand Down Expand Up @@ -311,10 +313,11 @@ static inline xccl_status_t prepost_dummy_recv(struct ibv_qp *qp, int num)
return XCCL_OK;
}

//todo check cq's in case of parallel operations
// add polling mechanism for blocks in order to maintain const qp tx rx
static xccl_status_t
xccl_mhba_send_blocks_start_with_transpose(xccl_coll_task_t *task)
{
{ //todo make non-blocking
xccl_mhba_task_t *self = ucs_derived_of(task, xccl_mhba_task_t);
xccl_mhba_coll_req_t *request = self->req;
xccl_mhba_team_t *team = request->team;
Expand Down Expand Up @@ -384,16 +387,13 @@ xccl_mhba_send_blocks_start_with_transpose(xccl_coll_task_t *task)
xccl_mhba_error("Failed sending block [%d,%d,%d]", i, j, k);
return status;
}
while (!ibv_poll_cq(team->net.cq, 1, transpose_completion)) {}
while (!ibv_poll_cq(team->net.cqs[i], 1, transpose_completion)) {}
}
}
}

for (i = 0; i < net_size; i++) {
status = send_atomic(team->net.qps[i],
(uintptr_t)team->net.remote_ctrl[i].addr +
(index * MHBA_CTRL_SIZE),
team->net.remote_ctrl[i].rkey, team, request);
(index * MHBA_CTRL_SIZE),
team->net.remote_ctrl[i].rkey, team, request,0);
if (status != XCCL_OK) {
xccl_mhba_error("Failed sending atomic to node [%d]", i);
return status;
Expand Down Expand Up @@ -448,7 +448,7 @@ static xccl_status_t xccl_mhba_send_blocks_start(xccl_coll_task_t *task)
status = send_atomic(team->net.qps[i],
(uintptr_t)team->net.remote_ctrl[i].addr +
(index * MHBA_CTRL_SIZE),
team->net.remote_ctrl[i].rkey, team, request);
team->net.remote_ctrl[i].rkey, team, request,IBV_SEND_SIGNALED);
if (status != XCCL_OK) {
xccl_mhba_error("Failed sending atomic to node [%d]", i);
return status;
Expand All @@ -458,13 +458,18 @@ static xccl_status_t xccl_mhba_send_blocks_start(xccl_coll_task_t *task)
return XCCL_OK;
}

xccl_status_t xccl_mhba_send_blocks_progress_transpose(xccl_coll_task_t *task){
task->state = XCCL_TASK_STATE_COMPLETED;
return XCCL_OK;
}

xccl_status_t xccl_mhba_send_blocks_progress(xccl_coll_task_t *task)
{
xccl_mhba_task_t *self = ucs_derived_of(task, xccl_mhba_task_t);
xccl_mhba_coll_req_t *request = self->req;
xccl_mhba_team_t *team = request->team;
int i, completions_num;
completions_num = ibv_poll_cq(team->net.cq, team->net.sbgp->group_size,
completions_num = ibv_poll_cq(team->net.cqs[0], team->net.sbgp->group_size,
team->work_completion);
if (completions_num < 0) {
xccl_mhba_error("ibv_poll_cq() failed for RDMA_ATOMIC execution");
Expand Down Expand Up @@ -506,11 +511,13 @@ xccl_status_t xccl_mhba_alltoall_init(xccl_coll_op_args_t *coll_args,
}
xccl_schedule_init(&request->schedule, team->super.ctx);
if (team->transpose_hw_limitations) {
block_size = team->blocks_sizes[__ucs_ilog2_u32(len - 1)];
block_size = (len == 1) ? team->blocks_sizes[__ucs_ilog2_u32(len - 1)+1] : team->blocks_sizes[0];
} else {
block_size = team->node.sbgp->group_size;
}

block_size = team->requested_block_size ? team->requested_block_size : block_size;

//todo following section correct assuming homogenous PPN across all nodes
if (team->node.sbgp->group_size % block_size != 0) {
if (team->node.sbgp->group_rank == team->node.asr_rank) {
Expand All @@ -536,6 +543,10 @@ xccl_status_t xccl_mhba_alltoall_init(xccl_coll_op_args_t *coll_args,
request->tmp_transpose_buf = NULL;
request->tasks =
(xccl_mhba_task_t *)malloc(sizeof(xccl_mhba_task_t) * n_tasks);
if (!request->tasks){
xccl_mhba_error("malloc tasks failed");
return XCCL_ERR_NO_MEMORY;
}
request->seq_num = team->sequence_number;
xccl_mhba_debug("Seq num is %d", request->seq_num);
team->sequence_number++;
Expand Down Expand Up @@ -571,11 +582,12 @@ xccl_status_t xccl_mhba_alltoall_init(xccl_coll_op_args_t *coll_args,
if (team->transpose) {
request->tasks[2].super.handlers[XCCL_EVENT_COMPLETED] =
xccl_mhba_send_blocks_start_with_transpose;
request->tasks[2].super.progress = xccl_mhba_send_blocks_progress_transpose;
} else {
request->tasks[2].super.handlers[XCCL_EVENT_COMPLETED] =
xccl_mhba_send_blocks_start;
request->tasks[2].super.progress = xccl_mhba_send_blocks_progress;
}
request->tasks[2].super.progress = xccl_mhba_send_blocks_progress;

request->tasks[3].super.handlers[XCCL_EVENT_COMPLETED] =
xccl_mhba_fanout_start;
Expand Down
6 changes: 3 additions & 3 deletions src/team_lib/mhba/xccl_mhba_collective.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ typedef struct xccl_mhba_coll_req {
xccl_mhba_task_t *tasks;
xccl_coll_op_args_t args;
xccl_mhba_team_t *team;
int seq_num;
struct ibv_mr *send_bf_mr;
struct ibv_mr *receive_bf_mr;
uint64_t seq_num;
xccl_tl_coll_req_t *barrier_req;
int block_size;
int started;
xccl_mhba_reg_t *send_rcache_region_p;
xccl_mhba_reg_t *recv_rcache_region_p;
struct ibv_mr *transpose_buf_mr;
void *tmp_transpose_buf;
} xccl_mhba_coll_req_t;
Expand Down
21 changes: 9 additions & 12 deletions src/team_lib/mhba/xccl_mhba_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ static ucs_config_field_t xccl_tl_mhba_context_config_table[] = {
ucs_offsetof(xccl_tl_mhba_context_config_t, transpose),
UCS_CONFIG_TYPE_UINT},

{"TRANSPOSE_HW_LIMITATIONS", "1",
{"TRANSPOSE_HW_LIMITATIONS", "0",
"Boolean - with transpose hw limitations or not",
ucs_offsetof(xccl_tl_mhba_context_config_t, transpose_hw_limitations),
UCS_CONFIG_TYPE_UINT},
UCS_CONFIG_TYPE_UINT}, //todo change to 1 in production

{"IB_GLOBAL", "0", "Use global ib routing",
ucs_offsetof(xccl_tl_mhba_context_config_t, ib_global),
Expand All @@ -41,6 +41,10 @@ static ucs_config_field_t xccl_tl_mhba_context_config_table[] = {
ucs_offsetof(xccl_tl_mhba_context_config_t, transpose_buf_size),
UCS_CONFIG_TYPE_MEMUNITS},

{"BLOCK_SIZE", "0", "Size of the blocks that are sent using blocked AlltoAll Algorithm",
ucs_offsetof(xccl_tl_mhba_context_config_t, block_size),
UCS_CONFIG_TYPE_UINT},

{NULL}
};

Expand Down Expand Up @@ -187,17 +191,10 @@ static xccl_status_t xccl_mhba_collective_finalize(xccl_tl_coll_req_t *request)
xccl_status_t status = XCCL_OK;
xccl_mhba_coll_req_t *req = ucs_derived_of(request, xccl_mhba_coll_req_t);
xccl_mhba_team_t * team = req->team;
if (ibv_dereg_mr(req->send_bf_mr)) {
xccl_mhba_error("Failed to dereg_mr send buffer (errno=%d)", errno);
status = XCCL_ERR_NO_MESSAGE;
}
if (ibv_dereg_mr(req->receive_bf_mr)) {
xccl_mhba_error("Failed to dereg_mr send buffer (errno=%d)", errno);
status = XCCL_ERR_NO_MESSAGE;
}
ucs_rcache_region_put(team->context->rcache,req->send_rcache_region_p->region);
ucs_rcache_region_put(team->context->rcache,req->recv_rcache_region_p->region);
if (team->transpose) {
if (req->tmp_transpose_buf)
free(req->tmp_transpose_buf);
free(req->tmp_transpose_buf);
if (req->transpose_buf_mr != team->transpose_buf_mr) {
ibv_dereg_mr(req->transpose_buf_mr);
free(req->transpose_buf_mr->addr);
Expand Down
23 changes: 17 additions & 6 deletions src/team_lib/mhba/xccl_mhba_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "topo/xccl_topo.h"
#include <infiniband/verbs.h>
#include <infiniband/mlx5dv.h>
#include <ucs/memory/rcache.h>

#define MAX_OUTSTANDING_OPS 1 //todo change - according to limitations (52 top)
#define SEQ_INDEX(_seq_num) ((_seq_num) % MAX_OUTSTANDING_OPS)
Expand All @@ -27,10 +28,9 @@ typedef struct xccl_tl_mhba_context_config {
int transpose;
int transpose_hw_limitations;
size_t transpose_buf_size;
int block_size;
} xccl_tl_mhba_context_config_t;

//todo add block_size config

typedef struct xccl_team_lib_mhba {
xccl_team_lib_t super;
xccl_team_lib_mhba_config_t config;
Expand Down Expand Up @@ -73,6 +73,7 @@ typedef struct xccl_mhba_context {
struct ibv_context *ib_ctx;
struct ibv_pd *ib_pd;
int ib_port;
ucs_rcache_t *rcache;
} xccl_mhba_context_t;

typedef struct xccl_mhba_op {
Expand Down Expand Up @@ -103,19 +104,28 @@ typedef struct xccl_mhba_node {
struct mlx5dv_qp_ex *umr_mlx5dv_qp_ex;
} xccl_mhba_node_t;

#define MHBA_CTRL_SIZE 128 //todo change according to arch
#define MHBA_CTRL_SIZE 128 //todo change to UCS_ARCH_CACHE_LINE_SIZE
#define MHBA_DATA_SIZE sizeof(struct mlx5dv_mr_interleaved)
#define MHBA_NUM_OF_BLOCKS_SIZE_BINS 7
#define MHBA_NUM_OF_BLOCKS_SIZE_BINS 8
#define MAX_TRANSPOSE_SIZE 8000 // HW transpose unit is limited to matrix size
#define MAX_MSG_SIZE 128 // HW transpose unit is limited to element size
#define MAX_STRIDED_ENTRIES 55 // from limit of NIC memory - Sergey Gorenko's email

typedef struct xccl_mhba_reg {
struct ibv_mr *mr;
ucs_rcache_region_t *region;
} xccl_mhba_reg_t;

static inline xccl_mhba_reg_t* xccl_rcache_ucs_get_reg_data(ucs_rcache_region_t *region) {
return (xccl_mhba_reg_t *)((ptrdiff_t)region + sizeof(ucs_rcache_region_t));
}

typedef struct xccl_mhba_net {
xccl_sbgp_t *sbgp;
int net_size;
int *rank_map;
struct ibv_qp **qps;
struct ibv_cq *cq;
struct ibv_cq **cqs;
struct ibv_mr *ctrl_mr;
struct {
void *addr;
Expand All @@ -132,13 +142,14 @@ typedef struct xccl_mhba_team {
uint64_t max_msg_size;
xccl_mhba_node_t node;
xccl_mhba_net_t net;
int sequence_number;
uint64_t sequence_number;
int op_busy[MAX_OUTSTANDING_OPS];
int cq_completions[MAX_OUTSTANDING_OPS];
xccl_mhba_context_t *context;
int blocks_sizes[MHBA_NUM_OF_BLOCKS_SIZE_BINS];
int size;
uint64_t dummy_atomic_buff;
int requested_block_size;
struct ibv_mr *dummy_bf_mr;
struct ibv_wc *work_completion;
void *transpose_buf;
Expand Down
2 changes: 1 addition & 1 deletion src/team_lib/mhba/xccl_mhba_mkeys.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ static void update_mkey_entry(xccl_mhba_node_t *node, xccl_mhba_coll_req_t *req,
(struct mlx5dv_mr_interleaved *)(direction_send ?
node->ops[index].my_send_umr_data
: node->ops[index].my_recv_umr_data);
struct ibv_mr *buff = direction_send ? req->send_bf_mr : req->receive_bf_mr;
struct ibv_mr *buff = direction_send ? req->send_rcache_region_p->mr : req->recv_rcache_region_p->mr;
mkey_entry->addr = (uintptr_t)buff->addr;
mkey_entry->bytes_count = req->block_size * req->args.buffer_info.len;
mkey_entry->bytes_skip = 0;
Expand Down
Loading