
Commit

Do not destroy sets being deleted while a client still references them
When a set's delete timeout has been reached, delete_proc() does not
destroy the set if one of its push or lookup peers is still connected.
This prevents a race between delete_proc() and the SET_DELETE reply
from the client.
nichamon authored and tom95858 committed Sep 28, 2022
1 parent ac33eff commit c7b880a
Showing 2 changed files with 139 additions and 33 deletions.
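Before the diff, a minimal sketch of the behavior the commit description outlines: delete_proc() now destroys an expired set only when none of its push or lookup peers is still connected. The types and helper names below (struct set_sketch, set_has_connected_peer(), should_destroy()) are illustrative stand-ins and do not appear in the commit; the real code walks rbt collections under set->lock, as shown in the diff.

/* Illustrative sketch only -- simplified stand-ins for the rbt-based
 * peer collections in ldms.c; none of these names are in the commit. */
#include <stdbool.h>
#include <time.h>

struct peer {
	bool connected;            /* is the peer's transport still connected? */
	struct peer *next;
};

struct set_sketch {
	time_t del_time;           /* when ldms_set_delete() queued the set */
	struct peer *lookup_peers; /* peers that looked the set up */
	struct peer *push_peers;   /* peers registered for push updates */
};

/* True if any lookup or push peer still has a live transport. */
static bool set_has_connected_peer(const struct set_sketch *s)
{
	const struct peer *p;
	for (p = s->lookup_peers; p; p = p->next)
		if (p->connected)
			return true;
	for (p = s->push_peers; p; p = p->next)
		if (p->connected)
			return true;
	return false;
}

/* The decision delete_proc() makes for one queued set: destroy it only
 * after the delete timeout has elapsed AND no peer is still connected,
 * so a pending SET_DELETE reply cannot race the destroy. */
static bool should_destroy(const struct set_sketch *s, time_t now, time_t timeout)
{
	return (now - s->del_time >= timeout) && !set_has_connected_peer(s);
}

In the commit itself, the connectivity test is ldms_xprt_connected(), which the second file in the diff exports for use from ldms.c.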
169 changes: 137 additions & 32 deletions ldms/src/core/ldms.c
@@ -800,6 +800,7 @@ static void __destroy_set_no_lock(void *v)
}
pthread_mutex_unlock(&x->lock);
}

rbt_del(&__del_tree, &set->del_node);
mm_free(set->meta);
__ldms_set_info_delete(&set->local_info);
@@ -821,10 +822,6 @@ void ldms_set_delete(ldms_set_t s)
{
ldms_t x;
struct ldms_set *__set;
struct rbt lookup_coll, push_coll;
struct rbn *rbn;
struct ldms_push_peer *pp;
struct ldms_lookup_peer *lp;

__ldms_set_tree_lock();
__set = __ldms_set_by_id(s->set_id);
@@ -840,43 +837,25 @@ void ldms_set_delete(ldms_set_t s)
rbt_del(&__id_tree, &s->id_node);
__ldms_set_tree_unlock();

/* NOTE: We can cut down the entire tree at once because the
* reader doesn't hold on to the rbn after mutex unlock.
* In other words, the reader always lock, iterate, then unlock.
/* NOTE: We will clean up the push and lookup collections
* when we destroy the set. While we wait for the
* SET_DELETE replies from the peers, we iterate
* through the collections to check whether any peer
* is still connected.
*
* If no peers are connected, the set will be destroyed
* regardless of its reference count. See delete_proc().
*/
pthread_mutex_lock(&s->lock);
push_coll = s->push_coll;
s->push_coll.root = NULL;
lookup_coll = s->lookup_coll;
s->lookup_coll.root = NULL;
x = s->xprt;
s->xprt = NULL;
pthread_mutex_unlock(&s->lock);
if (x)
ldms_xprt_put(x);

/* Clean up the push peer collection */
while ((rbn = rbt_min(&push_coll))) {
rbt_del(&push_coll, rbn);
pp = container_of(rbn, struct ldms_push_peer, rbn);
ldms_xprt_put(pp->xprt);
free(pp);
}

/* Notify downstream transports about the set deletion. */
__ldms_dir_del_set(s);

/*
* Clean up the lookup peer collection and
* remove set from the transport's set collection.
*/
while ((rbn = rbt_min(&lookup_coll))) {
rbt_del(&lookup_coll, rbn);
lp = container_of(rbn, struct ldms_lookup_peer, rbn);
free(lp);
}


/* Add the set to the delete tree with the current timestamp */
s->del_time = time(NULL);
rbn_init(&s->del_node, &s->del_time);
@@ -3750,12 +3729,17 @@ int ldms_mval_parse_scalar(ldms_mval_t v, enum ldms_value_type vt, const char *s
#define DELETE_TIMEOUT (60) /* 1 minute */
#define DELETE_CHECK (15)

extern int ldms_xprt_connected(struct ldms_xprt *x);
static void *delete_proc(void *arg)
{
struct rbn *rbn;
struct rbn *rbn, *prev_rbn, *xrbn;
struct ldms_set *set;
struct ldms_lookup_peer *lp;
struct ldms_push_peer *pp;
ldms_name_t name;
time_t dur;
ldms_t x;
struct ldms_context *ctxt;
char *to = getenv("LDMS_DELETE_TIMEOUT");
int timeout = (to ? atoi(to) : DELETE_TIMEOUT);
if (timeout <= DELETE_CHECK)
@@ -3769,6 +3753,7 @@ static void *delete_proc(void *arg)
pthread_mutex_lock(&__del_tree_lock);
rbn = rbt_max(&__del_tree);
while (rbn) {
prev_rbn = rbn_pred(rbn);
set = container_of(rbn, struct ldms_set, del_node);
name = get_instance_name(set->meta);
dur = time(NULL) - set->del_time;
@@ -3779,13 +3764,133 @@ static void *delete_proc(void *arg)
fflush(stderr);
if (dur < timeout)
break;

/*
* Look for a connected push/lookup peer.
* If there is a connected peer, we skip the set
* and wait for the SET_DELETE replies from the connected peers.
*
* This prevents the race between the SET_DELETE timeout
* and the SET_DELETE reply.
*/
pthread_mutex_lock(&set->lock);
xrbn = rbt_min(&set->lookup_coll);
while (xrbn) {
lp = container_of(xrbn, struct ldms_lookup_peer, rbn);
if (ldms_xprt_connected(lp->xprt)) {
/*
* A lookup peer is connected ... skip the set.
*/
pthread_mutex_unlock(&set->lock);
goto next;
}
xrbn = rbn_succ(xrbn);
}
xrbn = rbt_min(&set->push_coll);
while (xrbn) {
pp = container_of(xrbn, struct ldms_push_peer, rbn);
if (ldms_xprt_connected(pp->xprt)) {
/*
* A push peer is connected ... skip the set.
*/
pthread_mutex_unlock(&set->lock);
goto next;
}
xrbn = rbn_succ(xrbn);
}
pthread_mutex_unlock(&set->lock);
fprintf(stderr,
"Deleting dangling set %s with reference "
"count %d, waited %jd seconds\n",
name->name, set->ref.ref_count, dur);
ref_dump(&set->ref, __func__, stderr);

/*
* Since all peers have disconnected, the push and lookup
* collections should be empty at this point.
*
* The logic below handles a corner case and makes sure
* that we do not leak memory or leave the transport dangling.
*/
/* Clean up the push peer collection */
while ((rbn = rbt_min(&set->push_coll))) {
rbt_del(&set->push_coll, rbn);
pp = container_of(rbn, struct ldms_push_peer, rbn);
if (!pp->xprt)
goto free_pp;
x = pp->xprt;
pthread_mutex_lock(&x->lock);
TAILQ_FOREACH(ctxt, &x->ctxt_list, link) {
switch (ctxt->type) {
case LDMS_CONTEXT_LOOKUP_READ:
if (ctxt->lu_read.s == set)
ctxt->lu_read.s = NULL;
break;
case LDMS_CONTEXT_UPDATE:
case LDMS_CONTEXT_UPDATE_META:
if (ctxt->update.s == set)
ctxt->update.s = NULL;
break;
case LDMS_CONTEXT_REQ_NOTIFY:
if (ctxt->req_notify.s == set)
ctxt->req_notify.s = NULL;
break;
case LDMS_CONTEXT_SET_DELETE:
if (ctxt->set_delete.s == set)
ctxt->set_delete.s = NULL;
break;
default:
break;
}
}
pthread_mutex_unlock(&x->lock);
ldms_xprt_put(x);
free_pp:
free(pp);
}

/*
* Clean up the lookup peer collection and
* remove set from the transport's set collection.
*/
while ((rbn = rbt_min(&set->lookup_coll))) {
rbt_del(&set->lookup_coll, rbn);
lp = container_of(rbn, struct ldms_lookup_peer, rbn);
if (!lp->xprt)
goto free_lp;
x = lp->xprt;
pthread_mutex_lock(&x->lock);
TAILQ_FOREACH(ctxt, &x->ctxt_list, link) {
switch (ctxt->type) {
case LDMS_CONTEXT_LOOKUP_READ:
if (ctxt->lu_read.s == set)
ctxt->lu_read.s = NULL;
break;
case LDMS_CONTEXT_UPDATE:
case LDMS_CONTEXT_UPDATE_META:
if (ctxt->update.s == set)
ctxt->update.s = NULL;
break;
case LDMS_CONTEXT_REQ_NOTIFY:
if (ctxt->req_notify.s == set)
ctxt->req_notify.s = NULL;
break;
case LDMS_CONTEXT_SET_DELETE:
if (ctxt->set_delete.s == set)
ctxt->set_delete.s = NULL;
break;
default:
break;
}
}
pthread_mutex_unlock(&x->lock);
ldms_xprt_put(x);
free_lp:
free(lp);
}

__destroy_set_no_lock(set);
rbn = rbt_max(&__del_tree);
next:
rbn = prev_rbn;
fflush(stderr);
}
pthread_mutex_unlock(&__del_tree_lock);
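One observation on the two cleanup loops above: each scrubs the set out of every pending context on the peer's transport before dropping the transport reference, and the scrub bodies are identical. Below is a hedged sketch of that scrub factored into a single helper; the name __xprt_ctxt_detach_set() is illustrative and not part of the commit, and the types and fields are taken from the diff above (the sketch assumes the internal LDMS headers that define them).

/* Illustrative helper (not in the commit): detach 'set' from any pending
 * contexts queued on transport 'x', so a late completion cannot touch a
 * destroyed set. Field names match the cleanup loops in delete_proc(). */
static void __xprt_ctxt_detach_set(ldms_t x, struct ldms_set *set)
{
	struct ldms_context *ctxt;

	pthread_mutex_lock(&x->lock);
	TAILQ_FOREACH(ctxt, &x->ctxt_list, link) {
		switch (ctxt->type) {
		case LDMS_CONTEXT_LOOKUP_READ:
			if (ctxt->lu_read.s == set)
				ctxt->lu_read.s = NULL;
			break;
		case LDMS_CONTEXT_UPDATE:
		case LDMS_CONTEXT_UPDATE_META:
			if (ctxt->update.s == set)
				ctxt->update.s = NULL;
			break;
		case LDMS_CONTEXT_REQ_NOTIFY:
			if (ctxt->req_notify.s == set)
				ctxt->req_notify.s = NULL;
			break;
		case LDMS_CONTEXT_SET_DELETE:
			if (ctxt->set_delete.s == set)
				ctxt->set_delete.s = NULL;
			break;
		default:
			break;
		}
	}
	pthread_mutex_unlock(&x->lock);
}

Each loop in delete_proc() would then reduce to: pop the minimum node from the collection, call the helper when the peer's xprt is non-NULL, drop the transport reference with ldms_xprt_put(), and free the peer.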
3 changes: 2 additions & 1 deletion ldms/src/core/ldms_xprt.c
@@ -142,11 +142,12 @@ ldms_t ldms_xprt_get(ldms_t x)
return x;
}

static int ldms_xprt_connected(struct ldms_xprt *x)
int ldms_xprt_connected(struct ldms_xprt *x)
{
assert(x && x->ref_count);
return (x->disconnected == 0 && x->zap_ep && zap_ep_connected(x->zap_ep));
}

LIST_HEAD(xprt_list, ldms_xprt) xprt_list;
ldms_t ldms_xprt_first()
{
