diff --git a/ldms/src/ldmsd/ldmsd_request.c b/ldms/src/ldmsd/ldmsd_request.c index cae205dca..65a4dc25e 100644 --- a/ldms/src/ldmsd/ldmsd_request.c +++ b/ldms/src/ldmsd/ldmsd_request.c @@ -1896,20 +1896,18 @@ static int prdcr_unsubscribe_regex_handler(ldmsd_req_ctxt_t reqc) return 0; } -struct prdcr_stream_dir_regex_ctxt { - int sent_req; /* Number of producers the STREAM_INFO_REG sent to */ - int all; /* 1 if LDMSD sent a request to all matched producers */ - int recv_resp; /* Number of responses received by LDMSD */ +struct pstream_status_regex_ctxt { json_entity_t stream_dict; + struct ldmsd_str_list prdcr_list; pthread_mutex_t lock; }; -struct prdcr_stream_dir_ctxt { - const char *prdcr_name; - struct prdcr_stream_dir_regex_ctxt *base; +struct prdcr_stream_status_ctxt { + struct ldmsd_str_ent *pname; + struct pstream_status_regex_ctxt *base; }; -static int __process_stream_status(struct prdcr_stream_dir_ctxt *ctxt, char *data, size_t data_len) +static int __process_stream_status(struct prdcr_stream_status_ctxt *ctxt, char *data, size_t data_len) { int rc = 0; json_parser_t parser; @@ -1927,19 +1925,21 @@ static int __process_stream_status(struct prdcr_stream_dir_ctxt *ctxt, char *dat for (a = json_attr_first(d); a; a = json_attr_next(a)) { stream_name = json_attr_name(a)->str; s = json_attr_value(a); + pthread_mutex_lock(&ctxt->base->lock); ss = json_value_find(ctxt->base->stream_dict, stream_name); if (!ss) { ss = json_entity_new(JSON_DICT_VALUE); json_attr_add(ctxt->base->stream_dict, stream_name, ss); } - assert(!json_value_find(ss, ctxt->prdcr_name)); /* Receive stream_dir from this producer twice */ + pthread_mutex_unlock(&ctxt->base->lock); + assert(!json_value_find(ss, ctxt->pname->str)); /* Receive stream_dir from this producer twice */ json_attr_rem(s, "publishers"); /* We need to know only the overall statistic on sampler. */ p = json_entity_copy(s); if (!p) { rc = ENOMEM; goto free_json; } - json_attr_add(ss, ctxt->prdcr_name, p); + json_attr_add(ss, ctxt->pname->str, p); } free_json: json_entity_free(d); @@ -1949,24 +1949,25 @@ static int __process_stream_status(struct prdcr_stream_dir_ctxt *ctxt, char *dat static int __on_stream_status_resp(ldmsd_req_cmd_t rcmd) { int rc = 0; - struct prdcr_stream_dir_ctxt *ctxt = rcmd->ctxt; - struct prdcr_stream_dir_regex_ctxt *base = ctxt->base; + struct prdcr_stream_status_ctxt *ctxt = rcmd->ctxt; + struct pstream_status_regex_ctxt *base = ctxt->base; - __sync_fetch_and_add(&base->recv_resp, 1); ldmsd_req_hdr_t resp = (ldmsd_req_hdr_t)(rcmd->reqc->req_buf); ldmsd_req_attr_t attr = ldmsd_first_attr(resp); assert(attr->attr_id == LDMSD_ATTR_JSON); - pthread_mutex_lock(&ctxt->base->lock); rc = __process_stream_status(ctxt, (char*)attr->attr_value, attr->attr_len); - pthread_mutex_unlock(&ctxt->base->lock); + TAILQ_REMOVE(&base->prdcr_list, ctxt->pname, entry); + free(ctxt->pname->str); + free(ctxt->pname); free(ctxt); - if (!base->all) - goto out; - if (base->sent_req != base->recv_resp) + if (!TAILQ_EMPTY(&base->prdcr_list)) goto out; - /* Respond to the client */ + /* + * We have received all responses. + * Send the response back to the client + */ jbuf_t jb; struct ldmsd_req_attr_s _a; jb = json_entity_dump(NULL, base->stream_dict); @@ -1980,22 +1981,24 @@ static int __on_stream_status_resp(ldmsd_req_cmd_t rcmd) uint32_t discrim = 0; ldmsd_append_reply(rcmd->org_reqc, (char*)&(discrim), sizeof(discrim), LDMSD_REQ_EOM_F); + json_entity_free(base->stream_dict); free(base); out: return rc; } static int __prdcr_stream_status(ldmsd_prdcr_t prdcr, ldmsd_req_ctxt_t oreqc, - struct prdcr_stream_dir_regex_ctxt *base) + struct pstream_status_regex_ctxt *base, + struct ldmsd_str_ent *pname) { int rc; ldmsd_req_cmd_t rcmd; - struct prdcr_stream_dir_ctxt *ctxt; + struct prdcr_stream_status_ctxt *ctxt; ctxt = malloc(sizeof(*ctxt)); if (!ctxt) return ENOMEM; - ctxt->prdcr_name = prdcr->obj.name; + ctxt->pname = pname; ctxt->base = base; ldmsd_prdcr_lock(prdcr); @@ -2010,7 +2013,6 @@ static int __prdcr_stream_status(ldmsd_prdcr_t prdcr, ldmsd_req_ctxt_t oreqc, rc = ldmsd_req_cmd_attr_term(rcmd); if (rc) goto rcmd_err; - __sync_fetch_and_add(&base->sent_req, 1); } ldmsd_prdcr_unlock(prdcr); return 0; @@ -2030,58 +2032,96 @@ int prdcr_stream_status_handler(ldmsd_req_ctxt_t reqc) struct ldmsd_sec_ctxt sctxt; regex_t regex; ldmsd_prdcr_t prdcr; - struct prdcr_stream_dir_regex_ctxt *ctxt; - int count = 0; + struct pstream_status_regex_ctxt *ctxt; + struct ldmsd_str_ent *pname, *nxt_pname; prdcr_regex = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_REGEX); if (!prdcr_regex) { rc = EINVAL; cnt = snprintf(reqc->line_buf, reqc->line_len, "The attribute 'regex' is required by prdcr_stop_regex."); - goto send_reply; + goto send_resp_code; } ldmsd_req_ctxt_sec_get(reqc, &sctxt); ctxt = calloc(1, sizeof(*ctxt)); if (!ctxt) return ENOMEM; + TAILQ_INIT(&ctxt->prdcr_list); + ctxt->stream_dict = json_entity_new(JSON_DICT_VALUE); if (!ctxt->stream_dict) { rc = ENOMEM; - goto free_ctxt; + goto send_resp_code; } pthread_mutex_init(&ctxt->lock, NULL); rc = ldmsd_compile_regex(®ex, prdcr_regex, reqc->line_buf, reqc->line_len); if (rc) - goto free_ctxt; + goto send_resp_code; ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR); + /* Count the producers matched the regex */ for (prdcr = ldmsd_prdcr_first(); prdcr; prdcr = ldmsd_prdcr_next(prdcr)) { rc = regexec(®ex, prdcr->obj.name, 0, NULL, 0); if (rc) continue; - (void) __prdcr_stream_status(prdcr, reqc, ctxt); /* Ignore the failed one */ - count++; + if (prdcr->conn_state != LDMSD_PRDCR_STATE_CONNECTED) + continue; + pname = malloc(sizeof(*pname)); + if (!pname) { + rc = ENOMEM; + goto free_ctxt; + } + pname->str = strdup(prdcr->obj.name); + if (!pname->str) { + rc = ENOMEM; + goto free_ctxt; + } + TAILQ_INSERT_TAIL(&ctxt->prdcr_list, pname, entry); + } + + /* Forward the request to the connected producers */ + pname = TAILQ_FIRST(&ctxt->prdcr_list); + prdcr = ldmsd_prdcr_first(); + while (pname && prdcr) { + if (0 != strcmp(pname->str, prdcr->obj.name)) + goto next_prdcr; + + nxt_pname = TAILQ_NEXT(pname, entry); + rc = __prdcr_stream_status(prdcr, reqc, ctxt, pname); + if (rc) { + /* Failed to forward the request. + * Remove the producer name from the list + */ + TAILQ_REMOVE(&ctxt->prdcr_list, pname, entry); + free(pname->str); + free(pname); + } + pname = nxt_pname; + + next_prdcr: + prdcr = ldmsd_prdcr_next(prdcr); } + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR); - ctxt->all = 1; regfree(®ex); /* Don't reply now. LDMSD will reply when receiving the response from the producers. */ - if (0 == count) { + if (TAILQ_EMPTY(&ctxt->prdcr_list)) { snprintf(reqc->line_buf, reqc->line_len, "No matched producers"); - rc = ENOENT; - goto send_reply; + reqc->errcode = ENOENT; + rc = 0; + goto free_ctxt; } return 0; +send_resp_code: + reqc->errcode = rc; free_ctxt: free(ctxt); -send_reply: - reqc->errcode = rc; ldmsd_send_req_response(reqc, reqc->line_buf); free(prdcr_regex); - return 0; + return rc; } int __prdcr_status_json_obj(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_t prdcr, int prdcr_cnt) diff --git a/ldms/src/ldmsd/ldmsd_stream.c b/ldms/src/ldmsd/ldmsd_stream.c index 810634a02..717b75f32 100644 --- a/ldms/src/ldmsd/ldmsd_stream.c +++ b/ldms/src/ldmsd/ldmsd_stream.c @@ -915,7 +915,7 @@ int __publisher_json(struct buf_s *buf, ldmsd_stream_publisher_t p) { int rc; rc = buf_printf(buf, "\"%s\":{" - "\"info\":", + "\"recv\":", p->p_name); if (rc) return rc;