diff --git a/ldms/man/ldmsd_sampler_discovery.man b/ldms/man/ldmsd_sampler_discovery.man new file mode 100644 index 0000000000..94e22424b6 --- /dev/null +++ b/ldms/man/ldmsd_sampler_discovery.man @@ -0,0 +1,245 @@ +\" Manpage for ldmsd_sampler_discovery +.TH man 7 "27 March 2024" "v5" "LDMSD Sampler Discovery man page" + +.\""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""/. +.SH NAME +ldmsd_sampler_discovery - Manual for LDMSD Sampler Discovery + +.\""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""/. +.SH SYNOPSIS + +**Sampler Side Commands** + +.IP \fBadvertise_add +.RI "name=" NAME " xprt=" XPRT " host=" HOST " port=" PORT " reconnect=" INTERVAL +.RI "[auth=" AUTH_DOMAIN "]" + +.IP \fBadvertise_start +.RI "name=" NAME + +.IP \fBadvertise_stop +.RI "name=" NAME + +.IP \fBadvertise_del +.RI "name=" NAME + +.IP \fBadvertise_status +.RI "[name=" NAME "]" + +.PP +**Aggregator Side Commands** + +.IP \fBprdcr_listen_add +.RI "name=" NAME " reconnect=" INTVL +.RI "[regex=" REGEX "] [rail=" SIZE "] [credits=" BYTES "] [rx_rate=" RATE_LIMIT "]" + +.IP \fBprdcr_listen_start +.RI "name=" NAME + +.IP \fBprdcr_listen_stop +.RI "name=" NAME + +.IP \fBprdcr_listen_del +.RI "name=" NAME + +.IP \fBprdcr_listen_status + +.SH DESCRIPTION + +LDMSD Sampler Discovery is a capability that enables LDMSD to automatically add +producers whose hostnames match a given regular expression. The feature +eliminates the need for manual configuration of sampler hostnames in the +aggregator configuration file. + +Admins specify the aggregator hostname and the listening port in the sampler +configuration via the \fBadvertise_add\fR command and start the advertisement +with the \fBadvertise_start\fR command. The samplers then advertise their +hostnames to the aggregator. On the aggregator, admins specify a regular +expression to be matched against sampler hostnames via the \fBprdcr_listen_add\fR +command.
The \fBprdcr_listen_start\fR command is used to tell the aggregator to +automatically add a producer for each sampler whose hostname +matches the regular expression. + +The auto-generated producers are of the 'generated' type. The producer name is +the same as the name given on the \fBadvertise_add\fR line in the sampler +configuration file. LDMSD automatically starts them; however, admins need to +stop them manually by using the command \fBprdcr_stop\fR or +\fBprdcr_stop_regex\fR. They can be restarted by using the command +\fBprdcr_start\fR or \fBprdcr_start_regex\fR. + +The descriptions of each command and its parameters are as follows. + +**Sampler Side Commands** + +\fBadvertise_add\fR adds a new advertisement. The parameters are: +.RS +.IP \fBname\fR=\fINAME +String of the advertisement name. The aggregator uses the string as the producer name as well. +.IP \fBhost\fR=\fIHOST +Aggregator hostname +.IP \fBxprt\fR=\fIXPRT +Transport to connect to the aggregator +.IP \fBport\fR=\fIPORT +Listen port of the aggregator +.IP \fBreconnect\fR=\fIINTERVAL +Reconnect interval +.IP \fB[auth\fR=\fIAUTH_DOMAIN\fB] +The authentication domain to be used to connect to the aggregator +.RE + +\fBadvertise_start\fR starts an advertisement. The parameters are: +.RS +.IP \fBname\fR=\fINAME +The advertisement name to be started +.RE + + +\fBadvertise_stop\fR stops an advertisement. The parameters are: +.RS +.IP \fBname\fR=\fINAME +The advertisement name to be stopped +.RE + +\fBadvertise_del\fR deletes an advertisement. The parameters are: +.RS +.IP \fBname\fR=\fINAME +The advertisement name to be deleted +.RE + +\fBadvertise_status\fR reports the status of each advertisement. An optional parameter is: +.RS +.IP \fB[name\fR=\fINAME\fB] +Advertisement name +.RE + +.PP +**Aggregator Side Commands** + +\fBprdcr_listen_add\fR adds a regular expression to match sampler advertisements. The parameters are: +.RS +.IP \fBname\fR=\fINAME +String of the prdcr_listen name.
+.IP \fBreconnect\fR=\fIINTERVAL +Reconnect interval of the auto-generated producers whose hostnames match the regular expression +.IP \fB[regex\fR=\fIREGEX\fB] +Regular expression to match with hostnames in sampler advertisements +.IP \fB[rail\fR=\fIRAIL\fB] +Number of rails +.IP \fB[credits\fR=\fICREDITS\fB] +Receive credits each producer connection accepts in bytes +.IP \fB[rx_rate\fR=\fIRATE\fB] +Receive rate limit each producer connection accepts +.RE + +\fBprdcr_listen_start\fR starts accepting sampler advertisements with matching hostnames. The parameters are: +.RS +.IP \fBname\fR=\fINAME +Name of prdcr_listen to be started +.RE + +\fBprdcr_listen_stop\fR stops accepting sampler advertisements with matching hostnames. The parameters are: +.RS +.IP \fBname\fR=\fINAME +Name of prdcr_listen to be stopped +.RE + +\fBprdcr_listen_del\fR deletes a regular expression to match hostnames in sampler advertisements. The parameters are: +.RS +.IP \fBname\fR=\fINAME +Name of prdcr_listen to be deleted +.RE + +\fBprdcr_listen_status\fR reports the status of each prdcr_listen object. It takes no parameters. + +.SH EXAMPLE + +In this example, there are three LDMS daemons running on \fBnode-1\fR, +\fBnode-2\fR, and \fBnode-3\fR. The LDMSDs running on \fBnode-1\fR and \fBnode-2\fR +are sampler daemons, namely \fBsamplerd-1\fR and \fBsamplerd-2\fR. The +aggregator (\fBagg\fR) runs on \fBnode-3\fR. All LDMSDs listen on port 411. + +The sampler daemons collect the \fBmeminfo\fR set, and they are configured to +advertise themselves and connect to the aggregator using sock on host +\fBnode-3\fR at port 411. The following are the configuration files of +\fBsamplerd-1\fR and \fBsamplerd-2\fR.
+ +.EX +.B +> cat samplerd-1.conf +.RS 4 +# Add and start an advertisement +advertise_add name=samplerd-1 xprt=sock host=node-3 port=411 reconnect=10s +advertise_start name=samplerd-1 +# Load, configure, and start the meminfo plugin +load name=meminfo +config name=meminfo producer=samplerd-1 instance=samplerd-1/meminfo +start name=meminfo interval=1s +.RE + +.B +> cat samplerd-2.conf +.RS 4 +# Add and start an advertisement +advertise_add name=samplerd-2 xprt=sock host=node-3 port=411 reconnect=10s +advertise_start name=samplerd-2 +# Load, configure, and start the meminfo plugin +load name=meminfo +config name=meminfo producer=samplerd-2 instance=samplerd-2/meminfo +start name=meminfo interval=1s +.RE +.EE + +The aggregator is configured to accept advertisements from the sampler daemons +whose hostnames match the regular expression \fBnode-[1-2]\fR. The +auto-added producers will try to re-establish the connection with the samplers +every 10 seconds if the connection is lost. An updater is added to +update the sets of all producers on the aggregator every second at a +100-millisecond offset. + +.EX +.B +> cat agg.conf +.RS 4 +# Accept advertisements sent from LDMSD running on hostnames matching node-[1-2] +prdcr_listen_add name=computes regex=node-[1-2] reconnect=10s +prdcr_listen_start name=computes +# Add and start an updater +updtr_add name=all_sets interval=1s offset=100ms +updtr_prdcr_add name=all_sets regex=.* +updtr_start name=all_sets +.RE +.EE + +LDMSD provides the command \fBadvertise_status\fR to report the status of +the advertisements of a sampler daemon.
+ +.EX +.B +> ldmsd_controller -x sock -p 10001 -h node-1 +Welcome to the LDMSD control processor +sock:node-1:10001> advertise_status +Name Aggregator Host Aggregator Port Transport Reconnect (us) State +---------------- ---------------- --------------- ------------ --------------- ------------ +samplerd-1 node-3 10001 sock 10000000 CONNECTED +sock:node-1:10001> +.EE + +Similarly, LDMSD provides the command \fBprdcr_listen_status\fR to report the +status of all prdcr_listen objects on an aggregator. The command also reports +the list of auto-added producers corresponding to each prdcr_listen object. + +.EX +.B +> ldmsd_controller -x sock -p 10001 -h node-3 +Welcome to the LDMSD control processor +sock:node-3:10001> prdcr_listen_status +Name Regex Reconnect(us) State +-------------------- --------------- --------------- ---------- +compute node-[1-2] 10000000 running +Producers: samplerd-1, samplerd-2 +sock:node-3:10001> +.EE + +.SH SEE ALSO +.BR ldmsd (8) +.BR ldmsd_controller (8) diff --git a/ldms/python/ldmsd/ldmsd_communicator.py b/ldms/python/ldmsd/ldmsd_communicator.py index 7b74fee459..50a7fe06d6 100644 --- a/ldms/python/ldmsd/ldmsd_communicator.py +++ b/ldms/python/ldmsd/ldmsd_communicator.py @@ -55,7 +55,6 @@ import time import json import errno -from pickle import NONE #:Dictionary contains the cmd_id, required attribute list #:and optional attribute list of each ldmsd commands. For example, @@ -207,7 +206,26 @@ 'opt_attr': ['instance'] }, ##### Authetication. 
##### - 'auth_add': {'req_attr': ['name', 'plugin'], 'opt_attr': []}, + 'auth_add': {'req_attr': ['name', 'xprt', 'host', 'port', 'reconnect'], + 'opt_attr' : [ 'auth', 'perm', 'rail', 'credits', 'rx_rate' ] }, + ##### Sampler Discovery ##### + 'advertiser_add': {'req_attr': ['name', 'xprt', 'host', 'port'], + 'opt_attr' : [ 'auth', 'perm', 'interval', + 'reconnect', 'rail', + 'credits', 'rx_rate' ] }, + 'advertiser_del': {'req_attr': ['name'], 'opt_attr': []}, + 'advertiser_start': {'req_attr': ['name'], + 'opt_attr' : ['xprt', 'host', 'port', + 'auth', 'perm', + 'reconnect', 'rail', + 'credits', 'rx_rate' ] }, + 'advertiser_stop': {'req_attr': ['name'], 'opt_attr': []}, + 'prdcr_listen_add': {'req_attr': ['name', 'reconnect'], + 'opt_attr': ['rail', 'credits', 'rx_rate', 'regex']}, + 'prdcr_listen_del': {'req_attr': ['name'], 'opt_attr': []}, + 'prdcr_listen_start': {'req_attr': ['name'], 'opt_attr': []}, + 'prdcr_listen_stop': {'req_attr': ['name'], 'opt_attr': []}, + 'prdcr_listen_status': {'req_attr': [], 'opt_attr': []}, } def fmt_status(msg): @@ -276,7 +294,9 @@ class LDMSD_Req_Attr(object): CREDITS = 39 RX_RATE = 40 SUMMARY = 41 - LAST = 42 + SIZE = 42 + IP = 43 + LAST = 44 NAME_ID_MAP = {'name': NAME, 'interval': INTERVAL, @@ -323,6 +343,8 @@ class LDMSD_Req_Attr(object): 'rx_rate' : RX_RATE, 'reconnect' : INTERVAL, 'summary' : SUMMARY, + 'size' : SIZE, + 'IP' : IP, 'TERMINATING': LAST } @@ -474,6 +496,16 @@ class LDMSD_Request(object): PRDCR_SUBSCRIBE = 0x100 + 9 PRDCR_UNSUBSCRIBE = 0x100 + 10 PRDCR_STREAM_STATUS = 0x100 + 11 + PRDCR_BRDIGE_ADD = 0x100 + 12 + ADVERTISER_ADD = 0x100 + 13 + ADVERTISER_START = 0x100 + 14 + ADVERTISER_STOP = 0x100 + 15 + ADVERTISER_DEL = 0x100 + 16 + PRDCR_LISTEN_ADD = 0x100 + 17 + PRDCR_LISTEN_DEL = 0x100 + 18 + PRDCR_LISTEN_START = 0x100 + 19 + PRDCR_LISTEN_STOP = 0x100 + 20 + PRDCR_LISTEN_STATUS = 0x100 + 21 STRGP_ADD = 0x200 STRGP_DEL = 0x200 + 1 @@ -581,6 +613,16 @@ class LDMSD_Request(object): 'prdcr_unsubscribe': 
{'id': PRDCR_UNSUBSCRIBE}, 'prdcr_stream_status' : {'id': PRDCR_STREAM_STATUS}, + 'advertiser_add': {'id': ADVERTISER_ADD}, + 'advertiser_start': {'id': ADVERTISER_START}, + 'advertiser_stop': {'id': ADVERTISER_STOP}, + 'advertiser_del': {'id': ADVERTISER_DEL}, + 'prdcr_listen_add': {'id': PRDCR_LISTEN_ADD}, + 'prdcr_listen_start': {'id': PRDCR_LISTEN_START}, + 'prdcr_listen_stop': {'id': PRDCR_LISTEN_STOP}, + 'prdcr_listen_del': {'id': PRDCR_LISTEN_DEL}, + 'prdcr_listen_status': {'id': PRDCR_LISTEN_STATUS}, + 'strgp_add': {'id': STRGP_ADD}, 'strgp_del': {'id': STRGP_DEL}, 'strgp_start': {'id': STRGP_START}, @@ -2065,6 +2107,38 @@ def plugn_start(self, name, interval_us, offset_us=None): self.close() return errno.ENOTCONN, str(e) + def _prdcr_add_attr_prep(self, **kwargs): + """ + Build the LDMSD_Req_Attr list shared by prdcr_add(), advertiser_add(), and advertiser_start(). + + 'name' is required. 'xprt', 'host', and 'port' are sent when they are not None; 'reconnect', 'ptype', 'auth', 'perm', 'rail', 'credits', and 'rx_rate' are sent when they are present and truthy. + """ + attrs = [ LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.NAME, value=kwargs['name']) ] + # advertiser_start() leaves the connection attributes as None for an existing advertiser; skip them instead of sending None or the string 'None'. + if kwargs.get('xprt') is not None: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.XPRT, value=kwargs['xprt'])) + if kwargs.get('host') is not None: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.HOST, value=kwargs['host'])) + if kwargs.get('port') is not None: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.PORT, value=str(kwargs['port']))) + if 'reconnect' in kwargs.keys() and kwargs['reconnect']: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.INTERVAL, value=str(kwargs['reconnect']))) + if 'ptype' in kwargs.keys() and kwargs['ptype']: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.TYPE, value=kwargs['ptype'])) + if 'auth' in kwargs.keys() and kwargs['auth']: + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.AUTH, value=kwargs['auth'])) + if 'perm' in kwargs.keys() and kwargs['perm']: + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.PERM, value=str(kwargs['perm']))) + if 'rail' in kwargs.keys() and kwargs['rail']: + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RAIL, value=str(int(kwargs['rail'])))) + # The key is 'credits', as passed by every caller; testing for 'credit' silently dropped the value. + if 'credits' in kwargs.keys() and kwargs['credits']: + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.CREDITS, value=str(int(kwargs['credits'])))) + if 'rx_rate' in kwargs.keys() and kwargs['rx_rate']: +
attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RX_RATE, value=str(int(kwargs['rx_rate'])))) + + return attrs + def prdcr_add(self, name, ptype, xprt, host, port, reconnect, auth=None, perm=None, rail=None, credits=None, rx_rate=None): """ @@ -2098,28 +2164,11 @@ def prdcr_add(self, name, ptype, xprt, host, port, reconnect, auth=None, perm=No - status is an errno from the errno module - data is an error message if status != 0 or None """ - attrs = [ - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.NAME, value=name), - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.TYPE, value=ptype), - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.XPRT, value=xprt), - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.HOST, value=host), - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.PORT, value=str(port)), - LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.INTERVAL, value=str(reconnect)) - ] - if auth: - attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.AUTH, value=auth)) - if perm: - attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.PERM, value=str(perm))) - if rail: - attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RAIL, value=str(int(rail)))) - if credits: - attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.CREDITS, value=str(int(credits)))) - if rx_rate: - attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RX_RATE, value=str(int(rx_rate)))) - - req = LDMSD_Request( - command_id=LDMSD_Request.PRDCR_ADD, - attrs=attrs) + args_d = {'name': name, 'ptype': ptype, 'xprt': xprt, 'host': host, 'port': port, + 'reconnect': reconnect, 'auth': auth, 'perm': perm, + 'rail': rail, 'credits': credits, 'rx_rate': rx_rate} + attrs = self._prdcr_add_attr_prep(**args_d) + req = LDMSD_Request( command_id = LDMSD_Request.PRDCR_ADD, attrs = attrs) try: req.send(self) resp = req.receive(self) @@ -2153,7 +2202,7 @@ def prdcr_del(self, name): self.close() return errno.ENOTCONN, str(e) - def prdcr_start(self, name, regex=True, reconnect=None): + def prdcr_start(self, name, regex=True, reconnect=None, **kwargs): """ Start one or more
STOPPED producers @@ -2167,6 +2216,9 @@ def prdcr_start(self, name, regex=True, reconnect=None): reconnect - The reconnect interval in microseconds. If not None, this will override the interval specified when the producer was created. Default is None. + kwargs - Additional keyword argument as in prdcr_add(). + It is to support producer creation if it doesn't exist at start. + Currently, only advertiser_start() uses this feature. Returns: A tuple of status, data @@ -2187,6 +2239,9 @@ def prdcr_start(self, name, regex=True, reconnect=None): attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.INTERVAL, value = str(reconnect))) + for key, value in kwargs.items(): + attrs.append(LDMSD_Req_Attr(attr_name = key, value = value)) + req = LDMSD_Request(command_id = cmd_id, attrs = attrs) try: req.send(self) @@ -2394,6 +2449,233 @@ def prdcr_hint_tree(self, name=None): except Exception as e: return errno.ENOTCONN, str(e) + def advertiser_add(self, name, xprt, host, port, reconnect, auth=None, perm=None, + rail=None, credits=None, rx_rate=None): + """ + Add an advertiser. An advertiser sends an advertisement to an aggregator + add it as a producer. Once started, the LDSMD will attempt to + periodically send a connection request until a connection is established. + + An advertiser starts in the STOPPED state. Use the advertiser_start() function + to start the advertiser. + + Parameters: + - The name to give the advertiser. This name must be unique among all advertisement sent to the aggregator. + - The transport type, one of 'sock', 'ugni', 'rdma', or 'fabric' + - The aggregator's hostname + - The aggregator's listening port number + - The reconnect interval in microseconds + + Keyword Parameters: + auth - The authentication demain + perm - The configuration client permission required to + modify the producer configuration. Default is None. + rail - The number of endpoints in a rail. The default is 1. 
+ credits - The send credits of our side of the connection (the daemon we + are controlling). The default is the daemon's default + ('-C' ldmsd option). + rx_rate - The recv rate (bytes/second) limit for this connection. The + default is -1 (unlimited). + + Returns: + A tuple of status, data + - status is an errno from the errno module + - data is an error message if status != 0 or None + """ + args_d = {'name': name, 'xprt': xprt, 'host': host, 'port': port, + 'reconnect': reconnect, 'auth': auth, 'perm': perm, + 'rail': rail, 'credits': credits, 'rx_rate': rx_rate} + attrs = self._prdcr_add_attr_prep(**args_d) + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.TYPE, value="advertise")) + req = LDMSD_Request( command_id = LDMSD_Request.ADVERTISER_ADD, attrs = attrs) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + self.close() + return errno.ENOTCONN, str(e) + + def advertiser_start(self, name, xprt=None, host=None, port=None, + reconnect=None, auth=None, perm=None, + rail=None, credits=None, rx_rate=None): + """ + Start an advertiser. If the advertiser does not exist, LDMSD will create it. + In this case, the values of the required attributes in advertiser_add must be given. + + Parameters: + - The name to give the advertiser. This name must be unique among all advertisement sent to the aggregator. + + Keyword Parameters: + xprt - The transport type, one of 'sock', 'ugni', 'rdma', or 'fabric' + host - The aggregator's hostname + port - The aggregator's listening port number + reconnect - The reconnect interval in microseconds + auth - The authentication demain + perm - The configuration client permission required to + modify the producer configuration. Default is None. + rail - The number of endpoints in a rail. The default is 1. + credits - The send credits of our side of the connection (the daemon we + are controlling). The default is the daemon's default + ('-C' ldmsd option). 
+ rx_rate - The recv rate (bytes/second) limit for this connection. The + default is -1 (unlimited). + + Returns: + A tuple of status, data + - status is an errno from the errno module + - data is an error message if status != 0 or None + """ + args_d = {'name': name, 'xprt': xprt, 'host': host, 'port': port, + 'reconnect': reconnect, 'auth': auth, 'perm': perm, + 'rail': rail, 'credits': credits, 'rx_rate': rx_rate} + attrs = self._prdcr_add_attr_prep(**args_d) + attrs.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.TYPE, value="advertise")) + req = LDMSD_Request( command_id = LDMSD_Request.ADVERTISER_START, attrs = attrs) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + self.close() + return errno.ENOTCONN, str(e) + + def advertiser_stop(self, name): + req = LDMSD_Request(command_id = LDMSD_Request.ADVERTISER_STOP, + attrs = [LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name)]) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + self.close() + return errno.ENOTCONN, str(e) + + def advertiser_del(self, name): + req = LDMSD_Request(command_id = LDMSD_Request.ADVERTISER_DEL, + attrs = [LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name)]) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + self.close() + return errno.ENOTCONN, str(e) + + def prdcr_listen_add(self, name, regex, reconnect, rail=None, credits=None, rx_rate=None): + """ + Tell an aggregator to wait for advertisements from samplers + + The ggregator automatically adds and starts a producer when it receives + an advertisement that the peer (sampler) hostname matches the regular expression. 
+ + Parameters: + - Name of the producer listen + - Regular expression to match sampler hostnames + - The number of rail + - The credits in bytes + - The receive rate limit + + Return: + - status is an errno from the errno module + - data is an error message if status !=0 or None + """ + attr_list = [ LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name), + LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.REGEX, value=regex), + LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.INTERVAL, value=reconnect) + ] + + if rail is not None: + attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RAIL, value=rail)) + if credits is not None: + attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.CREDITS, value=credits)) + if rx_rate is not None: + attr_list.append(LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.RX_RATE, value=rx_rate)) + + req = LDMSD_Request(command_id=LDMSD_Request.PRDCR_LISTEN_ADD, + attrs=attr_list) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + return errno.ENOTCONN, str(e) + + def prdcr_listen_del(self, name): + """ + Delete a producer listen + + Parameter: + - Name of the producer listen to be deleted + + Return: + - Status is an errno from the errno module + - Data is an error message if status != 0 or None + """ + attr_list = [ LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name)] + req = LDMSD_Request(command_id=LDMSD_Request.PRDCR_LISTEN_DEL, + attrs=attr_list) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + return errno.ENOTCONN, str(e) + + def prdcr_listen_start(self, name): + """ + Start a producer listen + + Parameter: + - Name of the producer listen to be started + + Return: + - Status is an errno from the errno module + - Data is an error message if status != 0 or None + """ + attr_list = [ LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name)] + req = LDMSD_Request(command_id=LDMSD_Request.PRDCR_LISTEN_START, + attrs=attr_list) + 
try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + return errno.ENOTCONN, str(e) + + def prdcr_listen_stop(self, name): + """ + Stop a producer listen + + Parameter: + - Name of the producer listen to be stopped + + Return: + - Status is an errno from the errno module + - Data is an error message if status != 0 or None + """ + attr_list = [ LDMSD_Req_Attr(attr_id=LDMSD_Req_Attr.NAME, value=name)] + req = LDMSD_Request(command_id=LDMSD_Request.PRDCR_LISTEN_STOP, + attrs=attr_list) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + return errno.ENOTCONN, str(e) + + def prdcr_listen_status(self): + """ + Get the status of all producer listen + """ + req = LDMSD_Request(command_id=LDMSD_Request.PRDCR_LISTEN_STATUS) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + return errno.ENOTCONN, str(e) + def updtr_add(self, name, interval=1000000, offset=None, push=None, auto=None, perm=None): """ Add an Updater that will periodically update Producer metric sets either diff --git a/ldms/python/ldmsd/ldmsd_controller b/ldms/python/ldmsd/ldmsd_controller index 6cecbb54e8..286d06302f 100755 --- a/ldms/python/ldmsd/ldmsd_controller +++ b/ldms/python/ldmsd/ldmsd_controller @@ -426,6 +426,7 @@ class LdmsdCmdParser(cmd.Cmd): [rx_rate=] The recv rate (bytes/sec) limit for this connection. The default is -1 (unlimited). 
""" + arg = f"{arg} type=bridge" # Automatically add the producer type arg = self.handle_args('prdcr_add', arg) if arg is None: return @@ -1023,9 +1024,14 @@ class LdmsdCmdParser(cmd.Cmd): print("Name Host Port Transport State Type") print("---------------- ---------------- ------------ ------------ ------------ ----------") for prdcr in producers: + port = prdcr['port'] if prdcr['type'] == 'bridge': continue - print(f"{prdcr['name']:16} {prdcr['host']:16} {prdcr['port']:12} {prdcr['transport']:12} " \ + elif prdcr['type'] == 'generated': + port = '-' + print(f"{prdcr['name']:16} {prdcr['host']:16} " \ + f"{port:12} " \ + f"{prdcr['transport']:12} " \ f"{prdcr['state']:12} {prdcr['type']:10}") for pset in prdcr['sets']: print(" {0:16} {1:16} {2}".format(pset['inst_name'], @@ -2800,6 +2806,242 @@ class LdmsdCmdParser(cmd.Cmd): if rc: print(f"Failed to reset the statistics") + def do_advertiser_add(self, arg): + """ + Add an advertisement of its hostname to an aggregator. This is a part of the sampler discovery feature. + Parameters: + name= A unique name to be used as producer name on the aggregator + xprt= The transport name [sock, rdma, ugni] + host= The aggregator hostname + reconnect= The connection retry interval + [auth=] The authentication method + [perm=] The permission to modify the producer in the future. + [rail=] The number of rail endpoints for the prdcr (default: 1). + [credits=] The send credits our ldmsd (the one we are controlling) + advertises to the prdcr (default: value from ldmsd --credits + option). This limits how much outstanding data our ldmsd + holds for the prdcr. The prdcr drops messages when it does + not have enough send credits. + [rx_rate=] The recv rate (bytes/sec) limit for this connection. The + default is -1 (unlimited). 
+ """ + arg = self.handle_args('advertiser_add', arg) + if arg is None: + return + if arg['reconnect'] is None: + print(f"The attribute 'reconnect' is missing.") + else: + rc, msg = self.comm.advertiser_add(arg['name'], + arg['xprt'], + arg['host'], + arg['port'], + arg['reconnect'], + arg['auth'], + arg['perm'], + arg['rail'], + arg['credits'], + arg['rx_rate']) + if rc: + print(f'Error adding Bridge {arg["name"]}: {msg}') + + def complete_advertiser_add(self, text, line, begidx, endidx): + return self.__complete_attr_list('advertiser_add', text) + + def do_advertiser_del(self, arg): + """ + Delete an advertise from the sampler daemon. The advertise connot be in use. + Parameters: + name = The advertise name + """ + arg = self.handle_args('advertiser_del', arg) + if arg is None: + return + rc, msg = self.comm.advertiser_del(**arg) + if rc: + print(f"Error deleting advertiser {arg['name']}: {msg}") + + def complete_advertiser_del(self, text, line, begidx, endidx): + return self.__complete_attr_list('advertiser_del', text) + + def do_advertiser_start(self, arg): + """ + Start an advertisement + Parameters: + name = The advertise name + """ + arg = self.handle_args('advertiser_start', arg) + if arg is None: + return + rc, msg = self.comm.advertiser_start(**arg) + if rc: + print(f"Error starting advertiser {arg['name']}: {msg}") + + def complete_advertiser_start(self, text, line, begidx, endidx): + return self.__complete_attr_list('advertiser_start', text) + + def do_advertiser_stop(self, arg): + """ + Stop an advertisement + Parameters: + name = The advertise name + """ + arg = self.handle_args('advertiser_stop', arg) + if arg is None: + return + rc, msg = self.comm.advertiser_stop(**arg) + if rc: + print(f"Error stopping advertiser {arg['name']}: {msg}") + + def complete_advertiser_stop(self, text, line, begidx, endidx): + return self.__complete_attr_list('advertiser_stop', text) + + def do_advertiser_status(self, arg): + """ + Get the statuses of advertisers + 
Parameters: + [name=] Advertiser name + """ + arg = self.handle_args('prdcr_status', arg) + if arg: + rc, msg = self.comm.prdcr_status(arg['name']) + if rc == 0 and msg is not None: + producers = fmt_status(msg) + print("Name Aggregator Host Aggregator Port Transport Reconnect(us) State") + print("---------------- ---------------- --------------- ------------ --------------- ------------") + for prdcr in producers: + if prdcr['type'] != 'advertise': + continue + print(f"{prdcr['name']:16} {prdcr['host']:16} {prdcr['port']:<15} " \ + f"{prdcr['transport']:12} {prdcr['reconnect_us']:15} " \ + f"{prdcr['state']:12}") + else: + print(f'Error getting advertise status: {msg}') + + def complete_advertiser_status(self, text, line, begidx, endidx): + return self.__complete_attr_list('prdcr_status', text) + + def do_prdcr_listen_add(self, arg): + """ + Add a producer listen + + The producer listen must be started by using 'prdcr_listen_start'. + + After the producer listen starts, + the aggregator waits for advertisements from samplers and + automatically adds and starts a producer if the peer (sampler) hostname + matches the regular expression. + + The auto-generated producers can be stopped and restarted by using + prdcr_stop and prdcr_start, respectively, as manually added producers. + + Parameters: + name= A unique name of the producer listen + reconnect= The retry interval to check for connection establishment of producers matched the regular expression. + [regex=] A regular expression to match sampler hostnames + [rail=] The number of rail endpoints for the prdcr (default: 1). + [credits=] The send credits our ldmsd (the one we are controlling) + advertises to the prdcr (default: value from ldmsd --credits + option). This limits how much outstanding data our ldmsd + holds for the prdcr. The prdcr drops messages when it does + not have enough send credits. + [rx_rate=] The recv rate (bytes/sec) limit for this connection. The + default is -1 (unlimited). 
+ """ + arg = self.handle_args('prdcr_listen_add', arg) + if arg is None: + return + rc, msg = self.comm.prdcr_listen_add(**arg) + if rc: + print(f"Error adding producer listen {arg['name']}: {msg}") + + def complete_prdcr_listen_add(self, text, line, begidx, endidx): + return self.__complete_attr_list('prdcr_listen_add', text) + + def do_prdcr_listen_del(self, arg): + """ + Delete a producer_listen + + The producer listen must not be running. + + Parameters: + name= A unique name of the producer listen + """ + arg = self.handle_args('prdcr_listen_del', arg) + if arg is None: + return + rc, msg = self.comm.prdcr_listen_del(**arg) + if rc: + print(f"Error deleting producer listen {arg['name']}: {msg}") + + def complete_prdcr_listen_del(self, text, line, begidx, endidx): + return self.__complete_attr_list('prdcr_listen_del', text) + + def do_prdcr_listen_start(self, arg): + """ + Start a producer_listen + + The aggregator waits for advertisements from samplers. It matches the + hostnames in advertisements with the regular expression. If they match, + there are two scenarios; 1) no producer of the same name exists, and 2) + a producer of the same name exists. In the former case, the aggregator + will create a producer with the given name and start it. In the later + case, if the producer is stopped, the aggregator will _not_ start it + automatically. The producer can be started using `prdcr_start` or + `prdcr_start_regex`. 
+ + Parameters: + name= A unique name of the producer listen + """ + arg = self.handle_args('prdcr_listen_start', arg) + if arg is None: + return + rc, msg = self.comm.prdcr_listen_start(**arg) + if rc: + print(f"Error starting producer listen {arg['name']}: {msg}") + + def complete_prdcr_listen_start(self, text, line, begidx, endidx): + return self.__complete_attr_list('prdcr_listen_start', text) + + def do_prdcr_listen_stop(self, arg): + """ + Stop a running producer_listen + + The aggregator stops matching the hostnames in advertisements with the + regular expression. That is, the aggregator will not automatically add + any producers that the hostnames matches the regular expression. + + Parameters: + name= A unique name of the producer listen + """ + arg = self.handle_args('prdcr_listen_stop', arg) + if arg is None: + return + rc, msg = self.comm.prdcr_listen_stop(**arg) + if rc: + print(f"Error stopping producer listen {arg['name']}: {msg}") + + def complete_prdcr_listen_stop(self, text, line, begidx, endidx): + return self.__complete_attr_list('prdcr_listen_stop', text) + + def do_prdcr_listen_status(self, arg): + """ + Display the status of all producer listen + """ + arg = self.handle_args('prdcr_listen_status', arg) + if arg is None: + return + rc, msg = self.comm.prdcr_listen_status(**arg) + if rc == 0 and msg is not None: + l = fmt_status(msg) + print(f"{'Name':20} {'Regex':15} {'State':10}") + print(f"{'-'*20} {'-'*15} {'-'*10}") + for pl in l: + print(f"{pl['name']:20} {pl['regex']:15} {pl['state']:10}") + if len(pl['producers']): + print(f"Producers: {', '.join(p for p in pl['producers'])}") + else: + print(f'Error getting prdcr_listen status: {msg}') + def do_option(self, arg): """ ONLY SUPPORTED IN CONFIGURATION FILES diff --git a/ldms/src/core/ldms.h b/ldms/src/core/ldms.h index e2a2d132ea..1f0a0c5d93 100644 --- a/ldms/src/core/ldms.h +++ b/ldms/src/core/ldms.h @@ -902,6 +902,51 @@ int ldms_xprt_sockaddr(ldms_t x, struct sockaddr *local_sa, struct 
/* currently only support IPv4 and IPv6 */
struct ldms_addr {
	sa_family_t sa_family; /* host-endian */
	in_port_t sin_port; /* network-endian */
	uint8_t addr[16]; /* addr[0-3] for IPv4,
			     addr[0-15] for IPv6 */
};

/**
 * \brief Get local and remote address in \c ldms_addr struct from the xprt
 *
 * \param x           LDMS Transport pointer
 * \param local_addr  Buffer to receive the local address. May be NULL if the
 *                    local address is not wanted.
 * \param remote_addr Buffer to receive the remote address. May be NULL if the
 *                    remote address is not wanted.
 *
 * \return 0 on success. Otherwise, the non-zero error code from the transport
 *         address query or from the address conversion is returned.
 */
int ldms_xprt_addr(ldms_t x, struct ldms_addr *local_addr,
		   struct ldms_addr *remote_addr);

const char *ldms_sockaddr_ntop(struct sockaddr *sa, char *buff, size_t sz);

/**
 * \brief Convert a CIDR IP address string to \c ldms_addr
 *
 * The string is expected in the form \c ADDRESS/PREFIX_LEN. A string that
 * contains a ':' is treated as IPv6; otherwise it is treated as IPv4.
 *
 * The address is stored in \c addr, and the prefix length is stored in \c prefix_len.
 *
 * \param cdir_str   CIDR string to parse
 * \param addr       ldms_addr pointer; may be NULL if the address is not wanted
 * \param prefix_len Integer pointer; may be NULL if the prefix length is not wanted
 *
 * \retval 0 if success. Otherwise, an errno is returned.
 * \retval EINVAL if \c cdir_str is not of the form \c ADDRESS/PREFIX_LEN
 */
int ldms_cidr2addr(const char *cdir_str, struct ldms_addr *addr, int *prefix_len);

/**
 * \brief Verify if \c ip_addr is in \c net_addr with the prefix \c prefix_len
 *
 * Only the first \c prefix_len bits of the two addresses are compared.
 * Addresses of different families never match.
 *
 * \param ip_addr    IP Address
 * \param net_addr   Network Address
 * \param prefix_len Prefix length for masking, in bits
 *
 * \return 1 if the IP address is in the network address. Otherwise, 0 is returned.
 */
int ldms_addr_in_network_addr(struct ldms_addr *ip_addr,
			      struct ldms_addr *net_addr, int prefix_len);
* @@ -1104,14 +1149,6 @@ int ldms_stream_publish_file(ldms_t x, const char *stream_name, typedef struct ldms_stream_client_s *ldms_stream_client_t; typedef struct json_entity_s *json_entity_t; -/* currently only support IPv4 and IPv6 */ -struct ldms_addr { - sa_family_t sa_family; /* host-endian */ - in_port_t sin_port; /* network-endian */ - uint8_t addr[16]; /* addr[0-3] for IPv4, - addr[0-15] for IPv6 */ -}; - enum ldms_stream_event_type { LDMS_STREAM_EVENT_RECV, /* stream data received */ LDMS_STREAM_EVENT_SUBSCRIBE_STATUS, /* reporting subscription status */ diff --git a/ldms/src/core/ldms_rail.c b/ldms/src/core/ldms_rail.c index d35bbf074b..773577ab07 100644 --- a/ldms/src/core/ldms_rail.c +++ b/ldms/src/core/ldms_rail.c @@ -1411,6 +1411,65 @@ const char *ldms_addr_ntop(struct ldms_addr *addr, char *buff, size_t sz) return buff; } +/* *2 for the two hex digits needed for each 16-bit value, * 8 for the 8 groups of values, + 7 for the 7 colons */ +#define MAX_IPV6_STR_LEN (sizeof(uint16_t) * 2 * 8 + 7) +int ldms_cidr2addr(const char *cdir_str, struct ldms_addr *addr, int *prefix_len) +{ + int rc; + int is_ipv6 = 0; + char netaddr_str[MAX_IPV6_STR_LEN]; + int _prefix_len; + + if (strchr(cdir_str, ':') != NULL) + is_ipv6 = 1; + + rc = sscanf(cdir_str, "%[^/]/%d", netaddr_str, &_prefix_len); + if (rc != 2) { + return EINVAL; + } + + if (prefix_len) + *prefix_len = _prefix_len; + + if (addr) { + if (is_ipv6) { + addr->sa_family = AF_INET6; + rc = inet_pton(AF_INET6, netaddr_str, &addr->addr); + } else { + addr->sa_family = AF_INET; + rc = inet_pton(AF_INET, netaddr_str, &addr->addr); + } + } + + if (rc != 1) + return rc; + return 0; +} + +int ldms_addr_in_network_addr(struct ldms_addr *ip_addr, + struct ldms_addr *net_addr, int prefix_len) +{ + if (ip_addr->sa_family != net_addr->sa_family) + return 0; + + int i; + int masked_bytes = prefix_len/8; + int residue_bits = prefix_len % 8; + + for (i = 0; i < masked_bytes; i++) { + if (ip_addr->addr[i] != 
net_addr->addr[i]) + return 0; + } + + if (residue_bits) { + uint8_t mask_bits = 0xff << (8 - residue_bits); + if ( (ip_addr->addr[i] & mask_bits) != (net_addr->addr[i] & mask_bits)) + return 0; + } + + return 1; +} + ldms_t __ldms_xprt_to_rail(ldms_t x) { if (XTYPE_IS_RAIL(x->xtype)) { diff --git a/ldms/src/core/ldms_xprt.c b/ldms/src/core/ldms_xprt.c index e6c1673703..b0e710bfa9 100644 --- a/ldms/src/core/ldms_xprt.c +++ b/ldms/src/core/ldms_xprt.c @@ -4293,6 +4293,31 @@ int ldms_xprt_sockaddr(ldms_t x, struct sockaddr *local_sa, return x->ops.sockaddr(x, local_sa, remote_sa, sa_len); } +/* The implementation is in ldms_rail.c. */ +extern int sockaddr2ldms_addr(struct sockaddr *sa, struct ldms_addr *la); +int ldms_xprt_addr(ldms_t x, struct ldms_addr *local_addr, + struct ldms_addr *remote_addr) +{ + int rc; + struct sockaddr_storage local_so, remote_so; + socklen_t so_len = sizeof(local_so); + + rc = ldms_xprt_sockaddr(x, (void*)&local_so, (void*)&remote_so, &so_len); + if (rc) + return rc; + if (local_addr) { + rc = sockaddr2ldms_addr((void*)&local_so, local_addr); + if (rc) + return rc; + } + if (remote_addr) { + rc = sockaddr2ldms_addr((void*)&remote_so, remote_addr); + if (rc) + return rc; + } + return 0; +} + int __ldms_xprt_get_threads(ldms_t x, pthread_t *out, int n) { if (n < 1) diff --git a/ldms/src/ldmsd/ldmsd.h b/ldms/src/ldmsd/ldmsd.h index 6c6b60d0f6..7c0ad79828 100644 --- a/ldms/src/ldmsd/ldmsd.h +++ b/ldms/src/ldmsd/ldmsd.h @@ -153,6 +153,7 @@ typedef enum ldmsd_cfgobj_type { LDMSD_CFGOBJ_STRGP, LDMSD_CFGOBJ_LISTEN, LDMSD_CFGOBJ_AUTH, + LDMSD_CFGOBJ_PRDCR_LISTEN, } ldmsd_cfgobj_type_t; struct ldmsd_cfgobj; @@ -221,7 +222,7 @@ typedef struct ldmsd_prdcr { LDMSD_PRDCR_STATE_DISCONNECTED, /** Connection request is outstanding */ LDMSD_PRDCR_STATE_CONNECTING, - /** Connect complete */ + /** Connect complete, and ready to send a dir request */ LDMSD_PRDCR_STATE_CONNECTED, /** Waiting for task join and xprt cleanup */ LDMSD_PRDCR_STATE_STOPPING, @@ 
-235,7 +236,28 @@ typedef struct ldmsd_prdcr { /** Producer is local to this daemon */ LDMSD_PRDCR_TYPE_LOCAL, /** Connection initiated at this side but the peer will initiate the dir request. */ + /** + * Connection initiated at this side and the peer is aware of its existence. + * The peer will initiate the dir request after the connection is established. + */ LDMSD_PRDCR_TYPE_BRIDGE, + /** + * Connection initiated at this side to advertise itself to the peer. + * The peer does not know about its existence until it sends + * an advertise_notification request. The peer will initiate the dir request + * after the peer verifies its hostname. + */ + LDMSD_PRDCR_TYPE_ADVERTISE, + /** + * The producer is generated by LDMSD upon receiving a + * advertise_notification request that the hostname matches + * a regular expression of a listening produce. LDMSD also starts + * the producer automatically after its creation. + * + * Similarly to passive producers, the connection is initiated + * by an advertise producer on the peer. This side initiates the dir request. 
/**
 * Listening Producer: Named set of conditions of LDMS metric set providers
 *
 * Configuration object of type LDMSD_CFGOBJ_PRDCR_LISTEN.
 */
typedef struct ldmsd_prdcr_listen {
	struct ldmsd_cfgobj obj;
	enum ldmsd_listen_prdcr_state_e {
		/** Initial listen producer state */
		LDMSD_PRDCR_LISTEN_STATE_STOPPED = 0,
		/** Ready for handling advertise_notification and generating producer */
		LDMSD_PRDCR_LISTEN_STATE_RUNNING,
	} state;
	/* presumably the source text that \c regex was compiled from -- TODO confirm */
	const char *hostname_regex_s;
	uint64_t prdcr_conn_intvl; /* reconnect interval of generated producers */
	regex_t regex;
	int rails; /* Rail size */
	int recv_credits; /* bytes */
	int rate_limits; /* bytes/sec */

	/* Network Address & prefix_len from a given CIDR IP address string */
	struct ldms_addr net_addr;
	int prefix_len;

	/*
	 * For querying the prdcr_listen information, ldmsd can report which
	 * producers were added because their hostnames match the regex of
	 * this prdcr_listen.
	 */
	struct rbt prdcr_tree;
} *ldmsd_prdcr_listen_t;
*/ + LIST_HEAD(updtr_prdcr_filter, ldmsd_name_match) prdcr_filter; + /* * flag to enable or disable the functionality * that automatically schedules set updates according to diff --git a/ldms/src/ldmsd/ldmsd_cfgobj.c b/ldms/src/ldmsd/ldmsd_cfgobj.c index 6a3d23eea0..2271c110dd 100644 --- a/ldms/src/ldmsd/ldmsd_cfgobj.c +++ b/ldms/src/ldmsd/ldmsd_cfgobj.c @@ -84,12 +84,16 @@ pthread_mutex_t listen_tree_lock = PTHREAD_MUTEX_INITIALIZER; struct rbt auth_tree = RBT_INITIALIZER(cfgobj_cmp); pthread_mutex_t auth_tree_lock = PTHREAD_MUTEX_INITIALIZER; +struct rbt listen_prdcr_tree = RBT_INITIALIZER(cfgobj_cmp); +pthread_mutex_t listen_prdcr_tree_lock = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_t *cfgobj_locks[] = { [LDMSD_CFGOBJ_PRDCR] = &prdcr_tree_lock, [LDMSD_CFGOBJ_UPDTR] = &updtr_tree_lock, [LDMSD_CFGOBJ_STRGP] = &strgp_tree_lock, [LDMSD_CFGOBJ_LISTEN] = &listen_tree_lock, [LDMSD_CFGOBJ_AUTH] = &auth_tree_lock, + [LDMSD_CFGOBJ_PRDCR_LISTEN] = &listen_prdcr_tree_lock, }; struct rbt *cfgobj_trees[] = { @@ -98,6 +102,7 @@ struct rbt *cfgobj_trees[] = { [LDMSD_CFGOBJ_STRGP] = &strgp_tree, [LDMSD_CFGOBJ_LISTEN] = &listen_tree, [LDMSD_CFGOBJ_AUTH] = &auth_tree, + [LDMSD_CFGOBJ_PRDCR_LISTEN] = &listen_prdcr_tree, }; void ldmsd_cfgobj_init(void) @@ -107,6 +112,7 @@ void ldmsd_cfgobj_init(void) rbt_init(&strgp_tree, cfgobj_cmp); rbt_init(&listen_tree, cfgobj_cmp); rbt_init(&auth_tree, cfgobj_cmp); + rbt_init(&listen_prdcr_tree, cfgobj_cmp); } void ldmsd_cfgobj___del(ldmsd_cfgobj_t obj) diff --git a/ldms/src/ldmsd/ldmsd_config.c b/ldms/src/ldmsd/ldmsd_config.c index ab705dc8b1..65f61f67ba 100644 --- a/ldms/src/ldmsd/ldmsd_config.c +++ b/ldms/src/ldmsd/ldmsd_config.c @@ -584,16 +584,10 @@ static uint64_t __get_cfgfile_id() } extern int is_req_id_priority(enum ldmsd_request req_id); -/* - * \param req_filter is a function that returns zero if we want to process the - * request, and returns non-zero otherwise. 
- */ static int __process_config_file(const char *path, int *lno, int trust, - int (*req_filter)(ldmsd_cfg_xprt_t, ldmsd_req_hdr_t, void *), - void *ctxt) + req_filter_fn req_filter, void *ctxt) { - static uint32_t msg_no = 0; int rc = 0; int lineno = 0; FILE *fin = NULL; @@ -627,6 +621,7 @@ int __process_config_file(const char *path, int *lno, int trust, } xprt.type = LDMSD_CFG_TYPE_FILE; + xprt.file.path = path; xprt.file.cfgfile_id = __get_cfgfile_id(); xprt.send_fn = log_response_fn; xprt.max_msg = LDMSD_CFG_FILE_XPRT_MAX_REC; @@ -710,7 +705,10 @@ int __process_config_file(const char *path, int *lno, int trust, } } - req_array = ldmsd_parse_config_str(line, msg_no, xprt.max_msg); + /* + * The message number is the line number. + */ + req_array = ldmsd_parse_config_str(line, lineno, xprt.max_msg); if (!req_array) { rc = errno; ovis_log(config_log, OVIS_LERROR, "Process config file error at line %d " @@ -743,27 +741,7 @@ int __process_config_file(const char *path, int *lno, int trust, if (xprt.max_msg < ntohl(request->rec_len)) xprt.max_msg = ntohl(request->rec_len); - if (req_filter) { - rc = req_filter(&xprt, request, ctxt); - /* rc = 0, filter OK */ - if (rc == 0) { - __dlog(DLOG_CFGOK, "# deferring line %d (%s): %s\n", - lineno, path, line); - goto next_req; - } - /* rc == errno */ - if (rc > 0) { - ovis_log(config_log, OVIS_LERROR, - "Configuration error at " - "line %d (%s)\n", lineno, path); - goto cleanup; - } else { - /* rc < 0, filter not applied */ - rc = 0; - } - } - - rc = ldmsd_process_config_request(&xprt, request); + rc = ldmsd_process_config_request(&xprt, request, req_filter, ctxt); if (rc || xprt.rsp_err) { if (!rc) rc = xprt.rsp_err; @@ -774,7 +752,6 @@ int __process_config_file(const char *path, int *lno, int trust, next_req: free(request); request = NULL; - msg_no += 1; off = 0; goto next_line; @@ -793,23 +770,17 @@ int __process_config_file(const char *path, int *lno, int trust, return rc; } -int __req_deferred_start_regex(ldmsd_req_hdr_t 
req, ldmsd_cfgobj_type_t type) +int __req_deferred_start_regex(ldmsd_req_ctxt_t reqc, ldmsd_cfgobj_type_t type) { regex_t regex = {0}; - ldmsd_req_attr_t attr; ldmsd_cfgobj_t obj; int rc; char *val; - attr = ldmsd_req_attr_get_by_id((void*)req, LDMSD_ATTR_REGEX); - if (!attr) { + val = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_REGEX); + if (!val) { ovis_log(NULL, OVIS_LERROR, "`regex` attribute is required.\n"); return EINVAL; } - val = str_repl_env_vars((char *)attr->attr_value); - if (!val) { - ovis_log(NULL, OVIS_LERROR, "Not enough memory.\n"); - return ENOMEM; - } rc = regcomp(®ex, val, REG_NOSUB); if (rc) { ovis_log(NULL, OVIS_LERROR, "Bad regex: %s\n", val); @@ -828,21 +799,15 @@ int __req_deferred_start_regex(ldmsd_req_hdr_t req, ldmsd_cfgobj_type_t type) return 0; } -int __req_deferred_start(ldmsd_req_hdr_t req, ldmsd_cfgobj_type_t type) +int __req_deferred_start(ldmsd_req_ctxt_t reqc, ldmsd_cfgobj_type_t type) { - ldmsd_req_attr_t attr; ldmsd_cfgobj_t obj; char *name; - attr = ldmsd_req_attr_get_by_id((void*)req, LDMSD_ATTR_NAME); - if (!attr) { + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) { ovis_log(NULL, OVIS_LERROR, "`name` attribute is required.\n"); return EINVAL; } - name = str_repl_env_vars((char *)attr->attr_value); - if (!name) { - ovis_log(NULL, OVIS_LERROR, "Not enough memory.\n"); - return ENOMEM; - } obj = ldmsd_cfgobj_find(name, type); if (!obj) { ovis_log(NULL, OVIS_LERROR, "Config object not found: %s\n", name); @@ -855,41 +820,75 @@ int __req_deferred_start(ldmsd_req_hdr_t req, ldmsd_cfgobj_type_t type) return 0; } +/* The implementation is in ldmsd_request.c. 
*/ +extern ldmsd_prdcr_t +__prdcr_add_handler(ldmsd_req_ctxt_t reqc, char *verb, char *obj_name); +int __req_deferred_advertise_start(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + ldmsd_prdcr_t prdcr; + char *name; + + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) { + ovis_log(config_log, OVIS_LERROR, "`name` attribute is required.\n"); + return EINVAL; + } + + prdcr = ldmsd_prdcr_find(name); + if (!prdcr) { + prdcr = __prdcr_add_handler(reqc, "advertise_start", "advertise"); + if (!prdcr) { + ovis_log(config_log, OVIS_LERROR, "%s", reqc->line_buf); + rc = reqc->errcode; + goto out; + } + ldmsd_prdcr_get(prdcr); + } + + prdcr->obj.perm |= LDMSD_PERM_DSTART; + ldmsd_prdcr_put(prdcr); +out: + free(name); + return rc; +} + /* * rc = 0, filter applied OK * rc > 0, rc == -errno, error * rc = -1, filter not applied (but not an error) */ -int __req_filter_failover(ldmsd_cfg_xprt_t x, ldmsd_req_hdr_t req, void *ctxt) +int __req_filter_failover(ldmsd_req_ctxt_t reqc, void *ctxt) { int *use_failover = ctxt; int rc; - /* req is in network byte order */ - ldmsd_ntoh_req_msg(req); - - switch (req->req_id) { + switch (reqc->req_id) { case LDMSD_FAILOVER_START_REQ: *use_failover = 1; rc = 0; break; case LDMSD_PRDCR_START_REGEX_REQ: - rc = __req_deferred_start_regex(req, LDMSD_CFGOBJ_PRDCR); + rc = __req_deferred_start_regex(reqc, LDMSD_CFGOBJ_PRDCR); + break; + case LDMSD_ADVERTISER_START_REQ: + rc = __req_deferred_advertise_start(reqc); break; case LDMSD_PRDCR_START_REQ: - rc = __req_deferred_start(req, LDMSD_CFGOBJ_PRDCR); + rc = __req_deferred_start(reqc, LDMSD_CFGOBJ_PRDCR); break; case LDMSD_UPDTR_START_REQ: - rc = __req_deferred_start(req, LDMSD_CFGOBJ_UPDTR); + rc = __req_deferred_start(reqc, LDMSD_CFGOBJ_UPDTR); break; case LDMSD_STRGP_START_REQ: - rc = __req_deferred_start(req, LDMSD_CFGOBJ_STRGP); + rc = __req_deferred_start(reqc, LDMSD_CFGOBJ_STRGP); + break; + case LDMSD_PRDCR_LISTEN_START_REQ: + rc = __req_deferred_start(reqc, 
LDMSD_CFGOBJ_PRDCR_LISTEN); break; default: rc = -1; } - /* convert req back to network byte order */ - ldmsd_hton_req_msg(req); return rc; } @@ -954,11 +953,19 @@ int ldmsd_cfgobjs_start(int (*filter)(ldmsd_cfgobj_t)) ldmsd_cfg_unlock(LDMSD_CFGOBJ_STRGP); goto out; } - __dlog(DLOG_CFGOK, "strgp_start name=%s # delayed \n", - obj->name); + __dlog(DLOG_CFGOK, "strgp_start name=%s # delayed \n", + obj->name); } ldmsd_cfg_unlock(LDMSD_CFGOBJ_STRGP); + ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR_LISTEN); + LDMSD_CFGOBJ_FOREACH(obj, LDMSD_CFGOBJ_PRDCR_LISTEN) { + if (filter && filter(obj)) + continue; + ((ldmsd_prdcr_listen_t)obj)->state = LDMSD_PRDCR_LISTEN_STATE_RUNNING; + } + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + out: return rc; } @@ -1048,7 +1055,7 @@ void ldmsd_recv_msg(ldms_t x, char *data, size_t data_len) switch (ntohl(request->type)) { case LDMSD_REQ_TYPE_CONFIG_CMD: - (void)ldmsd_process_config_request(&xprt, request); + (void)ldmsd_process_config_request(&xprt, request, NULL, NULL); break; case LDMSD_REQ_TYPE_CONFIG_RESP: (void)ldmsd_process_config_response(&xprt, request); diff --git a/ldms/src/ldmsd/ldmsd_prdcr.c b/ldms/src/ldmsd/ldmsd_prdcr.c index afcfc89d80..a6f95487dd 100644 --- a/ldms/src/ldmsd/ldmsd_prdcr.c +++ b/ldms/src/ldmsd/ldmsd_prdcr.c @@ -566,6 +566,73 @@ static void __ldmsd_xprt_ctxt_free(void *_ctxt) free(ctxt); } +static ovis_log_t config_log; +static int __advertise_notification_resp_cb(ldmsd_req_cmd_t rcmd) +{ + ldmsd_req_hdr_t resp = (ldmsd_req_hdr_t)(rcmd->reqc->req_buf); + ldmsd_prdcr_t prdcr = (ldmsd_prdcr_t)(rcmd->ctxt); + /* The rsp_err value is set in ldmsd_request.c:advertise_notification_handler() */ + if (resp->rsp_err) { + char *errmsg = ldmsd_req_attr_str_value_get_by_id(rcmd->reqc, + LDMSD_ATTR_STRING); + if (ENOENT== resp->rsp_err) { + /* + * The hostname doesn't match any prdcr_listen on the aggregator. + * Retry! 
+ * + * To simplify producer's state management, I decided to + * disconnect the connection to reset the producer state. + * This avoids the need for an additional state to differentiate + * between 'connected and matching a prdcr_listen on the peer' + * and 'connected but not yet matching any prdcr_listen on the peer'. + */ + if (prdcr->xprt) + ldms_xprt_close(prdcr->xprt); + ovis_log(config_log, OVIS_LINFO, "advertise: %s.\n", errmsg); + } else { + /* + * LDMSD doesn't automatically stop the advertisement to + * keep the consistency that LDMSD does not automatically + * start or stop any configuration objects. + */ + ovis_log(config_log, OVIS_LERROR, + "'advertise': An error occurred on the aggregator. " + "Error: \"%s\" Please stop advertising and restart " + "with updated configuration.\n", errmsg); + } + free(errmsg); + } + return 0; +} + +static int __send_advertise_notification(ldmsd_prdcr_t prdcr) +{ + int rc; + ldmsd_req_cmd_t rcmd; + char my_hostname[HOST_NAME_MAX+1]; + + rcmd = ldmsd_req_cmd_new(prdcr->xprt, + LDMSD_ADVERTISE_NOTIFY_REQ, NULL, + __advertise_notification_resp_cb, prdcr); + if (!rcmd) { + ovis_log(NULL, OVIS_LCRIT, "Memory allocation failure.\n"); + return ENOMEM; + } + + rc = ldmsd_req_cmd_attr_append_str(rcmd, LDMSD_ATTR_NAME, prdcr->obj.name); + if (rc) + goto out; + rc = gethostname(my_hostname, HOST_NAME_MAX+1); + rc = ldmsd_req_cmd_attr_append_str(rcmd, LDMSD_ATTR_HOST, my_hostname); + if (rc) + goto out; + rc = ldmsd_req_cmd_attr_term(rcmd); + if (rc) + goto out; +out: + return rc; +} + static int __sampler_routine(ldms_t x, ldms_xprt_event_t e, ldmsd_prdcr_t prdcr) { int is_reset_prdcr = 0; @@ -573,6 +640,10 @@ static int __sampler_routine(ldms_t x, ldms_xprt_event_t e, ldmsd_prdcr_t prdcr) case LDMS_XPRT_EVENT_CONNECTED: /* Do nothing */ prdcr->conn_state = LDMSD_PRDCR_STATE_CONNECTED; + if (prdcr->type == LDMSD_PRDCR_TYPE_ADVERTISE) { + __send_advertise_notification(prdcr); + /* TODO: handle the error */ + } break; case 
LDMS_XPRT_EVENT_DISCONNECTED: case LDMS_XPRT_EVENT_ERROR: @@ -581,6 +652,9 @@ static int __sampler_routine(ldms_t x, ldms_xprt_event_t e, ldmsd_prdcr_t prdcr) is_reset_prdcr = 1; break; case LDMS_XPRT_EVENT_RECV: + /* Receive the response of an advertisement */ + ldmsd_recv_msg(x, e->data, e->data_len); + break; case LDMS_XPRT_EVENT_SEND_COMPLETE: /* Ignore */ break; @@ -692,9 +766,11 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) switch (prdcr->type) { case LDMSD_PRDCR_TYPE_ACTIVE: case LDMSD_PRDCR_TYPE_PASSIVE: + case LDMSD_PRDCR_TYPE_GENERATED: is_reset_prdcr = __agg_routine(x, e, prdcr); break; case LDMSD_PRDCR_TYPE_BRIDGE: + case LDMSD_PRDCR_TYPE_ADVERTISE: is_reset_prdcr = __sampler_routine(x, e, prdcr); break; default: @@ -752,10 +828,11 @@ static void prdcr_connect(ldmsd_prdcr_t prdcr) { int ret; - assert(prdcr->xprt == NULL); switch (prdcr->type) { case LDMSD_PRDCR_TYPE_ACTIVE: case LDMSD_PRDCR_TYPE_BRIDGE: + case LDMSD_PRDCR_TYPE_ADVERTISE: + assert(prdcr->xprt == NULL); prdcr->conn_state = LDMSD_PRDCR_STATE_CONNECTING; prdcr->xprt = ldms_xprt_rail_new(prdcr->xprt_name, prdcr->rail, @@ -780,8 +857,15 @@ static void prdcr_connect(ldmsd_prdcr_t prdcr) } break; case LDMSD_PRDCR_TYPE_PASSIVE: + case LDMSD_PRDCR_TYPE_GENERATED: + assert(prdcr->xprt == NULL); prdcr->xprt = ldms_xprt_by_remote_sin((struct sockaddr *)&prdcr->ss); - /* Call connect callback to advance state and update timers*/ + /* + * The transport endpoint has be assigned in the advertise_notification handler before + * the producer has been started. 
/*
 * Order two keys as NUL-terminated strings.
 *
 * Returns a negative, zero, or positive value as strcmp() does.
 * NOTE(review): presumably the comparator for trees of producer references,
 * whose nodes are keyed by producer name -- confirm at the rbt_init() site.
 */
int prdcr_ref_cmp(void *a, const void *b)
{
	const char *lhs = a;
	const char *rhs = b;

	return strcmp(lhs, rhs);
}
*/ + if (!prdcr->port_no) { + goto out; + } + } prdcr->ss_len = sizeof(prdcr->ss); if (prdcr_resolve(host_name, port_no, &prdcr->ss, &prdcr->ss_len)) { diff --git a/ldms/src/ldmsd/ldmsd_request.c b/ldms/src/ldmsd/ldmsd_request.c index 5058ff65ff..9c7608eedc 100644 --- a/ldms/src/ldmsd/ldmsd_request.c +++ b/ldms/src/ldmsd/ldmsd_request.c @@ -119,6 +119,7 @@ extern const char *prdcr_state_str(enum ldmsd_prdcr_state state); extern int ldmsd_credits; /* defined in ldmsd.c */ +#define CONFIG_PLAYBACK_ENABLED(_match_) ((_match_) & ldmsd_req_debug) struct timeval ldmsd_req_last_time; __attribute__((format(printf, 2, 3))) void __dlog(int match, const char *fmt, ...) @@ -315,6 +316,18 @@ static int default_credits_set_handler(ldmsd_req_ctxt_t reqc); static int pid_file_handler(ldmsd_req_ctxt_t reqc); static int banner_mode_handler(ldmsd_req_ctxt_t reqc); +/* Sampler discovery */ +static int prdcr_listen_add_handler(ldmsd_req_ctxt_t reqc); +static int prdcr_listen_del_handler(ldmsd_req_ctxt_t reqc); +static int prdcr_listen_start_handler(ldmsd_req_ctxt_t reqc); +static int prdcr_listen_stop_handler(ldmsd_req_ctxt_t reqc); +static int prdcr_listen_status_handler(ldmsd_req_ctxt_t reqc); +static int advertiser_add_handler(ldmsd_req_ctxt_t reqc); +static int advertiser_start_handler(ldmsd_req_ctxt_t reqc); +static int advertiser_stop_handler(ldmsd_req_ctxt_t reqc); +static int advertiser_del_handler(ldmsd_req_ctxt_t reqc); +static int advertise_notification_handler(ldmsd_req_ctxt_t reqc); + /* executable for all */ #define XALL 0111 /* executable for user, and group */ @@ -687,6 +700,38 @@ static struct request_handler_entry request_handler[] = { [LDMSD_BANNER_MODE_REQ] = { LDMSD_BANNER_MODE_REQ, banner_mode_handler, XUG }, + + /* Sampler Discovery */ + [LDMSD_ADVERTISER_ADD_REQ] = { + LDMSD_ADVERTISER_ADD_REQ, advertiser_add_handler, XUG + }, + [LDMSD_ADVERTISER_START_REQ] = { + LDMSD_ADVERTISER_START_REQ, advertiser_start_handler, XUG + }, + [LDMSD_ADVERTISER_STOP_REQ] = { + 
LDMSD_ADVERTISER_STOP_REQ, advertiser_stop_handler, XUG + }, + [LDMSD_ADVERTISER_DEL_REQ] = { + LDMSD_ADVERTISER_DEL_REQ, advertiser_del_handler, XUG + }, + [LDMSD_PRDCR_LISTEN_ADD_REQ] = { + LDMSD_PRDCR_LISTEN_ADD_REQ, prdcr_listen_add_handler, XUG + }, + [LDMSD_PRDCR_LISTEN_DEL_REQ] = { + LDMSD_PRDCR_LISTEN_DEL_REQ, prdcr_listen_del_handler, XUG + }, + [LDMSD_PRDCR_LISTEN_START_REQ] = { + LDMSD_PRDCR_LISTEN_START_REQ, prdcr_listen_start_handler, XUG | MOD + }, + [LDMSD_PRDCR_LISTEN_STOP_REQ] = { + LDMSD_PRDCR_LISTEN_STOP_REQ, prdcr_listen_stop_handler, XUG | MOD + }, + [LDMSD_PRDCR_LISTEN_STATUS_REQ] = { + LDMSD_PRDCR_LISTEN_STATUS_REQ, prdcr_listen_status_handler, XALL + }, + [LDMSD_ADVERTISE_NOTIFY_REQ] = { + LDMSD_ADVERTISE_NOTIFY_REQ, advertise_notification_handler, XUG + }, }; int is_req_id_priority(enum ldmsd_request req_id) @@ -1238,7 +1283,12 @@ void ldmsd_send_cfg_rec_adv(ldmsd_cfg_xprt_t xprt, uint32_t msg_no, uint32_t rec } extern void cleanup(int x, char *reason); -int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request) +/* + * \param req_filter is a function that returns zero if we want to process the + * request, and returns non-zero otherwise. 
+ */ +int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request, + req_filter_fn req_filter, void *filter_ctxt) { struct req_ctxt_key key; ldmsd_req_ctxt_t reqc = NULL; @@ -1334,6 +1384,23 @@ int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request) ldmsd_ntoh_req_msg((ldmsd_req_hdr_t)reqc->req_buf); reqc->req_id = ((ldmsd_req_hdr_t)reqc->req_buf)->req_id; + if (req_filter) { + rc = req_filter(reqc, filter_ctxt); + /* rc = 0, filter OK */ + if (rc == 0) { + __dlog(DLOG_CFGOK, "# deferring line %d (%s)\n", + reqc->key.msg_no, reqc->xprt->file.path); + goto put_reqc; + } + /* rc == errno */ + if (rc > 0) { + goto put_reqc; + } else { + /* rc < 0, filter not applied */ + rc = 0; + } + } + rc = ldmsd_handle_request(reqc); if (!rc && !reqc->errcode) { @@ -1342,6 +1409,7 @@ int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request) ldmsd_inc_cfg_cntr(); } +put_reqc: if (xprt != reqc->xprt) memcpy(xprt, reqc->xprt, sizeof(*xprt)); @@ -1535,9 +1603,9 @@ static int example_handler(ldmsd_req_ctxt_t reqc) return rc; } -static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) +ldmsd_prdcr_t __prdcr_add_handler(ldmsd_req_ctxt_t reqc, char *verb, char *obj_name) { - ldmsd_prdcr_t prdcr; + ldmsd_prdcr_t prdcr = NULL; char *name, *host, *xprt, *attr_name, *type_s, *port_s, *interval_s, *rail_s, *credits_s, *rx_rate_s; char *auth; @@ -1553,7 +1621,6 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) int rail = 1; char *perm_s = NULL; - reqc->errcode = 0; name = host = xprt = type_s = port_s = interval_s = auth = rail_s = credits_s = NULL; attr_name = "name"; @@ -1572,14 +1639,14 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "The attribute type '%s' is invalid.", type_s); - goto send_reply; + goto out; } if (type == LDMSD_PRDCR_TYPE_LOCAL) { cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "Producer with type 'local' is " - "not supported."); + "%s with type 
'local' is " + "not supported.", obj_name); reqc->errcode = EINVAL; - goto send_reply; + goto out; } } attr_name = "xprt"; @@ -1609,8 +1676,8 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) if (port_s) { cnt = snprintf(reqc->line_buf, reqc->line_len, "Ignore the given port %s because " - "prdcr %s's type is passive.", - port_s, name); + "the type of %s %s is passive.", + port_s, obj_name, name); } port_no = -1; } @@ -1624,13 +1691,13 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) if (reqc->errcode) { cnt = snprintf(reqc->line_buf, reqc->line_len, "The given 'reconnect' is invalid."); - goto send_reply; + goto out; } if (interval_us <= 0) { reqc->errcode = EINVAL; cnt = snprintf(reqc->line_buf, reqc->line_len, "The reconnect interval must be a positive number."); - goto send_reply; + goto out; } } @@ -1653,7 +1720,7 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "'rail' attribute must be a positive integer, got '%s'", rail_s); - goto send_reply; + goto out; } } @@ -1664,7 +1731,7 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "'credits' attribute must be greater than -2, got '%s'", credits_s); - goto send_reply; + goto out; } } @@ -1675,10 +1742,9 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "'rx_rate' attribute must be greater than -2, got '%s'", rx_rate_s); - goto send_reply; + goto out; } } - prdcr = ldmsd_prdcr_new_with_auth(name, xprt, host, port_no, type, interval_us, auth, uid, gid, perm, rail, credits, rx_rate); @@ -1692,39 +1758,33 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) else goto enomem; } - __dlog(DLOG_CFGOK, "prdcr_add name=%s xprt=%s host=%s port=%u type=%s " - "reconnect=%ld auth=%s uid=%d gid=%d perm=%o\n", - name, xprt, host, port_no, type_s, - interval_us, auth ? 
auth : "none", (int)uid, (int)gid, - (unsigned)perm); - goto send_reply; + goto out; ebadauth: reqc->errcode = ENOENT; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "Authentication name not found, check the auth_add configuration."); - goto send_reply; + goto out; enomem: reqc->errcode = ENOMEM; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "Memory allocation failed."); - goto send_reply; + goto out; eexist: reqc->errcode = EEXIST; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "The prdcr %s already exists.", name); - goto send_reply; + goto out; eafnosupport: reqc->errcode = EAFNOSUPPORT; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "Error resolving hostname '%s'\n", host); - goto send_reply; + goto out; einval: reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, "The attribute '%s' is required.", attr_name); -send_reply: - ldmsd_send_req_response(reqc, reqc->line_buf); +out: free(name); free(type_s); free(port_s); @@ -1735,10 +1795,28 @@ static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) free(auth); free(rail_s); free(credits_s); + return prdcr; +} + +static int prdcr_add_handler(ldmsd_req_ctxt_t reqc) +{ + ldmsd_prdcr_t prdcr; + prdcr = __prdcr_add_handler(reqc, "prdcr_add", "producer"); + if (prdcr) { + __dlog(DLOG_CFGOK, "prdcr_add name=%s xprt=%s host=%s port=%u type=%s " + "reconnect=%ld auth=%s uid=%d gid=%d perm=%o\n", + prdcr->obj.name, prdcr->xprt_name, prdcr->host_name, + prdcr->port_no, ldmsd_prdcr_type2str(prdcr->type), + prdcr->conn_intrvl_us, prdcr->conn_auth_dom_name, + (int)prdcr->obj.uid, (int)prdcr->obj.gid, + (unsigned)prdcr->obj.perm); + } + + ldmsd_send_req_response(reqc, reqc->line_buf); return 0; } -static int prdcr_del_handler(ldmsd_req_ctxt_t reqc) +static int __prdcr_del_handler(ldmsd_req_ctxt_t reqc, const char *cmd, const char *obj_type) { char *name = NULL, *attr_name; size_t cnt = 0; @@ -1751,9 +1829,9 @@ static int prdcr_del_handler(ldmsd_req_ctxt_t reqc) if (!name) { reqc->errcode = EINVAL; cnt = 
Snprintf(&reqc->line_buf, &reqc->line_len, - "The attribute '%s' is required by prdcr_del.", - attr_name); - goto send_reply; + "The attribute '%s' is required by %s.", + attr_name, cmd); + goto out; } ldmsd_req_ctxt_sec_get(reqc, &sctxt); @@ -1761,15 +1839,15 @@ static int prdcr_del_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = ldmsd_prdcr_del(name, &sctxt); switch (reqc->errcode) { case 0: - __dlog(DLOG_CFGOK, "prdcr_del name=%s\n", name); + __dlog(DLOG_CFGOK, "%s name=%s\n", cmd, name); break; case ENOENT: Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer specified does not exist."); + "The %s specified does not exist.", obj_type); break; case EBUSY: Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer is in use."); + "The %s is in use.", obj_type); break; case EACCES: Snprintf(&reqc->line_buf, &reqc->line_len, @@ -1780,14 +1858,19 @@ static int prdcr_del_handler(ldmsd_req_ctxt_t reqc) "Error: %d %s", reqc->errcode, ovis_errno_abbvr(reqc->errcode)); } - -send_reply: - ldmsd_send_req_response(reqc, reqc->line_buf); +out: free(name); return 0; } -static int prdcr_start_handler(ldmsd_req_ctxt_t reqc) +static int prdcr_del_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = __prdcr_del_handler(reqc, "prdcr_del", "producer"); + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int __prdcr_start_handler(ldmsd_req_ctxt_t reqc, const char *cmd, const char *obj_type) { char *name, *interval_str; name = interval_str = NULL; @@ -1800,8 +1883,8 @@ static int prdcr_start_handler(ldmsd_req_ctxt_t reqc) if (!name) { reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The attribute 'name' is required by prdcr_start."); - goto send_reply; + "The attribute 'name' is required by %s.", cmd); + goto out; } ldmsd_req_ctxt_sec_get(reqc, &sctxt); @@ -1810,16 +1893,15 @@ static int prdcr_start_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = ldmsd_prdcr_start(name, interval_str, &sctxt); switch (reqc->errcode) { case 0: - 
__dlog(DLOG_CFGOK, "prdcr_start name=%s reconnect=%s\n", - name, interval_str); + /* do nothing */ break; case EBUSY: cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer is already running."); + "The %s is already running.", obj_type); break; case ENOENT: cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer specified does not exist."); + "The %s specified does not exist.", obj_type); break; case EACCES: Snprintf(&reqc->line_buf, &reqc->line_len, @@ -1839,14 +1921,34 @@ static int prdcr_start_handler(ldmsd_req_ctxt_t reqc) reqc->errcode, ovis_errno_abbvr(reqc->errcode)); } -send_reply: - ldmsd_send_req_response(reqc, reqc->line_buf); +out: free(name); free(interval_str); return 0; } -static int prdcr_stop_handler(ldmsd_req_ctxt_t reqc) +static int prdcr_start_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = __prdcr_start_handler(reqc, "prdcr_start", "producer"); + if ((CONFIG_PLAYBACK_ENABLED(DLOG_CFGOK)) && !rc && !reqc->errcode) { + char *name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + char *interval_us = ldmsd_req_attr_str_value_get_by_id(reqc, + LDMSD_ATTR_INTERVAL); + if (interval_us) { + __dlog(DLOG_CFGOK, "prdcr_start name=%s reconnect=%s\n", + name, interval_us); + free(interval_us); + } else { + __dlog(DLOG_CFGOK, "prdcr_start name=%s\n", name); + } + /* name is an allocated copy just like interval_us; release it as well */ + free(name); + } + ldmsd_send_req_response(reqc, reqc->line_buf); + return 0; +} + +static int __prdcr_stop_handler(ldmsd_req_ctxt_t reqc, const char *cmd, const char *obj_type) { char *name = NULL; size_t cnt = 0; @@ -1858,8 +1960,8 @@ static int prdcr_stop_handler(ldmsd_req_ctxt_t reqc) if (!name) { reqc->errcode = EINVAL; cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The attribute 'name' is required by prdcr_stop."); - goto send_reply; + "The attribute 'name' is required by %s.", cmd); + goto out; } ldmsd_req_ctxt_sec_get(reqc, &sctxt); @@ -1867,15 +1969,15 @@ static int prdcr_stop_handler(ldmsd_req_ctxt_t reqc) reqc->errcode = ldmsd_prdcr_stop(name, &sctxt); switch 
(reqc->errcode) { case 0: - __dlog(DLOG_CFGOK, "prdcr_stop name=%s\n", name); + __dlog(DLOG_CFGOK, "%s name=%s\n", cmd, name); break; case EBUSY: cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer is already stopped."); + "The %s is already stopped.", obj_type); break; case ENOENT: cnt = Snprintf(&reqc->line_buf, &reqc->line_len, - "The producer specified does not exist."); + "The %s specified does not exist.", obj_type); break; case EACCES: cnt = Snprintf(&reqc->line_buf, &reqc->line_len, @@ -1887,12 +1989,18 @@ static int prdcr_stop_handler(ldmsd_req_ctxt_t reqc) reqc->errcode, ovis_errno_abbvr(reqc->errcode)); } -send_reply: - ldmsd_send_req_response(reqc, reqc->line_buf); +out: free(name); return 0; } +static int prdcr_stop_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = __prdcr_stop_handler(reqc, "prdcr_stop", "producer"); + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + static int prdcr_start_regex_handler(ldmsd_req_ctxt_t reqc) { char *prdcr_regex, *interval_str; @@ -9326,3 +9434,755 @@ static int banner_mode_handler(ldmsd_req_ctxt_t reqc) free(mode_s); return rc; } + +/* Sampler Discovery */ + +/* sizeof(uint16_t) * 2 is the four hex digits of one 16-bit group, * 8 for the 8 groups, + 7 for the 7 colons; the terminating NUL is not counted */ +#define MAX_IPV6_STR_LEN (sizeof(uint16_t) * 2 * 8 + 7) +static int __cidr2addr6(const char *cdir_str, struct ldms_addr *addr, int *prefix_len) +{ + int rc; + int is_ipv6 = 0; + char netaddr_str[MAX_IPV6_STR_LEN + 1]; /* room for the terminating NUL */ + int _prefix_len; + struct ldms_addr s6 = { + .addr = {0,0,0,0,0,0,0,0,0,0,0xff,0xff,0,0,0,0} + }; + if (strchr(cdir_str, ':') != NULL) + is_ipv6 = 1; + + rc = sscanf(cdir_str, "%39[^/]/%d", netaddr_str, &_prefix_len); /* 39 == MAX_IPV6_STR_LEN */ + if (rc != 2 || _prefix_len < 0 || _prefix_len > 128) { + return EINVAL; + } + + if (prefix_len) + *prefix_len = _prefix_len; + + if (addr) { + if (is_ipv6) { + rc = inet_pton(AF_INET6, netaddr_str, &addr->addr); + } else { + rc = inet_pton(AF_INET, netaddr_str, &addr->addr); + } + } + + if (rc != 1) + return EINVAL; /* inet_pton() returns 0, not an errno, for an unparsable address */ + if (!is_ipv6) { + /* 
Make the ipv4-mapped ipv6 format */ + memcpy(&s6.addr[12], &addr->addr, 4); + memcpy(&addr->addr, &s6.addr, 16); + } + addr->sa_family = AF_INET6; + return 0; +} + + +/* Aggregator */ +/* The implementation is in ldmsd_prdcr.c */ +extern int prdcr_ref_cmp(void *a, const void *b); +static int prdcr_listen_add_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + char *name; + char *regex_str; + char *cidr_str; + char *reconnect_str; + char *rail_str; + char *credits_str; + char *rx_rate_str; + char *attr_name; + ldmsd_prdcr_listen_t pl; + long reconnect_us; + int rail = 1; /* used when 'rail' is omitted; smallest value the check below accepts */ + int64_t credits = -1; /* signed so the '> -2' check works; -1 when 'credits' is omitted */ + int64_t rx_rate = -1; /* signed so the '> -2' check works; -1 when 'rx_rate' is omitted */ + + name = regex_str = reconnect_str = rail_str = credits_str = rx_rate_str = NULL; + attr_name = "name"; + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) + goto einval; + + regex_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_REGEX); + cidr_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_IP); + + attr_name = "reconnect"; + reconnect_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_INTERVAL); + if (!reconnect_str) { + goto einval; + } else { + reqc->errcode = ovis_time_str2us(reconnect_str, &reconnect_us); + if (reqc->errcode) { + (void) snprintf(reqc->line_buf, reqc->line_len, + "The given 'reconnect' is invalid."); + goto send_reply; + } + if (reconnect_us <= 0) { + reqc->errcode = EINVAL; + (void) snprintf(reqc->line_buf, reqc->line_len, + "The reconnect interval must be a positive number."); + goto send_reply; + } + } + + rail_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_RAIL); + if (rail_str) { + rail = atoi(rail_str); + if (rail <= 0) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "'rail' attribute must be a positive " + "integer, got '%s'", rail_str); + goto send_reply; + } + } + + credits_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_CREDITS); + if (credits_str) { + credits = atol(credits_str); + if (credits <= -2) { + 
reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "'credits' attribute must be greater " + "than -2, got '%s'", credits_str); + goto send_reply; + } + } + + rx_rate_str = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_RX_RATE); + if (rx_rate_str) { + rx_rate = atol(rx_rate_str); + if (rx_rate <= -2) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "'rx_rate' attribute must be greater " + "than -2, got '%s'", rx_rate_str); + goto send_reply; + } + } + + pl = (ldmsd_prdcr_listen_t) + ldmsd_cfgobj_new_with_auth(name, LDMSD_CFGOBJ_PRDCR_LISTEN, + sizeof(*pl), NULL, 0, 0, 0); + if (!pl) + goto enomem; + + if (regex_str) { + pl->hostname_regex_s = strdup(regex_str); + if (!pl->hostname_regex_s) { + ldmsd_cfgobj_put(&pl->obj); + goto enomem; + } + + rc = ldmsd_compile_regex(&pl->regex, regex_str, reqc->line_buf, reqc->line_len); + if (rc) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The regular expression string " + "'%s' is invalid.", regex_str); + ldmsd_cfgobj_put(&pl->obj); + goto send_reply; + } + } + + pl->prdcr_conn_intvl = reconnect_us; + if (cidr_str) { + rc = __cidr2addr6(cidr_str, &pl->net_addr, &pl->prefix_len); + if (rc) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The given CIDR string '%s' " + "is invalid.", cidr_str); + ldmsd_cfgobj_put(&pl->obj); + goto send_reply; + } + } + + pl->rails = rail; + pl->recv_credits = credits; /* read back when the generated producer is created */ + pl->rate_limits = rx_rate; + rbt_init(&pl->prdcr_tree, prdcr_ref_cmp); + ldmsd_cfgobj_unlock(&pl->obj); + +send_reply: + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +enomem: + reqc->errcode = ENOMEM; + (void)snprintf(reqc->line_buf, reqc->line_len, + "Memory allocation failed."); + goto send_reply; +einval: + reqc->errcode = EINVAL; + (void) snprintf(reqc->line_buf, reqc->line_len, + "The attribute '%s' is required.", attr_name); + goto send_reply; +} + +/* This is 
implemented in ldmsd_cfgobj.c */ +extern struct rbt *cfgobj_trees[]; +static int prdcr_listen_del_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + char *name = NULL; + struct ldmsd_sec_ctxt sctxt; + ldmsd_prdcr_listen_t pl = NULL; /* send_reply tests pl, including on the missing-name path */ + + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The attribute 'name' is required,"); + goto send_reply; + } + + ldmsd_req_ctxt_sec_get(reqc, &sctxt); + + ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR_LISTEN); + for (pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_first(LDMSD_CFGOBJ_PRDCR_LISTEN); pl; + pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_next(&pl->obj)) { + if (0 != strcmp(name, pl->obj.name)) + continue; + + ldmsd_cfgobj_lock(&pl->obj); + rc = ldmsd_cfgobj_access_check(&pl->obj, 0222, &sctxt); + if (rc) { + ldmsd_cfgobj_unlock(&pl->obj); + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + reqc->errcode = EACCES; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Permission denied"); + goto send_reply; + } + + if (pl->state != LDMSD_PRDCR_LISTEN_STATE_STOPPED) { + ldmsd_cfgobj_unlock(&pl->obj); + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + reqc->errcode = EBUSY; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The producer listen '%s' is in use.\n", + name); + goto send_reply; + } + + if (ldmsd_cfgobj_refcount(&pl->obj) > 2) { + ldmsd_cfgobj_unlock(&pl->obj); + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + reqc->errcode = EBUSY; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The producer listen '%s' is in use.\n", + name); + goto send_reply; + } + + rbt_del(cfgobj_trees[LDMSD_CFGOBJ_PRDCR_LISTEN], &pl->obj.rbn); + ldmsd_cfgobj_put(&pl->obj); /* Put back the reference from the tree */ + ldmsd_cfgobj_unlock(&pl->obj); + goto unlock_tree; + } + + if (!pl) { + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + reqc->errcode = ENOENT; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The 
producer listen '%s' does not exist.\n", + name); + goto send_reply; + } + +unlock_tree: + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + +send_reply: + if (pl) + ldmsd_cfgobj_put(&pl->obj); /* Put back the 'first' or 'next' reference */ + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int prdcr_listen_start_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + char *name = NULL; + struct ldmsd_sec_ctxt sctxt; + ldmsd_prdcr_listen_t pl; + + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The attribute 'name' is required,"); + goto send_reply; + } + + ldmsd_req_ctxt_sec_get(reqc, &sctxt); + + pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_find(name, LDMSD_CFGOBJ_PRDCR_LISTEN); + if (!pl) { + reqc->errcode = ENOENT; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The listen_producer '%s' does not exist.", + name); + goto send_reply; + } + + ldmsd_cfgobj_lock(&pl->obj); + rc = ldmsd_cfgobj_access_check(&pl->obj, 0222, &sctxt); + if (rc) { + reqc->errcode = EACCES; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Permission denied"); + goto out; /* the 'find' reference must be dropped on this path too */ + } + + pl->obj.perm |= LDMSD_PERM_DSTART; + pl->state = LDMSD_PRDCR_LISTEN_STATE_RUNNING; +out: + ldmsd_cfgobj_put(&pl->obj); /* Put back the 'find' reference */ + ldmsd_cfgobj_unlock(&pl->obj); + +send_reply: + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int prdcr_listen_stop_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + char *name = NULL; + struct ldmsd_sec_ctxt sctxt; + ldmsd_prdcr_listen_t pl; + + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The attribute 'name' is required,"); + goto send_reply; + } + + ldmsd_req_ctxt_sec_get(reqc, &sctxt); + + pl = 
(ldmsd_prdcr_listen_t)ldmsd_cfgobj_find(name, LDMSD_CFGOBJ_PRDCR_LISTEN); + if (!pl) { + reqc->errcode = ENOENT; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The listen_producer '%s' does not exist.", + name); + goto send_reply; + } + + ldmsd_cfgobj_lock(&pl->obj); + rc = ldmsd_cfgobj_access_check(&pl->obj, 0222, &sctxt); + if (rc) { + reqc->errcode = EACCES; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Permission denied"); + goto out; + } + + if (pl->state == LDMSD_PRDCR_LISTEN_STATE_STOPPED) + goto out; /* already stopped, return as stop succeeds. */ + + pl->obj.perm &= ~LDMSD_PERM_DSTART; + pl->state = LDMSD_PRDCR_LISTEN_STATE_STOPPED; +out: + ldmsd_cfgobj_put(&pl->obj); /* Put back the 'find' reference */ + ldmsd_cfgobj_unlock(&pl->obj); + +send_reply: + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int prdcr_listen_status_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + int cnt = 0; + int prdcr_cnt = 0; + ldmsd_prdcr_listen_t pl; + ldmsd_prdcr_ref_t pref; + struct rbn *rbn; + struct ldmsd_req_attr_s attr; + + /* + * TODO: It'll be helpfull to list all producers generated by each prdcr_listen. 
+ */ + ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR_LISTEN); + for (pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_first(LDMSD_CFGOBJ_PRDCR_LISTEN); pl; + pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_next(&pl->obj)) { + prdcr_cnt = 0; /* every listener starts a fresh "producers" array */ + if (cnt && (rc = linebuf_printf(reqc, ","))) + goto err; + /* The regex is optional; print an empty string rather than passing NULL to %s */ + rc = linebuf_printf(reqc, + "{\"name\":\"%s\"," + "\"state\":\"%s\"," + "\"regex\":\"%s\"," + "\"reconnect\":\"%ld\"," + "\"producers\":[", + pl->obj.name, + ((pl->state==LDMSD_PRDCR_LISTEN_STATE_RUNNING)?("running"):("stopped")), + (pl->hostname_regex_s ? pl->hostname_regex_s : ""), + pl->prdcr_conn_intvl + ); + if (rc) + goto err; + RBT_FOREACH(rbn, &pl->prdcr_tree) { + pref = container_of(rbn, struct ldmsd_prdcr_ref, rbn); + if (prdcr_cnt) { + if ((rc = linebuf_printf(reqc, ","))) + goto err; + } + if ((rc = linebuf_printf(reqc, "\"%s\"", pref->prdcr->obj.name))) + goto err; + prdcr_cnt++; + } + if ((rc = linebuf_printf(reqc, "]}"))) + goto err; + cnt++; + } + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + cnt = reqc->line_off + 2; /* +2 for '[' and ']' */ + + /* Send the json attribute header */ + attr.discrim = 1; + attr.attr_len = cnt; + attr.attr_id = LDMSD_ATTR_JSON; + ldmsd_hton_req_attr(&attr); + rc = ldmsd_append_reply(reqc, (char *)&attr, sizeof(attr), LDMSD_REQ_SOM_F); + if (rc) + goto out; + + /* Send the json object */ + rc = ldmsd_append_reply(reqc, "[", 1, 0); + if (rc) + goto out; + if (reqc->line_off) { + rc = ldmsd_append_reply(reqc, reqc->line_buf, reqc->line_off, 0); + if (rc) + goto out; + } + rc = ldmsd_append_reply(reqc, "]", 1, 0); + if (rc) { + goto out; + } + + /* Send the terminating attribute */ + attr.discrim = 0; + rc = ldmsd_append_reply(reqc, (char *)&attr.discrim, + sizeof(uint32_t), LDMSD_REQ_EOM_F); +out: + return rc; +err: + if (pl) + ldmsd_cfgobj_put(&pl->obj); + ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR_LISTEN); + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Error getting the status: Error %d.", rc); + reqc->errcode = EIO; + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; 
+} + +/* The implementation is in ldmsd_updtr.c */ +extern int __ldmsd_updtr_prdcr_add(ldmsd_updtr_t updtr, ldmsd_prdcr_t prdcr); +/* The implementations are in ldmsd_prdcr.c */ +extern ldmsd_prdcr_ref_t prdcr_ref_new(ldmsd_prdcr_t prdcr); +static int __get_prdcr(ldmsd_req_ctxt_t reqc, ldmsd_prdcr_listen_t lp) +{ + int rc = 0; /* returned as-is when the producer exists and nothing below assigns it */ + char *name; + char *xprt_s; + char *hostname; + char *attr_name; + ldmsd_prdcr_t prdcr; + ldmsd_prdcr_ref_t pl_pref, updtr_pref; + struct rbn *rbn; + struct ldmsd_sec_ctxt sctxt; + uid_t uid; + gid_t gid; + int is_start = 0; /* set only when this call creates the producer */ + name = xprt_s = hostname = NULL; + + attr_name = "name"; + name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + if (!name) + goto einval; + + attr_name = "hostname"; + hostname = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_HOST); + if (!hostname) + goto einval; + + xprt_s = (char *)ldms_xprt_type_name(reqc->xprt->ldms.ldms); + + /* TODO: make sure that it makes sense to use uid, gid from the request context. */ + ldmsd_req_ctxt_sec_get(reqc, &sctxt); + uid = sctxt.crd.uid; + gid = sctxt.crd.gid; + + prdcr = ldmsd_prdcr_find(name); + if (!prdcr) { + prdcr = ldmsd_prdcr_new_with_auth(name, xprt_s, hostname, -1, + LDMSD_PRDCR_TYPE_GENERATED, lp->prdcr_conn_intvl, + NULL, uid, gid, 0770, + lp->rails, lp->recv_credits, lp->rate_limits); + if (!prdcr) { + reqc->errcode = ENOMEM; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Aggregator failed to create " + "the producer '%s'", name); + rc = ENOMEM; + goto out; + } + is_start = 1; + } else { + if (prdcr->xprt) { + ovis_log(NULL, OVIS_LERROR, + "Received a duplicate advertise request of producer '%s'. 
" + "LDMSD ignores the subsequent request.\n", name); + rc = EBUSY; + goto out; + } + } + + rbn = rbt_find(&lp->prdcr_tree, name); + if (!rbn) { + pl_pref = prdcr_ref_new(prdcr); + if (!pl_pref) { + ovis_log(config_log, OVIS_LCRIT, "Memory allocation failure.\n"); + reqc->errcode = ENOMEM; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Aggregator has memory allocation failure."); + rc = ENOMEM; + goto out; + } + rbt_ins(&lp->prdcr_tree, &pl_pref->rbn); + } + + /* Add the producer to any updaters that the producer matches */ + ldmsd_updtr_t updtr; + ldmsd_name_match_t match; + ldmsd_cfg_lock(LDMSD_CFGOBJ_UPDTR); + for (updtr = ldmsd_updtr_first(); updtr; updtr = ldmsd_updtr_next(updtr)) { + updtr_pref = ldmsd_updtr_prdcr_find(updtr, prdcr->obj.name); + if (updtr_pref) + continue; + + LIST_FOREACH(match, &updtr->prdcr_filter, entry) { + if (0 == regexec(&match->regex, prdcr->obj.name, 0, NULL, 0)) { + rc = __ldmsd_updtr_prdcr_add(updtr, prdcr); + if (rc) { + ovis_log(config_log, OVIS_LERROR, + "Failed to add the generated producer " + "'%s' to updater '%s'. Error %d\n", name, updtr->obj.name, rc); + ldmsd_cfg_unlock(LDMSD_CFGOBJ_UPDTR); /* the err path does not release this lock */ + goto err; + } + break; + } + } + } + ldmsd_cfg_unlock(LDMSD_CFGOBJ_UPDTR); + + /* + * The producer state will be moved in the prdcr_task_cb() path. + */ + if (is_start) { + rc = ldmsd_prdcr_start(name, NULL, &sctxt); + if (rc) { + ovis_log(config_log, OVIS_LERROR, "Failed to start the " + "generated producer '%s'. 
Error %d.\n", + name, rc); + goto err; + } + } +out: + return rc; +einval: + ovis_log(config_log, OVIS_LERROR, + "The '%s' attribute is missing from " + "an advertise_notification request.\n", attr_name); + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The attribute '%s' is missing from " + "an advertise request to an aggregator.", attr_name); + reqc->errcode = rc = EINVAL; + goto out; +err: + reqc->errcode = rc; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "Aggregator failed to start the " + "generated producer '%s'.", name); + goto out; +} + +/* + * If the producer listen contains both hostname regex and CIDR IP address range, + * the advertiser matches only when its hostname and IP address are matched + * the prdcr_listen's hostname regex and IP range. + */ +int __is_advertiser_matched(ldmsd_prdcr_listen_t pl, struct ldms_addr *advts_addr, + const char *advts_hostname) +{ + int is_host_matched = 1; + int is_ip_matched = 1; + + if (pl->hostname_regex_s) { + if (0 != regexec(&pl->regex, advts_hostname, 0, NULL, 0)) + is_host_matched = 0; + } + + if (pl->prefix_len) { + if (advts_addr->sa_family == AF_INET) { + struct ldms_addr s6 = { + .addr ={0,0,0,0,0,0,0,0,0,0,0xff,0xff,0,0,0,0} + }; + memcpy(&s6.addr[12], &advts_addr->addr, 4); + memcpy(&advts_addr->addr, &s6.addr, 16); + advts_addr->sa_family = AF_INET6; + } + /* A CIDR IP address was given. 
*/ + if (0 == ldms_addr_in_network_addr(advts_addr, &pl->net_addr, pl->prefix_len)) + is_ip_matched = 0; + } + + if (is_host_matched && is_ip_matched) + return 1; + else + return 0; +} + +static int advertise_notification_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + ldmsd_prdcr_listen_t pl; + char *hostname; + char *name; + hostname = name = NULL; + struct ldms_addr rem_addr = {0}; + + rc = ldms_xprt_addr(reqc->xprt->ldms.ldms, NULL, &rem_addr); + if (rc) { + reqc->errcode = rc; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "An error %d occurred on the aggregator " + "while processing the advertisement.", rc); + goto send_reply; + } + + hostname = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_HOST); + if (!hostname) { + reqc->errcode = EINVAL; + reqc->line_off = snprintf(reqc->line_buf, reqc->line_len, + "The attribute 'hostname' is required."); + goto send_reply; + } + + for (pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_first(LDMSD_CFGOBJ_PRDCR_LISTEN); + pl; pl = (ldmsd_prdcr_listen_t)ldmsd_cfgobj_next(&pl->obj)) + { + if (pl->state != LDMSD_PRDCR_LISTEN_STATE_RUNNING) + continue; + if (__is_advertiser_matched(pl, &rem_addr, hostname)) { + /* The hostname matches the regular expression. 
*/ + reqc->errcode = __get_prdcr(reqc, pl); + if (reqc->errcode) { + if (reqc->errcode == EBUSY) { + snprintf(reqc->line_buf, reqc->line_len, + "The client already has a running " + "producer with the given name."); + } else { + snprintf(reqc->line_buf, reqc->line_len, + "An error '%d' occurred on the peer.", reqc->errcode); + } + } + ldmsd_cfgobj_put(&pl->obj); /* Put back the 'first' or 'next' reference */ + goto send_reply; + } + } + /* + * The advertisement doesn't match any listening producers + */ + reqc->errcode = ENOENT; + snprintf(reqc->line_buf, reqc->line_len, + "The given hostname '%s' doesn't match " + "any `prdcr_listen`'s regex.", hostname); + ovis_log(NULL, OVIS_LERROR, "Received a producer advertisement " + "with hostname '%s', which isn't matched any listening producers. " + "Stop the advertisement, update its configuration, and then restart.\n", + hostname); +send_reply: + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int advertiser_add_handler(ldmsd_req_ctxt_t reqc) +{ + ldmsd_prdcr_t prdcr; + prdcr = __prdcr_add_handler(reqc, "advertiser_add", "advertiser"); + if (prdcr) { + __dlog(DLOG_CFGOK, "advertiser_add name=%s xprt=%s host=%s port=%u " + "auth=%s uid=%d gid=%d perm=%o\n", + prdcr->obj.name, prdcr->xprt_name, prdcr->host_name, + prdcr->port_no, prdcr->conn_auth_dom_name, + (int)prdcr->obj.uid, (int)prdcr->obj.gid, + (unsigned)prdcr->obj.perm); + } + + ldmsd_send_req_response(reqc, reqc->line_buf); + return 0; +} + +static int advertiser_start_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = 0; + ldmsd_prdcr_t prdcr; + char *name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); + + prdcr = name ? ldmsd_prdcr_find(name) : NULL; /* a missing name is reported by __prdcr_add_handler() */ + if (!prdcr) { + prdcr = __prdcr_add_handler(reqc, "advertiser_start", "advertiser"); + if (!prdcr) { + /* + * Failed to create the producer. 
+ * The error message was prepared in __prdcr_add_handler() + */ + goto send_reply; + } + ldmsd_prdcr_get(prdcr); /* Get a reference to match the find reference */ + } + + rc = __prdcr_start_handler(reqc, "advertiser_start", "advertiser"); + if ((CONFIG_PLAYBACK_ENABLED(DLOG_CFGOK)) && !rc && !reqc->errcode) { + __dlog(DLOG_CFGOK, "advertiser_start " + "name=%s xprt=%s host=%s port=%u " + "reconnect=%ld auth=%s uid=%d gid=%d perm=%o\n", + prdcr->obj.name, prdcr->xprt_name, prdcr->host_name, + prdcr->port_no, prdcr->conn_intrvl_us, prdcr->conn_auth_dom_name, + (int)prdcr->obj.uid, (int)prdcr->obj.gid, + (unsigned)prdcr->obj.perm); + } + ldmsd_cfgobj_put(&prdcr->obj); /* Put back the 'find' reference (or the matching 'get') */ + +send_reply: + free(name); /* allocated by the attribute getter; free(NULL) is a no-op */ + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int advertiser_stop_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = __prdcr_stop_handler(reqc, "advertiser_stop", "advertiser"); + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} + +static int advertiser_del_handler(ldmsd_req_ctxt_t reqc) +{ + int rc = __prdcr_del_handler(reqc, "advertiser_del", "advertiser"); + ldmsd_send_req_response(reqc, reqc->line_buf); + return rc; +} diff --git a/ldms/src/ldmsd/ldmsd_request.h b/ldms/src/ldmsd/ldmsd_request.h index 4b96bc0960..5d32cffbb6 100644 --- a/ldms/src/ldmsd/ldmsd_request.h +++ b/ldms/src/ldmsd/ldmsd_request.h @@ -88,6 +88,16 @@ enum ldmsd_request { LDMSD_PRDCR_UNSUBSCRIBE_REQ, LDMSD_PRDCR_STREAM_STATUS_REQ, LDMSD_BRIDGE_ADD_REQ, + LDMSD_ADVERTISER_ADD_REQ, + LDMSD_ADVERTISER_START_REQ, + LDMSD_ADVERTISER_STOP_REQ, + LDMSD_ADVERTISER_DEL_REQ, + LDMSD_PRDCR_LISTEN_ADD_REQ, + LDMSD_PRDCR_LISTEN_DEL_REQ, + LDMSD_PRDCR_LISTEN_START_REQ, + LDMSD_PRDCR_LISTEN_STOP_REQ, + LDMSD_PRDCR_LISTEN_STATUS_REQ, + LDMSD_ADVERTISE_NOTIFY_REQ, LDMSD_STRGP_ADD_REQ = 0x200, LDMSD_STRGP_DEL_REQ, LDMSD_STRGP_START_REQ, @@ -244,6 +254,7 @@ enum ldmsd_request_attr { LDMSD_ATTR_RX_RATE, LDMSD_ATTR_SUMMARY, LDMSD_ATTR_SIZE, + LDMSD_ATTR_IP, LDMSD_ATTR_LAST, }; @@ -304,6 +315,7 @@ typedef struct 
ldmsd_cfg_ldms_s { } *ldmsd_cfg_ldms_t; typedef struct ldmsd_cfg_file_s { + const char *path; /* Point to the path attribute value, don't free() */ uint64_t cfgfile_id; } *ldmsd_cfg_file_t; @@ -542,7 +554,13 @@ void ldmsd_ntoh_req_msg(ldmsd_req_hdr_t msg); * \param rec_len The record length */ void ldmsd_send_cfg_rec_adv(ldmsd_cfg_xprt_t xprt, uint32_t msg_no, uint32_t rec_len); -int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request); +/* + * \param req_filter is a function that returns zero if we want to process the + * request, and returns non-zero otherwise. + */ +typedef int (*req_filter_fn)(ldmsd_req_ctxt_t, void *); +int ldmsd_process_config_request(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t request, + req_filter_fn req_filter, void *filter_ctxt); int ldmsd_process_config_response(ldmsd_cfg_xprt_t xprt, ldmsd_req_hdr_t response); int ldmsd_append_reply(struct ldmsd_req_ctxt *reqc, const char *data, size_t data_len, int msg_flags); void ldmsd_send_error_reply(ldmsd_cfg_xprt_t xprt, uint32_t msg_no, diff --git a/ldms/src/ldmsd/ldmsd_request_util.c b/ldms/src/ldmsd/ldmsd_request_util.c index 8e12e962f0..06da861f64 100644 --- a/ldms/src/ldmsd/ldmsd_request_util.c +++ b/ldms/src/ldmsd/ldmsd_request_util.c @@ -63,6 +63,10 @@ struct req_str_id { const struct req_str_id req_str_id_table[] = { /* This table need to be sorted by keyword for bsearch() */ + { "advertise_add", LDMSD_ADVERTISER_ADD_REQ }, + { "advertise_del", LDMSD_ADVERTISER_DEL_REQ }, + { "advertise_start", LDMSD_ADVERTISER_START_REQ }, + { "advertise_stop", LDMSD_ADVERTISER_STOP_REQ }, { "auth_add", LDMSD_AUTH_ADD_REQ }, { "auth_del", LDMSD_AUTH_DEL_REQ }, { "banner", LDMSD_BANNER_MODE_REQ }, @@ -105,6 +109,10 @@ const struct req_str_id req_str_id_table[] = { { "prdcr_add", LDMSD_PRDCR_ADD_REQ }, { "prdcr_del", LDMSD_PRDCR_DEL_REQ }, { "prdcr_hint_tree", LDMSD_PRDCR_HINT_TREE_REQ }, + { "prdcr_listen_add", LDMSD_PRDCR_LISTEN_ADD_REQ }, + { "prdcr_listen_del", 
LDMSD_PRDCR_LISTEN_DEL_REQ }, + { "prdcr_listen_start", LDMSD_PRDCR_LISTEN_START_REQ }, + { "prdcr_listen_stop", LDMSD_PRDCR_LISTEN_STOP_REQ }, { "prdcr_set_status", LDMSD_PRDCR_SET_REQ }, { "prdcr_start", LDMSD_PRDCR_START_REQ }, { "prdcr_start_regex", LDMSD_PRDCR_START_REGEX_REQ }, @@ -166,6 +174,7 @@ const struct req_str_id attr_str_id_table[] = { { "auto_interval", LDMSD_ATTR_AUTO_INTERVAL }, { "auto_switch", LDMSD_ATTR_AUTO_SWITCH }, { "base", LDMSD_ATTR_BASE }, + { "cidr", LDMSD_ATTR_IP }, { "container", LDMSD_ATTR_CONTAINER }, { "credits", LDMSD_ATTR_CREDITS }, { "decomposition", LDMSD_ATTR_DECOMP }, @@ -254,6 +263,7 @@ const char *ldmsd_req_id2str(enum ldmsd_request req_id) case LDMSD_PRDCR_HINT_TREE_REQ : return "PRDCR_HINT_TREE_REQ"; case LDMSD_PRDCR_SUBSCRIBE_REQ : return "PRDCR_SUBSCRIBE_REQ"; case LDMSD_PRDCR_UNSUBSCRIBE_REQ : return "PRDCR_UNSUBSCRIBE_REQ"; + case LDMSD_PRDCR_LISTEN_ADD_REQ : return "PRDCR_LISTEN_REQ"; case LDMSD_STRGP_ADD_REQ : return "STRGP_ADD_REQ"; case LDMSD_STRGP_DEL_REQ : return "STRGP_DEL_REQ"; @@ -869,6 +879,64 @@ int __ldmsd_parse_default_auth_req(struct ldmsd_parse_ctxt *ctxt) return rc; } +/* The function adds the attribute 'type' with the 'advertise' value to the request */ +int __ldmsd_parse_advertise_add_req(struct ldmsd_parse_ctxt *ctxt) +{ + char *av = ctxt->av; + size_t len = strlen(av); + size_t cnt = 0; + char *tmp, *name, *value, *ptr, *dummy; + int rc = 0; + dummy = NULL; + tmp = malloc(len); + if (!tmp) { + rc = ENOMEM; + goto out; + } + av = strtok_r(av, __ldmsd_cfg_delim, &ptr); + while (av) { + ctxt->av = av; + dummy = strdup(av); + if (!dummy) { + rc = ENOMEM; + goto out; + } + __get_attr_name_value(dummy, &name, &value); + if (!name) { + /* av is neither attribute value nor keyword */ + rc = EINVAL; + goto out; + } + rc = add_attr_from_attr_str(name, value, + &ctxt->request, + &ctxt->request_sz); + if (rc) + goto out; + av = strtok_r(NULL, __ldmsd_cfg_delim, &ptr); + free(dummy); + dummy = NULL; + } + rc = 
add_attr_from_attr_str("type", "advertise", + &ctxt->request, + &ctxt->request_sz); + if (rc) + goto out; + + if (cnt) { + tmp[cnt-1] = '\0'; /* Replace the last ' ' with '\0' */ + /* Add an attribute of type 'STRING' */ + rc = add_attr_from_attr_str(NULL, tmp, + &ctxt->request, + &ctxt->request_sz); + } + +out: + if (tmp) + free(tmp); + if (dummy) + free(dummy); + return rc; +} struct ldmsd_req_array * ldmsd_parse_config_str(const char *cfg, uint32_t msg_no, size_t xprt_max_msg) @@ -945,6 +1013,10 @@ ldmsd_parse_config_str(const char *cfg, uint32_t msg_no, size_t xprt_max_msg) case LDMSD_DEFAULT_AUTH_REQ: rc = __ldmsd_parse_default_auth_req(&ctxt); break; + case LDMSD_ADVERTISER_ADD_REQ: + case LDMSD_ADVERTISER_START_REQ: + rc = __ldmsd_parse_advertise_add_req(&ctxt); + break; default: rc = __ldmsd_parse_generic(&ctxt); break; @@ -1053,6 +1125,14 @@ ldmsd_req_attr_t ldmsd_req_attr_get_by_id(char *request, uint32_t attr_id) return NULL; } +char *ldmsd_req_attr_value_by_id(char *request, uint32_t attr_id) +{ + ldmsd_req_attr_t attr = ldmsd_req_attr_get_by_id(request, attr_id); + if (!attr) + return NULL; + return str_repl_env_vars((char *)attr->attr_value); +} + ldmsd_req_attr_t ldmsd_req_attr_get_by_name(char *request, const char *name) { int32_t attr_id = ldmsd_req_attr_str2id(name); diff --git a/ldms/src/ldmsd/ldmsd_updtr.c b/ldms/src/ldmsd/ldmsd_updtr.c index f8542a07a6..58c91a2573 100644 --- a/ldms/src/ldmsd/ldmsd_updtr.c +++ b/ldms/src/ldmsd/ldmsd_updtr.c @@ -890,13 +890,10 @@ static int updtr_tasks_create(ldmsd_updtr_t updtr) return rc; } -int prdcr_ref_cmp(void *a, const void *b) -{ - return strcmp(a, b); -} - #define UPDTR_TREE_MGMT_TASK_INTRVL 3600000000 +/* The implementation is in ldmsd_prdcr.c */ +extern int prdcr_ref_cmp(void *a, const void *b); ldmsd_updtr_t ldmsd_updtr_new_with_auth(const char *name, char *interval_str, char *offset_str, int push_flags, int is_auto_task, @@ -1337,15 +1334,8 @@ int ldmsd_updtr_match_del(const char *updtr_name, const 
char *regex_str, return rc; } -ldmsd_prdcr_ref_t prdcr_ref_new(ldmsd_prdcr_t prdcr) -{ - ldmsd_prdcr_ref_t ref = calloc(1, sizeof *ref); - if (ref) { - ref->prdcr = ldmsd_prdcr_get(prdcr); - rbn_init(&ref->rbn, prdcr->obj.name); - } - return ref; -} +/* The implementations are in ldmsd_prdcr.c */ +extern ldmsd_prdcr_ref_t prdcr_ref_new(ldmsd_prdcr_t prdcr); ldmsd_prdcr_ref_t prdcr_ref_find(ldmsd_updtr_t updtr, const char *name) { @@ -1356,8 +1346,6 @@ ldmsd_prdcr_ref_t prdcr_ref_find(ldmsd_updtr_t updtr, const char *name) return container_of(rbn, struct ldmsd_prdcr_ref, rbn); } - - ldmsd_prdcr_ref_t prdcr_ref_find_regex(ldmsd_updtr_t updtr, regex_t *regex) { struct rbn *rbn; @@ -1380,10 +1368,10 @@ int __ldmsd_updtr_prdcr_add(ldmsd_updtr_t updtr, ldmsd_prdcr_t prdcr) ldmsd_prdcr_ref_t ref; ldmsd_updtr_lock(updtr); - if (updtr->state != LDMSD_UPDTR_STATE_STOPPED) { - rc = EBUSY; - goto out; - } +// if (updtr->state != LDMSD_UPDTR_STATE_STOPPED) { +// rc = EBUSY; +// goto out; +// } ref = prdcr_ref_find(updtr, prdcr->obj.name); if (ref) { rc = EEXIST; @@ -1403,20 +1391,15 @@ int __ldmsd_updtr_prdcr_add(ldmsd_updtr_t updtr, ldmsd_prdcr_t prdcr) int ldmsd_updtr_prdcr_add(const char *updtr_name, const char *prdcr_regex, char *rep_buf, size_t rep_len, ldmsd_sec_ctxt_t ctxt) { - regex_t regex; ldmsd_updtr_t updtr; ldmsd_prdcr_t prdcr; + ldmsd_name_match_t prd_match; int rc; - rc = ldmsd_compile_regex(&regex, prdcr_regex, rep_buf, rep_len); - if (rc) - return EINVAL; - updtr = ldmsd_updtr_find(updtr_name); if (!updtr) { sprintf(rep_buf, "%dThe updater specified does not " "exist\n", ENOENT); - regfree(&regex); return ENOENT; } @@ -1430,9 +1413,34 @@ int ldmsd_updtr_prdcr_add(const char *updtr_name, const char *prdcr_regex, rc = EBUSY; goto out_1; } + + prd_match = calloc(1, sizeof(*prd_match)); + if (!prd_match) { + ovis_log(NULL, OVIS_LCRIT, "Memory allocation failure.\n"); + rc = ENOMEM; + goto unlock; + } + prd_match->regex_str = strdup(prdcr_regex); + if 
(!prd_match->regex_str) { + ovis_log(NULL, OVIS_LCRIT, "Memory allocation failure.\n"); + rc = ENOMEM; + free(prd_match); + goto unlock; + } + + rc = ldmsd_compile_regex(&prd_match->regex, prdcr_regex, rep_buf, rep_len); + if (rc) { + rc = EINVAL; + /* The match is not on the filter list yet: release the pattern copy with it */ + free(prd_match->regex_str); + free(prd_match); + goto unlock; + } + + LIST_INSERT_HEAD(&updtr->prdcr_filter, prd_match, entry); ldmsd_cfg_lock(LDMSD_CFGOBJ_PRDCR); for (prdcr = ldmsd_prdcr_first(); prdcr; prdcr = ldmsd_prdcr_next(prdcr)) { - if (regexec(&regex, prdcr->obj.name, 0, NULL, 0)) + if (regexec(&prd_match->regex, prdcr->obj.name, 0, NULL, 0)) continue; /* See if this match is already in the list */ ldmsd_prdcr_ref_t ref = prdcr_ref_find(updtr, prdcr->obj.name); @@ -1451,7 +1459,7 @@ int ldmsd_updtr_prdcr_add(const char *updtr_name, const char *prdcr_regex, ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR); sprintf(rep_buf, "0\n"); out_1: - regfree(&regex); +unlock: ldmsd_updtr_unlock(updtr); ldmsd_updtr_put(updtr); return rc;