Skip to content

Commit

Permalink
Also add stream name into the endpoint index assignment
Browse files Browse the repository at this point in the history
Adding the (hash of) stream name into the endpoint index calculation to
increase entropy (and hopefully more even distribution among endpoints).
  • Loading branch information
narategithub committed Jul 13, 2024
1 parent 7c76f84 commit f147f17
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 10 deletions.
1 change: 1 addition & 0 deletions ldms/src/core/ldms.h
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,7 @@ struct ldms_stream_recv_data_s {
json_entity_t json; /* json entity */
struct ldms_cred cred; /* credential */
uint32_t perm; /* permission */
uint32_t name_hash; /* stream name hash */
};

/* To report subscrube / unsubscribe return status */
Expand Down
35 changes: 26 additions & 9 deletions ldms/src/core/ldms_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@

#include "ovis_json/ovis_json.h"
#include "coll/rbt.h"
#include "coll/fnv_hash.h"
#include "ovis_ref/ref.h"
#include "ovis_log/ovis_log.h"

Expand Down Expand Up @@ -221,6 +222,7 @@ static int __part_send(struct ldms_rail_ep_s *rep,
}

static int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name,
uint32_t hash,
ldms_stream_type_t stream_type,
struct ldms_addr *src, uint64_t msg_gn,
ldms_cred_t cred, int perm,
Expand Down Expand Up @@ -254,6 +256,7 @@ static int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name,
msg.cred.uid = htobe32(cred->uid);
msg.cred.gid = htobe32(cred->gid);
msg.perm = htobe32(perm);
msg.name_hash = htobe32(hash);
rc = __part_send(rep, &msg.src, msg_gn,
&msg, sizeof(msg), /* msg hdr */
stream_name, name_len, /* name */
Expand All @@ -272,6 +275,7 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg)
int ep_idx;
int rc;
uint64_t addr_port;
uint64_t hash;
if (ev->type == LDMS_STREAM_EVENT_CLOSE)
return 0;
assert( ev->type == LDMS_STREAM_EVENT_RECV );
Expand All @@ -280,17 +284,19 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg)
r = (ldms_rail_t)ev->recv.client->x;
switch (ev->recv.src.sa_family) {
case 0:
ep_idx = 0;
ep_idx = ( ev->recv.name_hash % primer ) % r->n_eps;
break;
case AF_INET:
addr_port = be32toh(*(int*)&ev->recv.src.addr[0]);
addr_port = (addr_port<<16) | be16toh(ev->recv.src.sin_port);
ep_idx = ( addr_port % primer ) % r->n_eps;
hash = (addr_port << 32) | ev->recv.name_hash;
ep_idx = ( hash % primer ) % r->n_eps;
break;
case AF_INET6:
addr_port = be32toh(*(int*)&ev->recv.src.addr[12]);
addr_port = (addr_port<<16) | be16toh(ev->recv.src.sin_port);
ep_idx = ( addr_port % primer ) % r->n_eps;
hash = (addr_port << 32) | ev->recv.name_hash;
ep_idx = ( hash % primer ) % r->n_eps;
break;
default:
assert(0 == "Unexpected network family");
Expand All @@ -306,7 +312,8 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg)
if (rc)
goto out;

rc = __rep_publish(&r->eps[ep_idx], ev->recv.name, ev->recv.type,
rc = __rep_publish(&r->eps[ep_idx], ev->recv.name, ev->recv.name_hash,
ev->recv.type,
&ev->recv.src, ev->recv.msg_gn,
&ev->recv.cred, ev->recv.perm,
ev->recv.data,
Expand Down Expand Up @@ -458,6 +465,7 @@ void __counters_update(struct ldms_stream_counters_s *ctr,
static int
__stream_deliver(struct ldms_addr *src, uint64_t msg_gn,
const char *stream_name, int name_len,
uint32_t hash,
ldms_stream_type_t stream_type,
ldms_cred_t cred, uint32_t perm,
const char *data, size_t data_len)
Expand All @@ -483,6 +491,7 @@ __stream_deliver(struct ldms_addr *src, uint64_t msg_gn,
.name_len = name_len,
.data_len = data_len,
.name = stream_name,
.name_hash = hash,
.data = data,
.cred = *cred,
.perm = perm,
Expand Down Expand Up @@ -803,6 +812,8 @@ int ldms_stream_publish(ldms_t x, const char *stream_name,
int name_len = strlen(stream_name) + 1;
struct ldms_cred _cred;
int rc;
uint32_t hash;
int ep_idx;

msg_gn = __atomic_fetch_add(&stream_gn, 1, __ATOMIC_SEQ_CST);

Expand All @@ -819,18 +830,22 @@ int ldms_stream_publish(ldms_t x, const char *stream_name,
cred = &_cred;
}

hash = fnv_hash_a1_32(stream_name, strlen(stream_name), FNV_32_PRIME);

/* publish directly to remote peer */
if (x) {
if (!XTYPE_IS_RAIL(x->xtype))
return ENOTSUP;
r = (ldms_rail_t)x;
return __rep_publish(&r->eps[0], stream_name, stream_type, 0,
msg_gn, cred, perm, data, data_len);
ep_idx = ( hash % primer ) % r->n_eps;
return __rep_publish(&r->eps[ep_idx], stream_name, hash,
stream_type, 0, msg_gn, cred, perm, data,
data_len);
}

/* else publish locally */
return __stream_deliver(0, msg_gn, stream_name, name_len, stream_type,
cred, perm, data, data_len);
return __stream_deliver(0, msg_gn, stream_name, name_len, hash,
stream_type, cred, perm, data, data_len);
}

static void __client_ref_free(void *arg)
Expand Down Expand Up @@ -1209,14 +1224,16 @@ __process_stream_msg(ldms_t x, struct ldms_request *req)
sbuf->msg->cred.uid = be32toh(sbuf->msg->cred.uid);
sbuf->msg->cred.gid = be32toh(sbuf->msg->cred.gid);
sbuf->msg->perm = be32toh(sbuf->msg->perm);
sbuf->msg->name_hash = be32toh(sbuf->msg->name_hash);

name = sbuf->msg->msg;
name_len = strlen(name)+1;
data = sbuf->msg->msg + name_len;
data_len = sbuf->msg->msg_len - name_len;

__stream_deliver(&sbuf->msg->src, sbuf->msg->msg_gn,
name, name_len, sbuf->msg->stream_type,
name, name_len, sbuf->msg->name_hash,
sbuf->msg->stream_type,
&sbuf->msg->cred, sbuf->msg->perm,
data, data_len);
__rail_ep_credit_return(rep, name_len + data_len);
Expand Down
2 changes: 1 addition & 1 deletion ldms/src/core/ldms_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ struct ldms_stream_full_msg_s {
uint32_t stream_type;
struct ldms_cred cred; /* credential of the originator */
uint32_t perm; /* 0777 style permission */
uint32_t preserved;
uint32_t name_hash;
char msg[OVIS_FLEX];
/* `msg` format:
* .----------------------.
Expand Down

0 comments on commit f147f17

Please sign in to comment.