From f147f17eb03264de64e918b3cb216ca2cd7222c0 Mon Sep 17 00:00:00 2001 From: Narate Taerat Date: Sat, 13 Jul 2024 09:15:31 -0500 Subject: [PATCH] Also add stream name into the endpoint index assignment Adding the (hash of) stream name into the endpoint index calculation to increase entropy (and hopefully more even distribution among endpoints). --- ldms/src/core/ldms.h | 1 + ldms/src/core/ldms_stream.c | 35 ++++++++++++++++++++++++++--------- ldms/src/core/ldms_stream.h | 2 +- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/ldms/src/core/ldms.h b/ldms/src/core/ldms.h index d94c725bb2..53e24a671f 100644 --- a/ldms/src/core/ldms.h +++ b/ldms/src/core/ldms.h @@ -1171,6 +1171,7 @@ struct ldms_stream_recv_data_s { json_entity_t json; /* json entity */ struct ldms_cred cred; /* credential */ uint32_t perm; /* permission */ + uint32_t name_hash; /* stream name hash */ }; /* To report subscrube / unsubscribe return status */ diff --git a/ldms/src/core/ldms_stream.c b/ldms/src/core/ldms_stream.c index 9dc64425c5..ee4f3e4d4e 100644 --- a/ldms/src/core/ldms_stream.c +++ b/ldms/src/core/ldms_stream.c @@ -70,6 +70,7 @@ #include "ovis_json/ovis_json.h" #include "coll/rbt.h" +#include "coll/fnv_hash.h" #include "ovis_ref/ref.h" #include "ovis_log/ovis_log.h" @@ -221,6 +222,7 @@ static int __part_send(struct ldms_rail_ep_s *rep, } static int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, + uint32_t hash, ldms_stream_type_t stream_type, struct ldms_addr *src, uint64_t msg_gn, ldms_cred_t cred, int perm, @@ -254,6 +256,7 @@ static int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, msg.cred.uid = htobe32(cred->uid); msg.cred.gid = htobe32(cred->gid); msg.perm = htobe32(perm); + msg.name_hash = htobe32(hash); rc = __part_send(rep, &msg.src, msg_gn, &msg, sizeof(msg), /* msg hdr */ stream_name, name_len, /* name */ @@ -272,6 +275,7 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) int ep_idx; int rc; uint64_t addr_port; + uint64_t hash; if (ev->type == LDMS_STREAM_EVENT_CLOSE) return 0; assert( ev->type == LDMS_STREAM_EVENT_RECV ); @@ -280,17 +284,19 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) r = (ldms_rail_t)ev->recv.client->x; switch (ev->recv.src.sa_family) { case 0: - ep_idx = 0; + ep_idx = ( ev->recv.name_hash % primer ) % r->n_eps; break; case AF_INET: addr_port = be32toh(*(int*)&ev->recv.src.addr[0]); addr_port = (addr_port<<16) | be16toh(ev->recv.src.sin_port); - ep_idx = ( addr_port % primer ) % r->n_eps; + hash = (addr_port << 32) | ev->recv.name_hash; + ep_idx = ( hash % primer ) % r->n_eps; break; case AF_INET6: addr_port = be32toh(*(int*)&ev->recv.src.addr[12]); addr_port = (addr_port<<16) | be16toh(ev->recv.src.sin_port); - ep_idx = ( addr_port % primer ) % r->n_eps; + hash = (addr_port << 32) | ev->recv.name_hash; + ep_idx = ( hash % primer ) % r->n_eps; break; default: assert(0 == "Unexpected network family"); @@ -306,7 +312,8 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) if (rc) goto out; - rc = __rep_publish(&r->eps[ep_idx], ev->recv.name, ev->recv.type, + rc = __rep_publish(&r->eps[ep_idx], ev->recv.name, ev->recv.name_hash, + ev->recv.type, &ev->recv.src, ev->recv.msg_gn, &ev->recv.cred, ev->recv.perm, ev->recv.data, @@ -458,6 +465,7 @@ void __counters_update(struct ldms_stream_counters_s *ctr, static int __stream_deliver(struct ldms_addr *src, uint64_t msg_gn, const char *stream_name, int name_len, + uint32_t hash, ldms_stream_type_t stream_type, ldms_cred_t cred, uint32_t perm, const char *data, size_t data_len) @@ -483,6 +491,7 @@ __stream_deliver(struct ldms_addr *src, uint64_t msg_gn, .name_len = name_len, .data_len = data_len, .name = stream_name, + .name_hash = hash, .data = data, .cred = *cred, .perm = perm, @@ -803,6 +812,8 @@ int ldms_stream_publish(ldms_t x, const char *stream_name, int name_len = strlen(stream_name) + 1; struct ldms_cred _cred; int rc; + uint32_t hash; + int ep_idx; msg_gn = __atomic_fetch_add(&stream_gn, 1, __ATOMIC_SEQ_CST); @@ -819,18 +830,22 @@ int ldms_stream_publish(ldms_t x, const char *stream_name, cred = &_cred; } + hash = fnv_hash_a1_32(stream_name, strlen(stream_name), FNV_32_PRIME); + /* publish directly to remote peer */ if (x) { if (!XTYPE_IS_RAIL(x->xtype)) return ENOTSUP; r = (ldms_rail_t)x; - return __rep_publish(&r->eps[0], stream_name, stream_type, 0, - msg_gn, cred, perm, data, data_len); + ep_idx = ( hash % primer ) % r->n_eps; + return __rep_publish(&r->eps[ep_idx], stream_name, hash, + stream_type, 0, msg_gn, cred, perm, data, + data_len); } /* else publish locally */ - return __stream_deliver(0, msg_gn, stream_name, name_len, stream_type, - cred, perm, data, data_len); + return __stream_deliver(0, msg_gn, stream_name, name_len, hash, + stream_type, cred, perm, data, data_len); } static void __client_ref_free(void *arg) @@ -1209,6 +1224,7 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) sbuf->msg->cred.uid = be32toh(sbuf->msg->cred.uid); sbuf->msg->cred.gid = be32toh(sbuf->msg->cred.gid); sbuf->msg->perm = be32toh(sbuf->msg->perm); + sbuf->msg->name_hash = be32toh(sbuf->msg->name_hash); name = sbuf->msg->msg; name_len = strlen(name)+1; @@ -1216,7 +1232,8 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) data_len = sbuf->msg->msg_len - name_len; __stream_deliver(&sbuf->msg->src, sbuf->msg->msg_gn, - name, name_len, sbuf->msg->stream_type, + name, name_len, sbuf->msg->name_hash, + sbuf->msg->stream_type, &sbuf->msg->cred, sbuf->msg->perm, data, data_len); __rail_ep_credit_return(rep, name_len + data_len); diff --git a/ldms/src/core/ldms_stream.h b/ldms/src/core/ldms_stream.h index d0683bb653..9e29b3cd3d 100644 --- a/ldms/src/core/ldms_stream.h +++ b/ldms/src/core/ldms_stream.h @@ -134,7 +134,7 @@ struct ldms_stream_full_msg_s { uint32_t stream_type; struct ldms_cred cred; /* credential of the originator */ uint32_t perm; /* 0777 style permission */ - uint32_t preserved; + uint32_t name_hash; char msg[OVIS_FLEX]; /* `msg` format: * .----------------------.