diff --git a/ldms/src/core/ldms.c b/ldms/src/core/ldms.c index 943baba63..827db985e 100644 --- a/ldms/src/core/ldms.c +++ b/ldms/src/core/ldms.c @@ -800,6 +800,7 @@ static void __destroy_set_no_lock(void *v) } pthread_mutex_unlock(&x->lock); } + rbt_del(&__del_tree, &set->del_node); mm_free(set->meta); __ldms_set_info_delete(&set->local_info); @@ -821,10 +822,6 @@ void ldms_set_delete(ldms_set_t s) { ldms_t x; struct ldms_set *__set; - struct rbt lookup_coll, push_coll; - struct rbn *rbn; - struct ldms_push_peer *pp; - struct ldms_lookup_peer *lp; __ldms_set_tree_lock(); __set = __ldms_set_by_id(s->set_id); @@ -840,43 +837,25 @@ void ldms_set_delete(ldms_set_t s) rbt_del(&__id_tree, &s->id_node); __ldms_set_tree_unlock(); - /* NOTE: We can cut down the entire tree at once because the - * reader doesn't hold on to the rbn after mutex unlock. - * In other words, the reader always lock, iterate, then unlock. + /* NOTE: We will clean up the push and lookup collections + * when we destroy the set. While we wait for the + * SET_DELETE replies from the peers, we iterate + * through the collections to check if there are any peers + * that are connected or not. + * + * If no peers are connected, the set will be destroyed + * regardless of its reference count. See delete_proc(). */ pthread_mutex_lock(&s->lock); - push_coll = s->push_coll; - s->push_coll.root = NULL; - lookup_coll = s->lookup_coll; - s->lookup_coll.root = NULL; x = s->xprt; s->xprt = NULL; pthread_mutex_unlock(&s->lock); if (x) ldms_xprt_put(x); - /* Clean up the push peer collection */ - while ((rbn = rbt_min(&push_coll))) { - rbt_del(&push_coll, rbn); - pp = container_of(rbn, struct ldms_push_peer, rbn); - ldms_xprt_put(pp->xprt); - free(pp); - } - /* Notify downstream transports about the set deletion. */ __ldms_dir_del_set(s); - /* - * Clean up the lookup peer collection and - * remove set from the transport's set collection. - */ - while ((rbn = rbt_min(&lookup_coll))) { - rbt_del(&lookup_coll, rbn); - lp = container_of(rbn, struct ldms_lookup_peer, rbn); - free(lp); - } - - /* Add the set to the delete tree with the current timestamp */ s->del_time = time(NULL); rbn_init(&s->del_node, &s->del_time); @@ -3750,12 +3729,17 @@ int ldms_mval_parse_scalar(ldms_mval_t v, enum ldms_value_type vt, const char *s #define DELETE_TIMEOUT (60) /* 1 minute */ #define DELETE_CHECK (15) +extern int ldms_xprt_connected(struct ldms_xprt *x); static void *delete_proc(void *arg) { - struct rbn *rbn; + struct rbn *rbn, *prev_rbn, *xrbn; struct ldms_set *set; + struct ldms_lookup_peer *lp; + struct ldms_push_peer *pp; ldms_name_t name; time_t dur; + ldms_t x; + struct ldms_context *ctxt; char *to = getenv("LDMS_DELETE_TIMEOUT"); int timeout = (to ? atoi(to) : DELETE_TIMEOUT); if (timeout <= DELETE_CHECK) @@ -3769,6 +3753,7 @@ static void *delete_proc(void *arg) pthread_mutex_lock(&__del_tree_lock); rbn = rbt_max(&__del_tree); while (rbn) { + prev_rbn = rbn_pred(rbn); set = container_of(rbn, struct ldms_set, del_node); name = get_instance_name(set->meta); dur = time(NULL) - set->del_time; @@ -3779,13 +3764,133 @@ static void *delete_proc(void *arg) fflush(stderr); if (dur < timeout) break; + + /* + * Look for a connected push/lookup peer. + * If there is a connected peer, we skip the set + * and wait for the SET_DELETE reply from the connected peers. + * + * This prevents the race between the SET_DELETE timeout + * and the SET_DELETE reply. + */ + pthread_mutex_lock(&set->lock); + xrbn = rbt_min(&set->lookup_coll); + while (xrbn) { + lp = container_of(xrbn, struct ldms_lookup_peer, rbn); + if (ldms_xprt_connected(lp->xprt)) { + /* + * A push peer is connected... skip the set. + */ + pthread_mutex_unlock(&set->lock); + goto next; + } + xrbn = rbn_succ(xrbn); + } + xrbn = rbt_min(&set->push_coll); + while (xrbn) { + pp = container_of(xrbn, struct ldms_push_peer, rbn); + if (ldms_xprt_connected(pp->xprt)) { + /* + * A lookup peer is connected ... skip the set. + */ + pthread_mutex_unlock(&set->lock); + goto next; + } + } + pthread_mutex_unlock(&set->lock); fprintf(stderr, "Deleting dangling set %s with reference " "count %d, waited %jd seconds\n", name->name, set->ref.ref_count, dur); ref_dump(&set->ref, __func__, stderr); + + /* + * Since all peers have disconnected, the push and lookup + * collections should be empty at this point. + * + * Below logic is for a corner case and makes sure + * that we are not leaking and leaving the transport dangling. + */ + /* Clean up the push peer collection */ + while ((rbn = rbt_min(&set->push_coll))) { + rbt_del(&set->push_coll, rbn); + pp = container_of(rbn, struct ldms_push_peer, rbn); + if (!pp->xprt) + goto free_pp; + x = pp->xprt; + pthread_mutex_lock(&x->lock); + TAILQ_FOREACH(ctxt, &x->ctxt_list, link) { + switch (ctxt->type) { + case LDMS_CONTEXT_LOOKUP_READ: + if (ctxt->lu_read.s == set) + ctxt->lu_read.s = NULL; + break; + case LDMS_CONTEXT_UPDATE: + case LDMS_CONTEXT_UPDATE_META: + if (ctxt->update.s == set) + ctxt->update.s = NULL; + break; + case LDMS_CONTEXT_REQ_NOTIFY: + if (ctxt->req_notify.s == set) + ctxt->req_notify.s = NULL; + break; + case LDMS_CONTEXT_SET_DELETE: + if (ctxt->set_delete.s == set) + ctxt->set_delete.s = NULL; + break; + default: + break; + } + } + pthread_mutex_unlock(&x->lock); + ldms_xprt_put(x); + free_pp: + free(pp); + } + + /* + * Clean up the lookup peer collection and + * remove set from the transport's set collection. + */ + while ((rbn = rbt_min(&set->lookup_coll))) { + rbt_del(&set->lookup_coll, rbn); + lp = container_of(rbn, struct ldms_lookup_peer, rbn); + if (!lp->xprt) + goto free_lp; + x = lp->xprt; + pthread_mutex_lock(&x->lock); + TAILQ_FOREACH(ctxt, &x->ctxt_list, link) { + switch (ctxt->type) { + case LDMS_CONTEXT_LOOKUP_READ: + if (ctxt->lu_read.s == set) + ctxt->lu_read.s = NULL; + break; + case LDMS_CONTEXT_UPDATE: + case LDMS_CONTEXT_UPDATE_META: + if (ctxt->update.s == set) + ctxt->update.s = NULL; + break; + case LDMS_CONTEXT_REQ_NOTIFY: + if (ctxt->req_notify.s == set) + ctxt->req_notify.s = NULL; + break; + case LDMS_CONTEXT_SET_DELETE: + if (ctxt->set_delete.s == set) + ctxt->set_delete.s = NULL; + break; + default: + break; + } + } + pthread_mutex_unlock(&x->lock); + ldms_xprt_put(x); + free_lp: + free(lp); + } + __destroy_set_no_lock(set); - rbn = rbt_max(&__del_tree); + next: + rbn = prev_rbn; fflush(stderr); } pthread_mutex_unlock(&__del_tree_lock); diff --git a/ldms/src/core/ldms_xprt.c b/ldms/src/core/ldms_xprt.c index 1fe25b8d8..cc528e859 100644 --- a/ldms/src/core/ldms_xprt.c +++ b/ldms/src/core/ldms_xprt.c @@ -142,11 +142,12 @@ ldms_t ldms_xprt_get(ldms_t x) return x; } -static int ldms_xprt_connected(struct ldms_xprt *x) +int ldms_xprt_connected(struct ldms_xprt *x) { assert(x && x->ref_count); return (x->disconnected == 0 && x->zap_ep && zap_ep_connected(x->zap_ep)); } + LIST_HEAD(xprt_list, ldms_xprt) xprt_list; ldms_t ldms_xprt_first() {