Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi Instance Plugins #1324

Closed
wants to merge 13 commits into from
467 changes: 270 additions & 197 deletions ldms/man/ldmsd_controller.man

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion ldms/python/ldms.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3875,7 +3875,12 @@ cdef class StreamData(object):

cdef int __stream_client_cb(ldms_stream_event_t ev, void *arg) with gil:
cdef StreamClient c = <StreamClient>arg
cdef StreamData sdata = StreamData.from_ldms_stream_event(PTR(ev))
cdef StreamData sdata

if ev.type != LDMS_STREAM_EVENT_RECV:
return 0

sdata = StreamData.from_ldms_stream_event(PTR(ev))
if c.cb:
c.cb(c, sdata, c.cb_arg)
else:
Expand Down
11 changes: 6 additions & 5 deletions ldms/python/ldmsd/ldmsd_controller
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ class LdmsdCmdParser(cmd.Cmd):
"""
Load a plugin at the Aggregator/Producer
Parameters:
name= The plugin name
name= The name of the plugin instance (user-defined, e.g. p0).
plugin= The plugin to load (e.g. meminfo, store_sos).
"""
arg = self.handle_args('load', arg)
if arg:
Expand Down Expand Up @@ -1628,7 +1629,7 @@ class LdmsdCmdParser(cmd.Cmd):
"""
Unload the plugin
Parameters:
name= The plugin name
name= The name of the plugin instance to terminate.
"""
arg = self.handle_args('term', arg)
if arg:
Expand All @@ -1643,7 +1644,7 @@ class LdmsdCmdParser(cmd.Cmd):
"""
Send a configuration command to the specified plugin.
Parameters:
name= The plugin name
name= The plugin instance name
... Plugin specific attr=value tuples
"""
arg = self.handle_args('config', arg)
Expand All @@ -1659,7 +1660,7 @@ class LdmsdCmdParser(cmd.Cmd):
"""
Start a sampler plugin
Parameters:
name= The plugin name
name= The sampler plugin instance name
interval= The sample interval in microseconds
[offset=] Optional offset (shift) from the sample mark in microseconds.
Offset can be positive or negative with magnitude up to 1/2
Expand All @@ -1680,7 +1681,7 @@ class LdmsdCmdParser(cmd.Cmd):
"""
Stop a sampler plugin
Parameters:
name= The plugin name
name= The sampler plugin instance name
"""
arg = self.handle_args('stop', arg)
if arg:
Expand Down
9 changes: 9 additions & 0 deletions ldms/src/core/ldms.h
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,9 @@ enum ldms_stream_event_type {
LDMS_STREAM_EVENT_RECV, /* stream data received */
LDMS_STREAM_EVENT_SUBSCRIBE_STATUS, /* reporting subscription status */
LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS, /* reporting unsubscription status */
LDMS_STREAM_EVENT_CLOSE, /* reporting stream client close event.
* This is the last event to deliver from a
* client. */
};

/* For stream data delivery to the application */
Expand All @@ -1138,12 +1141,18 @@ struct ldms_stream_return_status_s {
int status;
};

/* For stream close event */
struct ldms_stream_close_event_s {
ldms_stream_client_t client;
};

typedef struct ldms_stream_event_s {
ldms_t r; /* rail */
enum ldms_stream_event_type type;
union {
struct ldms_stream_recv_data_s recv;
struct ldms_stream_return_status_s status;
struct ldms_stream_close_event_s close;
};
} *ldms_stream_event_t;

Expand Down
68 changes: 61 additions & 7 deletions ldms/src/core/ldms_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ TAILQ_HEAD(, ldms_stream_client_s) __regex_client_tq = TAILQ_HEAD_INITIALIZER(__

static uint64_t stream_gn = 0;

static pthread_mutex_t __stream_close_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t __stream_close_cond = PTHREAD_COND_INITIALIZER;
static pthread_t __stream_close_thread;
static TAILQ_HEAD(, ldms_stream_client_s)
__stream_close_tq = TAILQ_HEAD_INITIALIZER(__stream_close_tq);

int __rail_rep_send_raw(struct ldms_rail_ep_s *rep, void *data, int len);

/*
Expand Down Expand Up @@ -253,6 +259,8 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg)
ldms_rail_t r;
int ep_idx;
int rc;
if (ev->type == LDMS_STREAM_EVENT_CLOSE)
return 0;
assert( ev->type == LDMS_STREAM_EVENT_RECV );
if (!XTYPE_IS_RAIL(ev->recv.client->x->xtype))
return ENOTSUP;
Expand Down Expand Up @@ -964,7 +972,12 @@ void ldms_stream_close(ldms_stream_client_t c)
ref_put(&c->ref, "__regex_client_tq");
}
__STREAM_UNLOCK();
ref_put(&c->ref, "init");

/* reuse the c->entry for 'close' event queing */
pthread_mutex_lock(&__stream_close_mutex);
TAILQ_INSERT_TAIL(&__stream_close_tq, c, entry);
pthread_cond_signal(&__stream_close_cond);
pthread_mutex_unlock(&__stream_close_mutex);
}

struct __sub_req_ctxt_s {
Expand Down Expand Up @@ -1366,7 +1379,7 @@ void __stream_reply_recv(ldms_t x, int cmd, struct ldms_reply *reply)
__process_stream_subunsub_reply(x, reply, LDMS_STREAM_EVENT_SUBSCRIBE_STATUS);
break;
case LDMS_CMD_STREAM_UNSUB_REPLY:
__process_stream_subunsub_reply(x, reply, LDMS_STREAM_EVENT_SUBSCRIBE_STATUS);
__process_stream_subunsub_reply(x, reply, LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS);
break;
default:
assert(0 == "Unexpected reply");
Expand Down Expand Up @@ -2032,12 +2045,53 @@ int ldms_stream_publish_file(ldms_t x, const char *stream_name,
return rc;
}

static void __ldms_stream_init();

static void *__stream_close_proc(void *arg)
{
struct ldms_stream_client_s *c;
struct ldms_stream_event_s ev;

pthread_atfork(NULL, NULL, __ldms_stream_init); /* re-initialize at fork */

pthread_mutex_lock(&__stream_close_mutex);
loop:
c = TAILQ_FIRST(&__stream_close_tq);
if (!c) {
pthread_cond_wait(&__stream_close_cond, &__stream_close_mutex);
goto loop;
}
TAILQ_REMOVE(&__stream_close_tq, c, entry);
pthread_mutex_unlock(&__stream_close_mutex);

ev.r = c->x;
ev.type = LDMS_STREAM_EVENT_CLOSE;
ev.close.client = c;

ref_get(&c->ref, "cb");
c->cb_fn(&ev, c->cb_arg);
ref_put(&c->ref, "cb");

ref_put(&c->ref, "init");

pthread_mutex_lock(&__stream_close_mutex);
goto loop;

return NULL;
}

__attribute__((constructor))
static void __ldms_stream_init()
{
static int once = 0;
if (once)
return ;
__ldms_stream_log = ovis_log_register("ldms.stream", "LDMS Stream Library");
once = 1;
int rc;
pthread_mutex_init(&__stream_close_mutex, NULL);
pthread_cond_init(&__stream_close_cond, NULL);
if (!__ldms_stream_log)
__ldms_stream_log = ovis_log_register("ldms.stream", "LDMS Stream Library");
rc = pthread_create(&__stream_close_thread, NULL, __stream_close_proc, NULL);
if (rc) {
__ERROR("cannot create ldms_stream_close thread, rc: %d, errno: %d\n", rc, errno);
} else {
pthread_setname_np(__stream_close_thread, "ldms_strm_cls");
}
}
Loading