Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream-Rail distribution with addr + port #1411

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ldms/src/core/ldms.h
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,7 @@ struct ldms_stream_recv_data_s {
json_entity_t json; /* json entity */
struct ldms_cred cred; /* credential */
uint32_t perm; /* permission */
uint32_t name_hash; /* stream name hash */
};

/* To report subscribe / unsubscribe return status */
Expand Down
40 changes: 31 additions & 9 deletions ldms/src/core/ldms_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@

#include "ovis_json/ovis_json.h"
#include "coll/rbt.h"
#include "coll/fnv_hash.h"
#include "ovis_ref/ref.h"
#include "ovis_log/ovis_log.h"

Expand Down Expand Up @@ -221,6 +222,7 @@ static int __part_send(struct ldms_rail_ep_s *rep,
}

static int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name,
uint32_t hash,
ldms_stream_type_t stream_type,
struct ldms_addr *src, uint64_t msg_gn,
ldms_cred_t cred, int perm,
Expand Down Expand Up @@ -254,6 +256,7 @@ static int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name,
msg.cred.uid = htobe32(cred->uid);
msg.cred.gid = htobe32(cred->gid);
msg.perm = htobe32(perm);
msg.name_hash = hash;
rc = __part_send(rep, &msg.src, msg_gn,
&msg, sizeof(msg), /* msg hdr */
stream_name, name_len, /* name */
Expand All @@ -271,6 +274,8 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg)
ldms_rail_t r;
int ep_idx;
int rc;
uint64_t addr_port;
uint64_t hash;
if (ev->type == LDMS_STREAM_EVENT_CLOSE)
return 0;
assert( ev->type == LDMS_STREAM_EVENT_RECV );
Expand All @@ -279,13 +284,19 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg)
r = (ldms_rail_t)ev->recv.client->x;
switch (ev->recv.src.sa_family) {
case 0:
ep_idx = 0;
ep_idx = ( ev->recv.name_hash % primer ) % r->n_eps;
break;
case AF_INET:
ep_idx = ( be32toh(*(int*)&ev->recv.src.addr[0]) % primer ) % r->n_eps;
addr_port = be32toh(*(int*)&ev->recv.src.addr[0]);
addr_port = (addr_port<<16) | be16toh(ev->recv.src.sin_port);
hash = (addr_port << 32) | ev->recv.name_hash;
ep_idx = ( hash % primer ) % r->n_eps;
break;
case AF_INET6:
ep_idx = ( be32toh(*(int*)&ev->recv.src.addr[12]) % primer ) % r->n_eps;
addr_port = be32toh(*(int*)&ev->recv.src.addr[12]);
addr_port = (addr_port<<16) | be16toh(ev->recv.src.sin_port);
hash = (addr_port << 32) | ev->recv.name_hash;
ep_idx = ( hash % primer ) % r->n_eps;
break;
default:
assert(0 == "Unexpected network family");
Expand All @@ -301,7 +312,8 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg)
if (rc)
goto out;

