From 9dd76c499c73edb2ad952a1d79e0c5eb179b259d Mon Sep 17 00:00:00 2001 From: Ben Allan Date: Fri, 26 Jan 2024 13:43:08 -0700 Subject: [PATCH] ldms-notify enhancements needed for production convenience. While not always calculable, the duration of tracked pids is of great interest and rather hard to compute downstream. This adds the duration field to process end messages and a format option to suppress it for sites not ready to migrate to the expanded schema. Unlike ldmsd, the status of ldms-notify cannot be determined remotely, so this also adds a heartbeat message, which by default is not generated. --- ldms/scripts/examples/linux_proc_sampler | 11 +- ldms/src/sampler/netlink/netlink-notifier.c | 316 ++++++++++++++---- ldms/src/sampler/netlink/netlink-notifier.man | 19 ++ 3 files changed, 275 insertions(+), 71 deletions(-) diff --git a/ldms/scripts/examples/linux_proc_sampler b/ldms/scripts/examples/linux_proc_sampler index 2a13424bd..b5e1208c4 100644 --- a/ldms/scripts/examples/linux_proc_sampler +++ b/ldms/scripts/examples/linux_proc_sampler @@ -67,12 +67,15 @@ cat << EOF > $LDMSD_RUN/metrics.input } EOF rm -f $LOGDIR/json*.log -#valgrind -v --tool=drd --log-file=$LOGDIR/vg.netlink.txt ${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked & -#valgrind -v --leak-check=full --track-origins=yes --trace-children=yes --log-file=$LOGDIR/vg.netlink.txt ${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked & -${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked -x -e exec,clone,exit & 
+drd="valgrind -v --tool=drd --log-file=$LOGDIR/vg.netlink.drd.txt --trace-cond=yes --trace-fork-join=yes" +memcheck="valgrind -v --leak-check=full --track-origins=yes --trace-children=yes --log-file=$LOGDIR/vg.netlink.memcheck.txt --keep-debuginfo=yes --malloc-fill=3b" +#${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked & + +${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked -x -e exec,clone,exit -L $LOGDIR/nl.log --heartbeat 1 -v 0 & + # uncomment next one to test duplicate handling #${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json2.log --exclude-dir-path= --exclude-short-path= --exclude-programs & -VGARGS="--tool=drd --suppressions=/scratch1/baallan/ovis/ldms/scripts/examples/linux_proc_sampler.drd.supp" +VGARGS="--tool=drd --trace-cond=yes --trace-fork-join=yes" VGARGS="--leak-check=full --track-origins=yes --trace-children=yes --show-leak-kinds=definite --time-stamp=yes --keep-debuginfo=yes --malloc-fill=3b" #vgon LDMSD 1 diff --git a/ldms/src/sampler/netlink/netlink-notifier.c b/ldms/src/sampler/netlink/netlink-notifier.c index 6a661b214..036e0e380 100644 --- a/ldms/src/sampler/netlink/netlink-notifier.c +++ b/ldms/src/sampler/netlink/netlink-notifier.c @@ -230,6 +230,7 @@ typedef struct proc_info { bool excluded; /* true if matches excluded path lists or uid bound */ bool excluded_short; /* true if matches excluded_short path list */ int emitted; /* values from EMIT_* below */ + double duration; /* -1 if unknown or seconds pid endured (only valid at end) */ } proc_info_t; #define EMIT_NONE 0x0 @@ -320,6 +321,9 @@ typedef struct forkstat { double opt_wake_interval; char *compid_field; 
/* compid field formatted, if requested */ char *prod_field; /* ProducerName field formatted, if requested */ + unsigned long format; /* stream formatting version to use */ + int heartbeat; /* approximate seconds between heartbeats */ + time_t lastbeat; /* time of last beat */ } forkstat_t; @@ -405,6 +409,8 @@ static const int signals[] = { }; /* add several structures related to filtering. */ +#define default_format "1" /* this is also the max format known */ +#define default_heartbeat NULL /* none by default*/ #define default_component_id NULL #define default_ProducerName NULL #define default_track_dir "/var/run/ldms-netlink-tracked" @@ -2105,8 +2111,10 @@ static void *monitor(void *vp) d1 = timeval_to_double(&info1->start); d2 = timeval_to_double(&tv); (void)snprintf(duration, sizeof(duration), "%8s", secs_to_str(d2 - d1)); + info1->duration = d2 - d1; } else { (void)snprintf(duration, sizeof(duration), "unknown"); + info1->duration = -1.0; } if (ft->opt_flags & (OPT_UMIN | OPT_EXTRA)) { if (info1->uid < ft->opt_uidmin) { @@ -2350,6 +2358,8 @@ static struct exclude_arg excludes[] = { {default_track_dir, VT_DIR, 0, "NOTIFIER_TRACK_DIR", NULL, 0, PLINIT}, {default_ProducerName, VT_SCALAR, 0, "NOTIFIER_PRODUCERNAME", NULL, 0, PLINIT}, {default_component_id, VT_SCALAR, 0, "NOTIFIER_COMPONENT_ID", NULL, 0, PLINIT}, + {default_format, VT_SCALAR, 0, "NOTIFIER_FORMAT", NULL, 0, PLINIT}, + {default_heartbeat, VT_SCALAR, 0, "NOTIFIER_HEARTBEAT", NULL, 0, PLINIT}, }; static struct exclude_arg *bin_exclude = &excludes[0]; static struct exclude_arg *dir_exclude = &excludes[1]; @@ -2366,6 +2376,8 @@ static struct exclude_arg *send_log_arg = &excludes[11]; static struct exclude_arg *track_dir_arg = &excludes[12]; static struct exclude_arg *prod_arg = &excludes[13]; static struct exclude_arg *compid_arg = &excludes[14]; +static struct exclude_arg *format_arg = &excludes[15]; +static struct exclude_arg *heartbeat_arg = &excludes[16]; static struct option long_options[] = { 
{"exclude-programs", optional_argument, 0, 0}, @@ -2383,6 +2395,8 @@ static struct option long_options[] = { {"track-dir", required_argument, 0, 0}, {"ProducerName", required_argument, 0, 0}, {"component_id", required_argument, 0, 0}, + {"format", required_argument, 0, 0}, + {"heartbeat", required_argument, 0, 0}, {0, 0, 0, 0} }; @@ -2539,6 +2553,41 @@ static int set_interval(char *optarg, forkstat_t *ft) return 0; } +static int set_heartbeat(char *optarg, forkstat_t *ft) +{ + if (!optarg || !ft) + return EINVAL; + char *end = NULL; + ft->heartbeat = strtol(optarg, &end, 10); + if (ft->heartbeat < 0) { + PRINTF("Illegal --heartbeat: %s, must be positive\n", optarg); + return EINVAL; + } + if (*end != '\0') { + PRINTF("--heartbeat needed, not %s\n", optarg); + return EINVAL; + } + return 0; +} + +static int set_format(char *optarg, forkstat_t *ft) +{ + if (!optarg || !ft) + return EINVAL; + char *end = NULL; + unsigned long max_format = strtoul(default_format, &end, 10); + ft->format = strtoul(optarg, &end, 10); + if (ft->format > max_format) { + PRINTF("Illegal --format: %s, must be <= %s\n", optarg, default_format); + return EINVAL; + } + if (*end != '\0') { + PRINTF("--format needed, not %s\n", optarg); + return EINVAL; + } + return 0; +} + static int set_duration_min(char *optarg, forkstat_t *ft) { if (!optarg || !ft) @@ -3029,6 +3078,7 @@ static int add_env_attr(struct env_attr *a, jbuf_t *jb, const struct proc_info * return 0; } + static jbuf_t make_process_start_data_linux(forkstat_t *ft, const struct proc_info *info, #if DEBUG_EMITTER const char *type, @@ -3087,42 +3137,78 @@ static jbuf_t make_process_start_data_linux(forkstat_t *ft, const struct proc_in } + static jbuf_t make_process_end_data_linux(forkstat_t *ft, const struct proc_info *info, jbuf_t jbd) { jbuf_t jb = jbd; - (void)ft; if (!jb) return NULL; - jb = jbuf_append_str(jb, - "{" - "\"msgno\":%" PRIu64 "," - "\"schema\":\"linux_task_data\"," - "\"event\":\"task_exit\"," - "\"timestamp\":%d," - 
"\"context\":\"*\"," - "\"data\":" + + if (ft->format == 0) + jb = jbuf_append_str(jb, "{" - "%s%s" - "\"start\":\"%lu.%06lu\"," - /* format start_tick as string because u64 - * is out of ovis_json signed int range */ - "\"start_tick\":\"%" PRIu64 "\"," - "\"job_id\":\"%s\"," - "\"serial\":%" PRId64 "," - "\"os_pid\":%" PRId64 "," - "\"task_pid\":%d" - "}" - "}", - forkstat_get_serial(ft), - time(NULL), - ft->prod_field, ft->compid_field, - info->start.tv_sec, info->start.tv_usec, - info->start_tick, - info_jobid_str(info), - info->serno, - (int64_t)info->pid, - (int)info->pid); + "\"msgno\":%" PRIu64 "," + "\"schema\":\"linux_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"start\":\"%lu.%06lu\"," + /* format start_tick as string because u64 + * is out of ovis_json signed int range */ + "\"start_tick\":\"%" PRIu64 "\"," + "\"job_id\":\"%s\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 "," + "\"task_pid\":%d" + "}" + "}", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info->start.tv_sec, info->start.tv_usec, + info->start_tick, + info_jobid_str(info), + info->serno, + (int64_t)info->pid, + (int)info->pid); + else if (ft->format == 1) + jb = jbuf_append_str(jb, + "{" + "\"msgno\":%" PRIu64 "," + "\"schema\":\"linux_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"start\":\"%lu.%06lu\"," + /* format start_tick as string because u64 + * is out of ovis_json signed int range */ + "\"start_tick\":\"%" PRIu64 "\"," + "\"job_id\":\"%s\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 "," + "\"task_pid\":%d," + "\"duration\":%.17g" + "}" + "}", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info->start.tv_sec, info->start.tv_usec, + info->start_tick, + info_jobid_str(info), + info->serno, + (int64_t)info->pid, + (int)info->pid, + info->duration); + else 
+ jb = NULL; return jb; } @@ -3183,27 +3269,54 @@ static jbuf_t make_process_end_data_lsf(forkstat_t *ft, const struct proc_info * jbuf_t jb = jbd; if (!jb) return NULL; - jb = jbuf_append_str(jb, - "{" - "\"msgno\":%" PRIu64 "," - "\"schema\":\"lsf_task_data\"," - "\"event\":\"task_exit\"," - "\"timestamp\":%d," - "\"context\":\"*\"," - "\"data\":" + if (ft->format == 0) + jb = jbuf_append_str(jb, "{" - "%s%s" - "\"start\":\"%lu.%06lu\"," - "\"job_id\":\"%s\"," - "\"serial\":%" PRId64 "," - "\"os_pid\":%" PRId64 ",", - forkstat_get_serial(ft), - time(NULL), - ft->prod_field, ft->compid_field, - info->start.tv_sec, info->start.tv_usec, - info_jobid_str(info), - info->serno, - (int64_t)info->pid); + "\"msgno\":%" PRIu64 "," + "\"schema\":\"lsf_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"start\":\"%lu.%06lu\"," + "\"job_id\":\"%s\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 ",", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info->start.tv_sec, info->start.tv_usec, + info_jobid_str(info), + info->serno, + (int64_t)info->pid); + else if (ft->format == 1) + jb = jbuf_append_str(jb, + "{" + "\"msgno\":%" PRIu64 "," + "\"schema\":\"lsf_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"start\":\"%lu.%06lu\"," + "\"job_id\":\"%s\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 "," + "\"duration\":%.17g,", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info->start.tv_sec, info->start.tv_usec, + info_jobid_str(info), + info->serno, + (int64_t)info->pid, + info->duration); + else + jb = NULL; if (!jb) goto out_1; size_t i, iend; @@ -3285,25 +3398,52 @@ static jbuf_t make_process_end_data_slurm(forkstat_t *ft, const struct proc_info if (!jb) return NULL; - jb = jbuf_append_str(jb, - "{" - "\"msgno\":%" PRIu64 "," - "\"schema\":\"slurm_task_data\"," 
- "\"event\":\"task_exit\"," - "\"timestamp\":%d," - "\"context\":\"*\"," - "\"data\":" + if (ft->format == 0) + jb = jbuf_append_str(jb, "{" - "%s%s" - "\"job_id\":\"%s\"," - "\"serial\":%" PRId64 "," - "\"os_pid\":%" PRId64 ",", - forkstat_get_serial(ft), - time(NULL), - ft->prod_field, ft->compid_field, - info_jobid_str(info), - info->serno, - (int64_t)info->pid); + "\"msgno\":%" PRIu64 "," + "\"schema\":\"slurm_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"job_id\":\"%s\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 ",", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info_jobid_str(info), + info->serno, + (int64_t)info->pid); + else if (ft->format == 1) + jb = jbuf_append_str(jb, + "{" + "\"msgno\":%" PRIu64 "," + "\"schema\":\"slurm_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"job_id\":\"%s\"," + "\"start\":\"%lu.%06lu\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 "," + "\"duration\":%.17g,", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info_jobid_str(info), + info->start.tv_sec, info->start.tv_usec, + info->serno, + (int64_t)info->pid, + info->duration); + else + jb = NULL; if (!jb) goto out_1; int i, iend; @@ -3546,6 +3686,33 @@ static int forkstat_set_json_log(forkstat_t *ft, const char *fname) static forkstat_t *shft; +void heartbeat(forkstat_t *ft, time_t now, jbuf_t *jbd) +{ + if (!ft || !ft->heartbeat || !jbd || !*jbd) + return; + if (now - ft->lastbeat >= ft->heartbeat) { + ft->lastbeat = now; + jbuf_t jb = *jbd; + /* format similar to others without data object */ + jbuf_reset(jb); + jb = jbuf_append_str(jb, + "{" + "\"msgno\":%" PRIu64 "," + "\"schema\":\"ldms-notify-status\"," + "%s%s" + "\"timestamp\":%" PRId64 "," + "\"event\":\"heartbeat\",\"data\":{}}", + forkstat_get_serial(ft), + ft->prod_field, 
ft->compid_field, + (int64_t)ft->lastbeat + ); + if (jb) { + send_ldms_message(ft, jb); + } + *jbd = jb; + } +} + #define DFLT_SORT_SZ 128 static void *dump_pids(void *vp) { @@ -3608,6 +3775,7 @@ static void *dump_pids(void *vp) if (ft->opt_trace) PRINTF("end UPDATE\n"); + heartbeat(ft, tv.tv_sec, &jb); nanosleep(&s, NULL); } jbuf_free(jb); @@ -3788,6 +3956,12 @@ int main(int argc, char * argv[]) } normalize_exclude(&excludes[c]); } + if (set_format(format_arg->paths[0].n, ft)) { + fprintf(stderr, "Bad value %s for %s (>=0 & <= %s).\n", + format_arg->paths[0].n, "format", format_arg[0].defval); + ret = EXIT_FAILURE; + goto abort_sock; + } if (compid_arg[0].parsed && compid_arg->paths[0].n) { size_t csz = strlen(compid_arg->paths[0].n) + 20; char ctmp[csz]; @@ -3796,6 +3970,13 @@ int main(int argc, char * argv[]) } else { ft->compid_field = strdup(""); } + if (heartbeat_arg[0].parsed && heartbeat_arg->paths[0].n && + set_heartbeat(heartbeat_arg->paths[0].n, ft)) { // NPR fixme + fprintf(stderr, "Bad value %s for %s.\n", + heartbeat_arg->paths[0].n, "heartbeat. need seconds."); + ret = EXIT_FAILURE; + goto abort_sock; + } if (prod_arg[0].parsed && prod_arg->paths[0].n) { size_t csz = strlen(prod_arg->paths[0].n) + 20; char ctmp[csz]; @@ -3813,6 +3994,7 @@ int main(int argc, char * argv[]) fprintf(stderr, "Bad value %s for %s or %s.\n", duration_exclude->paths[0].n, long_options[3].name, duration_exclude->env); ret = EXIT_FAILURE; + goto abort_sock; } if (ft->opt_trace) diff --git a/ldms/src/sampler/netlink/netlink-notifier.man b/ldms/src/sampler/netlink/netlink-notifier.man index 4a54b784e..088f269b4 100644 --- a/ldms/src/sampler/netlink/netlink-notifier.man +++ b/ldms/src/sampler/netlink/netlink-notifier.man @@ -90,6 +90,8 @@ The 'short' options do not override the exclude entirely options. If not set, the component_id field is not included in the stream formats produced. 
--ProducerName= set the value of ProducerName If not set, the ProducerName field is not included in the stream formats produced. +--format=N change the format of messages to version N. + If not set, the highest available format is used. See MESSAGE FORMATS. .fi .SH ENVIRONMENT @@ -107,6 +109,8 @@ NOTIFIER_LDMS_XPRT=sock NOTIFIER_LDMS_HOST=localhost NOTIFIER_LDMS_PORT=411 NOTIFIER_LDMS_AUTH=munge +NOTIFIER_FORMAT=1 +NOTIFIER_HEARTBEAT=(none) .fi Omitting (nullexe): from NOTIFIER_EXCLUDE_PROGRAMS may cause incomplete output related to processes no longer present. In exotic circumstances, this may be desirable anyway. @@ -127,6 +131,15 @@ Client applications may validate a file by checking the contents against the /proc/$pid/stat content, if it exists. Invalid files should be removed by clients or system scripts. +.SH MESSAGE FORMATS +Message formats tuned to SLURM, LSF, and Linux without a batch scheduler +are published, based on what the notifier detects and the user's choice of +ProducerName and component_id. The version of the tuned formats is specified by number. +.PP +Format 0 omits the start time from slurm process end messages (since it is only sometimes known) and omits process duration, which depends on the start time. +.PP +Format 1 includes the start time for slurm process end messages (or the dummy value 0 when unknown) and includes process duration for all end messages. When the start time is unavailable, a duration of -1.0 is published. Merging data from other sources may allow durations flagged as -1 to be computed in some later data cleanup step. + .SH NOTES .PP @@ -138,7 +151,13 @@ If not used with a sampler, the --component_id or --ProducerName options are nee to add a node identifier to the messages. Normally a process-following sampler that creates sets will add the node identifier automatically. .PP +When the daemon is started after a process is started, the process start time and therefore process +duration may not be available. 
In message formats which report start time, 0 indicates +data was unavailable. For processes without completely known time bounds, the duration is reported +as -1.0. +.PP Options are still in development. Several options affect only the trace output. + .SH EXAMPLES .PP Run for 30 seconds with screen and json.log test output connecting to the ldmsd from 'ldms-static-test.sh blobwriter' test: