Skip to content

Commit

Permalink
Fix prdcr_stream_status's handler
Browse files Browse the repository at this point in the history
- Remove a possible race between forwarding the stream_status request to
the connected producer and receiving the response back. The race could
cause LDMSD to free the forwarding request context before LDMSD receives
all the responses.
  • Loading branch information
nichamon authored and tom95858 committed Jan 21, 2023
1 parent 783cfdd commit 46ab920
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 38 deletions.
114 changes: 77 additions & 37 deletions ldms/src/ldmsd/ldmsd_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1896,20 +1896,18 @@ static int prdcr_unsubscribe_regex_handler(ldmsd_req_ctxt_t reqc)
return 0;
}

struct prdcr_stream_dir_regex_ctxt {
int sent_req; /* Number of producers the STREAM_INFO_REG sent to */
int all; /* 1 if LDMSD sent a request to all matched producers */
int recv_resp; /* Number of responses received by LDMSD */
struct pstream_status_regex_ctxt {
json_entity_t stream_dict;
struct ldmsd_str_list prdcr_list;
pthread_mutex_t lock;
};

struct prdcr_stream_dir_ctxt {
const char *prdcr_name;
struct prdcr_stream_dir_regex_ctxt *base;
struct prdcr_stream_status_ctxt {
struct ldmsd_str_ent *pname;
struct pstream_status_regex_ctxt *base;
};

static int __process_stream_status(struct prdcr_stream_dir_ctxt *ctxt, char *data, size_t data_len)
static int __process_stream_status(struct prdcr_stream_status_ctxt *ctxt, char *data, size_t data_len)
{
int rc = 0;
json_parser_t parser;
Expand All @@ -1927,19 +1925,21 @@ static int __process_stream_status(struct prdcr_stream_dir_ctxt *ctxt, char *dat
for (a = json_attr_first(d); a; a = json_attr_next(a)) {
stream_name = json_attr_name(a)->str;
s = json_attr_value(a);
pthread_mutex_lock(&ctxt->base->lock);
ss = json_value_find(ctxt->base->stream_dict, stream_name);
if (!ss) {
ss = json_entity_new(JSON_DICT_VALUE);
json_attr_add(ctxt->base->stream_dict, stream_name, ss);
}
assert(!json_value_find(ss, ctxt->prdcr_name)); /* Receive stream_dir from this producer twice */
pthread_mutex_unlock(&ctxt->base->lock);
assert(!json_value_find(ss, ctxt->pname->str)); /* Receive stream_dir from this producer twice */
json_attr_rem(s, "publishers"); /* We need to know only the overall statistic on sampler. */
p = json_entity_copy(s);
if (!p) {
rc = ENOMEM;
goto free_json;
}
json_attr_add(ss, ctxt->prdcr_name, p);
json_attr_add(ss, ctxt->pname->str, p);
}
free_json:
json_entity_free(d);
Expand All @@ -1949,24 +1949,25 @@ static int __process_stream_status(struct prdcr_stream_dir_ctxt *ctxt, char *dat
static int __on_stream_status_resp(ldmsd_req_cmd_t rcmd)
{
int rc = 0;
struct prdcr_stream_dir_ctxt *ctxt = rcmd->ctxt;
struct prdcr_stream_dir_regex_ctxt *base = ctxt->base;
struct prdcr_stream_status_ctxt *ctxt = rcmd->ctxt;
struct pstream_status_regex_ctxt *base = ctxt->base;

__sync_fetch_and_add(&base->recv_resp, 1);
ldmsd_req_hdr_t resp = (ldmsd_req_hdr_t)(rcmd->reqc->req_buf);
ldmsd_req_attr_t attr = ldmsd_first_attr(resp);
assert(attr->attr_id == LDMSD_ATTR_JSON);
pthread_mutex_lock(&ctxt->base->lock);
rc = __process_stream_status(ctxt, (char*)attr->attr_value, attr->attr_len);
pthread_mutex_unlock(&ctxt->base->lock);
TAILQ_REMOVE(&base->prdcr_list, ctxt->pname, entry);
free(ctxt->pname->str);
free(ctxt->pname);
free(ctxt);

if (!base->all)
goto out;
if (base->sent_req != base->recv_resp)
if (!TAILQ_EMPTY(&base->prdcr_list))
goto out;

/* Respond to the client */
/*
* We have received all responses.
* Send the response back to the client
*/
jbuf_t jb;
struct ldmsd_req_attr_s _a;
jb = json_entity_dump(NULL, base->stream_dict);
Expand All @@ -1980,22 +1981,24 @@ static int __on_stream_status_resp(ldmsd_req_cmd_t rcmd)
uint32_t discrim = 0;
ldmsd_append_reply(rcmd->org_reqc, (char*)&(discrim),
sizeof(discrim), LDMSD_REQ_EOM_F);
json_entity_free(base->stream_dict);
free(base);
out:
return rc;
}

static int __prdcr_stream_status(ldmsd_prdcr_t prdcr, ldmsd_req_ctxt_t oreqc,
struct prdcr_stream_dir_regex_ctxt *base)
struct pstream_status_regex_ctxt *base,
struct ldmsd_str_ent *pname)
{
int rc;
ldmsd_req_cmd_t rcmd;
struct prdcr_stream_dir_ctxt *ctxt;
struct prdcr_stream_status_ctxt *ctxt;

ctxt = malloc(sizeof(*ctxt));
if (!ctxt)
return ENOMEM;
ctxt->prdcr_name = prdcr->obj.name;
ctxt->pname = pname;
ctxt->base = base;

ldmsd_prdcr_lock(prdcr);
Expand All @@ -2010,7 +2013,6 @@ static int __prdcr_stream_status(ldmsd_prdcr_t prdcr, ldmsd_req_ctxt_t oreqc,
rc = ldmsd_req_cmd_attr_term(rcmd);
if (rc)
goto rcmd_err;
__sync_fetch_and_add(&base->sent_req, 1);
}
ldmsd_prdcr_unlock(prdcr);
return 0;
Expand All @@ -2030,58 +2032,96 @@ int prdcr_stream_status_handler(ldmsd_req_ctxt_t reqc)
struct ldmsd_sec_ctxt sctxt;
regex_t regex;
ldmsd_prdcr_t prdcr;
struct prdcr_stream_dir_regex_ctxt *ctxt;
int count = 0;
struct pstream_status_regex_ctxt *ctxt;
struct ldmsd_str_ent *pname, *nxt_pname;

prdcr_regex = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_REGEX);
if (!prdcr_regex) {
rc = EINVAL;
cnt = snprintf(reqc->line_buf, reqc->line_len,
"The attribute 'regex' is required by prdcr_stop_regex.");
goto send_reply;
goto send_resp_code;
}
ldmsd_req_ctxt_sec_get(reqc, &sctxt);

ctxt = calloc(1, sizeof(*ctxt));
if (!ctxt)
return ENOMEM;
TAILQ_INIT(&ctxt->prdcr_list);

ctxt->stream_dict = json_entity_new(JSON_DICT_VALUE);
if (!ctxt->stream_dict) {
rc = ENOMEM;
goto free_ctxt;
goto send_resp_code;
}
pthread_mutex_init(&ctxt->lock, NULL);

rc = ldmsd_compile_regex(&regex, prdcr_regex, reqc->line_buf, reqc->line_len);
if (rc)
goto free_ctxt;
goto send_resp_code;

ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR);
/* Count the producers matched the regex */
for (prdcr = ldmsd_prdcr_first(); prdcr; prdcr = ldmsd_prdcr_next(prdcr)) {
rc = regexec(&regex, prdcr->obj.name, 0, NULL, 0);
if (rc)
continue;
(void) __prdcr_stream_status(prdcr, reqc, ctxt); /* Ignore the failed one */
count++;
if (prdcr->conn_state != LDMSD_PRDCR_STATE_CONNECTED)
continue;
pname = malloc(sizeof(*pname));
if (!pname) {
rc = ENOMEM;
goto free_ctxt;
}
pname->str = strdup(prdcr->obj.name);
if (!pname->str) {
rc = ENOMEM;
goto free_ctxt;
}
TAILQ_INSERT_TAIL(&ctxt->prdcr_list, pname, entry);
}

/* Forward the request to the connected producers */
pname = TAILQ_FIRST(&ctxt->prdcr_list);
prdcr = ldmsd_prdcr_first();
while (pname && prdcr) {
if (0 != strcmp(pname->str, prdcr->obj.name))
goto next_prdcr;

nxt_pname = TAILQ_NEXT(pname, entry);
rc = __prdcr_stream_status(prdcr, reqc, ctxt, pname);
if (rc) {
/* Failed to forward the request.
* Remove the producer name from the list
*/
TAILQ_REMOVE(&ctxt->prdcr_list, pname, entry);
free(pname->str);
free(pname);
}
pname = nxt_pname;

next_prdcr:
prdcr = ldmsd_prdcr_next(prdcr);
}

ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR);
ctxt->all = 1;
regfree(&regex);
/* Don't reply now. LDMSD will reply when receiving the response from the producers. */
if (0 == count) {
if (TAILQ_EMPTY(&ctxt->prdcr_list)) {
snprintf(reqc->line_buf, reqc->line_len, "No matched producers");
rc = ENOENT;
goto send_reply;
reqc->errcode = ENOENT;
rc = 0;
goto free_ctxt;
}
return 0;

send_resp_code:
reqc->errcode = rc;
free_ctxt:
free(ctxt);
send_reply:
reqc->errcode = rc;
ldmsd_send_req_response(reqc, reqc->line_buf);
free(prdcr_regex);
return 0;
return rc;
}

int __prdcr_status_json_obj(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_t prdcr, int prdcr_cnt)
Expand Down
2 changes: 1 addition & 1 deletion ldms/src/ldmsd/ldmsd_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ int __publisher_json(struct buf_s *buf, ldmsd_stream_publisher_t p)
{
int rc;
rc = buf_printf(buf, "\"%s\":{"
"\"info\":",
"\"recv\":",
p->p_name);
if (rc)
return rc;
Expand Down

0 comments on commit 46ab920

Please sign in to comment.