rc = __rep_publish(&r->eps[ep_idx], ev->recv.name, ev->recv.type,
rc = __rep_publish(&r->eps[ep_idx], ev->recv.name, ev->recv.name_hash,
ev->recv.type,
&ev->recv.src, ev->recv.msg_gn,
&ev->recv.cred, ev->recv.perm,
ev->recv.data,
Expand Down Expand Up @@ -453,6 +465,7 @@ void __counters_update(struct ldms_stream_counters_s *ctr,
static int
__stream_deliver(struct ldms_addr *src, uint64_t msg_gn,
const char *stream_name, int name_len,
uint32_t hash,
ldms_stream_type_t stream_type,
ldms_cred_t cred, uint32_t perm,
const char *data, size_t data_len)
Expand All @@ -478,6 +491,7 @@ __stream_deliver(struct ldms_addr *src, uint64_t msg_gn,
.name_len = name_len,
.data_len = data_len,
.name = stream_name,
.name_hash = hash,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're going to put the hash in the message, you could put the entire thing in there and avoid the computation on every message. The port and addr aren't changing. This would also mean that the same hash is used all the way up the chain from sender to consumer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The address does get changed in the following case.

There is a logic to exclude the use of local address (e.g. 127.0.0.1) as src address here:

https://github.com/ovis-hpc/ovis/blob/f71e7adcff4a852f3ccc05b143a7794a6bc71986/ldms/src/core/ldms_stream.c#L1131-L1153

This is to prevent the case where an application running on the same host connects to the sampler ldmsd using the localhost address, which would make the src in the messages from all nodes 127.0.0.1. Basically, if the src address is local, do not resolve it and let the next guy resolve it instead.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there must be code somewhere else that checks the address and family and, if it is 0, sets it to the address and port of the forwarder? Where is that logic?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@narategithub In any event, all we're doing here is randomizing the seed to what we hope will be a random hash. So, as usual, we've found ourselves in a hole and we keep digging. Let's just back up and decide how we're going to generate a random hash.

.data = data,
.cred = *cred,
.perm = perm,
Expand Down Expand Up @@ -798,6 +812,8 @@ int ldms_stream_publish(ldms_t x, const char *stream_name,
int name_len = strlen(stream_name) + 1;
struct ldms_cred _cred;
int rc;
uint32_t hash;
int ep_idx;

msg_gn = __atomic_fetch_add(&stream_gn, 1, __ATOMIC_SEQ_CST);

Expand All @@ -814,18 +830,22 @@ int ldms_stream_publish(ldms_t x, const char *stream_name,
cred = &_cred;
}

hash = fnv_hash_a1_32(stream_name, strlen(stream_name), FNV_32_PRIME);

/* publish directly to remote peer */
if (x) {
if (!XTYPE_IS_RAIL(x->xtype))
return ENOTSUP;
r = (ldms_rail_t)x;
return __rep_publish(&r->eps[0], stream_name, stream_type, 0,
msg_gn, cred, perm, data, data_len);
ep_idx = ( hash % primer ) % r->n_eps;
return __rep_publish(&r->eps[ep_idx], stream_name, hash,
stream_type, 0, msg_gn, cred, perm, data,
data_len);
}

/* else publish locally */
return __stream_deliver(0, msg_gn, stream_name, name_len, stream_type,
cred, perm, data, data_len);
return __stream_deliver(0, msg_gn, stream_name, name_len, hash,
stream_type, cred, perm, data, data_len);
}

static void __client_ref_free(void *arg)
Expand Down Expand Up @@ -1204,14 +1224,16 @@ __process_stream_msg(ldms_t x, struct ldms_request *req)
sbuf->msg->cred.uid = be32toh(sbuf->msg->cred.uid);
sbuf->msg->cred.gid = be32toh(sbuf->msg->cred.gid);
sbuf->msg->perm = be32toh(sbuf->msg->perm);
/* sbuf->msg->name_hash does not need byte conversion */

name = sbuf->msg->msg;
name_len = strlen(name)+1;
data = sbuf->msg->msg + name_len;
data_len = sbuf->msg->msg_len - name_len;

__stream_deliver(&sbuf->msg->src, sbuf->msg->msg_gn,
name, name_len, sbuf->msg->stream_type,
name, name_len, sbuf->msg->name_hash,
sbuf->msg->stream_type,
&sbuf->msg->cred, sbuf->msg->perm,
data, data_len);
__rail_ep_credit_return(rep, name_len + data_len);
Expand Down
2 changes: 1 addition & 1 deletion ldms/src/core/ldms_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ struct ldms_stream_full_msg_s {
uint32_t stream_type;
struct ldms_cred cred; /* credential of the originator */
uint32_t perm; /* 0777 style permission */
uint32_t preserved;
uint32_t name_hash;
char msg[OVIS_FLEX];
/* `msg` format:
* .----------------------.
Expand Down
Loading