From 9c915c5a86a2612ad95e581300c715bc794e6bd8 Mon Sep 17 00:00:00 2001 From: Ben Allan Date: Fri, 19 Apr 2024 14:14:03 -0600 Subject: [PATCH] add rollover to blob_stream_writer in b4.4 this will help reduce incomplete file writes as well. --- ldms/scripts/examples/blob_writer_rollover | 35 ++ ldms/scripts/examples/blob_writer_rollover.1 | 3 + ldms/scripts/examples/blob_writer_rollover.2 | 17 + ldms/src/sampler/blob_stream/Makefile.am | 7 +- .../blob_stream/Plugin_blob_stream_writer.man | 40 +- .../sampler/blob_stream/blob_stream_writer.c | 481 +++++++++++++++--- 6 files changed, 495 insertions(+), 88 deletions(-) create mode 100644 ldms/scripts/examples/blob_writer_rollover create mode 100644 ldms/scripts/examples/blob_writer_rollover.1 create mode 100644 ldms/scripts/examples/blob_writer_rollover.2 diff --git a/ldms/scripts/examples/blob_writer_rollover b/ldms/scripts/examples/blob_writer_rollover new file mode 100644 index 000000000..a47880205 --- /dev/null +++ b/ldms/scripts/examples/blob_writer_rollover @@ -0,0 +1,35 @@ +export plugname=dstat +export dsname=$(ldms_dstat_schema_name mmalloc=1 io=1 fd=1 auto-schema=1) +export dstat_schema=$dsname +portbase=61060 +# memcheck +VGARGS="--trace-children=yes --track-origins=yes --leak-check=full --show-leak-kinds=all" +# drd +#VGARGS="--trace-children=yes --tool=drd --trace-mutex=yes" +# track everything notifier config: +${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 60 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs -v 1 & +LDMSD 1 +#vgon +LDMSD 2 +#vgoff +MESSAGE ldms_ls on host 1: +LDMS_LS 1 -l +MESSAGE ldms_ls on host 2: +LDMS_LS 2 -l +for kt2 in $(seq 5); do +#MESSAGE "trying rollover via reconfig: $(date +%s)" +#echo "config name=blob_stream_writer path=${STOREDIR} container=blobs stream=slurm timing=1 types=1 spool=1" | \ +#ldmsctl -p 61062 -a none -x sock -h localhost +for kt in $(seq 4); do + SLEEP 2 +done +done +SLEEP 1 +SLEEP 2 +SLEEP 2 +SLEEP 2 +SLEEP 1 +KILL_LDMSD 1 2 +file_created $STOREDIR/blobs/spool/slurm.TIMING.1* +file_created $STOREDIR/blobs/spool/slurm.DAT.1* +file_created $STOREDIR/blobs/spool/slurm.OFFSET.1* diff --git a/ldms/scripts/examples/blob_writer_rollover.1 b/ldms/scripts/examples/blob_writer_rollover.1 new file mode 100644 index 000000000..b68882d80 --- /dev/null +++ b/ldms/scripts/examples/blob_writer_rollover.1 @@ -0,0 +1,3 @@ +#load name=dstat +#config name=dstat producer=localhost${i} instance=localhost${i}/${dstat_schema} component_id=${i} mmalloc=1 io=1 fd=1 auto-schema=1) +#start name=dstat interval=1000000 offset=0 diff --git a/ldms/scripts/examples/blob_writer_rollover.2 b/ldms/scripts/examples/blob_writer_rollover.2 new file mode 100644 index 000000000..e0530311b --- /dev/null +++ b/ldms/scripts/examples/blob_writer_rollover.2 @@ -0,0 +1,17 @@ +load name=blob_stream_writer plugin=blob_stream_writer +config name=blob_stream_writer path=${STOREDIR} container=blobs stream=slurm timing=1 types=1 spool=1 rolltype=3 rollover=10 + +prdcr_add name=localhost1 host=${HOST} type=active xprt=${XPRT} port=${port1} interval=2000000 +prdcr_subscribe regex=.* stream=slurm +prdcr_start name=localhost1 + +updtr_add name=allhosts interval=1000000 offset=100000 +updtr_prdcr_add name=allhosts regex=.* +updtr_start name=allhosts + +# load name=store_csv +# config name=store_csv path=${STOREDIR} altheader=0 + +# strgp_add name=store_${testname} plugin=store_csv schema=${dstat_schema} container=node +# strgp_prdcr_add name=store_${testname} regex=.* +# strgp_start name=store_${testname} diff --git a/ldms/src/sampler/blob_stream/Makefile.am b/ldms/src/sampler/blob_stream/Makefile.am index 569cab848..df9608ead 100644 --- a/ldms/src/sampler/blob_stream/Makefile.am +++ b/ldms/src/sampler/blob_stream/Makefile.am @@ -6,9 +6,10 @@ dist_man7_MANS= AM_LDFLAGS = @OVIS_LIB_ABS@ AM_CPPFLAGS = @OVIS_INCLUDE_ABS@ -STORE_LIBADD = $(top_builddir)/ldms/src/core/libldms.la \ - $(top_builddir)/lib/src/coll/libcoll.la \ - $(top_builddir)/lib/src/ovis_util/libovis_util.la +STORE_LIBADD = $(top_builddir)/ldms/src/ldmsd/libldmsd_plugattr.la \ + $(top_builddir)/ldms/src/core/libldms.la \ + $(top_builddir)/lib/src/coll/libcoll.la \ + $(top_builddir)/lib/src/ovis_util/libovis_util.la diff --git a/ldms/src/sampler/blob_stream/Plugin_blob_stream_writer.man b/ldms/src/sampler/blob_stream/Plugin_blob_stream_writer.man index 2680e5e2c..a2ff90e9e 100644 --- a/ldms/src/sampler/blob_stream/Plugin_blob_stream_writer.man +++ b/ldms/src/sampler/blob_stream/Plugin_blob_stream_writer.man @@ -47,11 +47,46 @@ Enable logging of messages stored to the log file. timing=1 .br Enable writing timestamps to a separate file. -.RE .TP spool=1 .br Move closed files to the directory //spool/. +.TP +rolltype= +.br +By default, the store does not rollover and the data is written to a continously open filehandle. Rolltype and rollover are used in conjunction to enable the store to manage rollover, including flushing before rollover. The header will be rewritten when a roll occurs. Valid options are: +.RS +.TP +1 +.br +wake approximately every rollover seconds and roll. +Rollover is suppressed if no data at all has been written and rollempty=0. +.TP +2 +.br +wake daily at rollover seconds after midnight (>=0) and roll. +Rollover is suppressed if no data at all has been written and rollempty=0. +.TP +3 +.br +roll after approximately rollover records are written. +.TP +4 +roll after approximately rollover bytes are written. +.TP +5 +.br +wake at rollover seconds after midnight (>=0) and roll, then repeat every rollagain (> rollover) seconds during the day. For example "rollagain=3600 rollover=0 rolltype=5" rolls files hourly. +Rollover is suppressed if no data at all has been written and rollempty=0. +.RE +.TP +rollover= +.br +Rollover value controls the frequency of rollover (e.g., number of bytes, number of records, time interval, seconds after midnight). Note that these values are estimates. +.TP +rollempty=0 +.br +Turn off rollover of empty files. Default value is 1 (create extra empty files). .RE .SH OUTPUT FORMAT @@ -86,7 +121,8 @@ This writer is in development and may be changed at any time. Cannot support stream=.* as there is no corresponding regex subscription policy currently available in the C stream API. .PP -The config operation may called at any time or repeated. +The config operation may called at any time or repeated, though the use of rollover +policies is recommended instead. Repeated configuration of rollover is silently ignored. The start and stop operations will start and stop storage of all streams. .PP The plugin appears in C code as a sampler plugin, since the storage policy and store diff --git a/ldms/src/sampler/blob_stream/blob_stream_writer.c b/ldms/src/sampler/blob_stream/blob_stream_writer.c index 107e1ab30..c5ee71faa 100644 --- a/ldms/src/sampler/blob_stream/blob_stream_writer.c +++ b/ldms/src/sampler/blob_stream/blob_stream_writer.c @@ -1,8 +1,8 @@ /** - * Copyright (c) 2021 National Technology & Engineering Solutions + * Copyright (c) 2021-24 National Technology & Engineering Solutions * of Sandia, LLC (NTESS). Under the terms of Contract DE-NA0003525 with * NTESS, the U.S. Government retains certain rights in this software. - * Copyright (c) 2021 Open Grid Computing, Inc. All rights reserved. + * Copyright (c) 2021-24 Open Grid Computing, Inc. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU @@ -47,6 +47,8 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#define _GNU_SOURCE + #include #include #include @@ -67,9 +69,49 @@ #include "ldms.h" #include "ldmsd.h" #include "ldmsd_stream.h" +#include "ldmsd_plugattr.h" #define PNAME "blob_stream_writer" +struct plugattr *pa = NULL; /* plugin attributes from config */ +#define KEY_PLUG_ATTR 1, "container" +static int rollover; +static int rollagain; +/** rollempty determines if timed rollovers are performed even + * when no data has been written (producing empty files). + */ +static bool rollempty = true; +/** rolltype determines how to interpret rollover values > 0. */ +static int rolltype = -1; +/** ROLLTYPES documents rolltype and is used in help output. Also used for buffering */ +#define ROLLTYPES \ +" 1: wake approximately every rollover seconds and roll.\n" \ +" 2: wake daily at rollover seconds after midnight (>=0) and roll.\n" \ +" 3: roll after approximately rollover records are written.\n" \ +" 4: roll after approximately rollover bytes are written.\n" \ +" 5: wake daily at rollover seconds after midnight and every rollagain seconds thereafter.\n" + +#define MAXROLLTYPE 5 +#define MINROLLTYPE 1 +/** default -- do not roll */ +#define DEFAULT_ROLLTYPE -1 +/** minimum rollover for type 1; + rolltype==1 and rollover < MIN_ROLL_1 -> rollover = MIN_ROLL_1 + also used for minimum sleep time for type 2; + rolltype==2 and rollover results in sleep < MIN_ROLL_SLEEPTIME -> skip this roll and do it the next day */ +#define MIN_ROLL_1 10 +/** minimum rollover for type 3; + rolltype==3 and rollover < MIN_ROLL_RECORDS -> rollover = MIN_ROLL_RECORDS */ +#define MIN_ROLL_RECORDS 3 +/** minimum rollover for type 4; + rolltype==4 and rollover < MIN_ROLL_BYTES -> rollover = MIN_ROLL_BYTES */ +#define MIN_ROLL_BYTES 1024 +/** Interval to check for passing the record or byte count limits. */ +#define ROLL_LIMIT_INTERVAL 60 + +static pthread_t rothread; +static int rothread_used = 0; + static ldmsd_msg_log_f msglog; static pthread_mutex_t cfg_lock; static int closing; @@ -99,6 +141,9 @@ typedef struct stream_data { FILE* timingfile; FILE* typefile; long offset; + int64_t store_count; + int64_t byte_count; + LIST_ENTRY(stream_data) entry; } *stream_data_t; @@ -112,6 +157,17 @@ static int timing; static int types; static int spool; +void dump_config() { + msglog(LDMSD_LDEBUG, PNAME ": debug %d\n", debug); + msglog(LDMSD_LDEBUG, PNAME ": timing %d\n", timing); + msglog(LDMSD_LDEBUG, PNAME ": types %d\n", types); + msglog(LDMSD_LDEBUG, PNAME ": spool %d\n", spool); + msglog(LDMSD_LDEBUG, PNAME ": rollover %d\n", rollover); + msglog(LDMSD_LDEBUG, PNAME ": rollagain %d\n", rollagain); + msglog(LDMSD_LDEBUG, PNAME ": rolltype %d\n", rolltype); + msglog(LDMSD_LDEBUG, PNAME ": rollempty %d\n", rollempty); +} + char blob_stream_char_to_type(char c) { switch (c) { @@ -148,6 +204,79 @@ char blob_stream_type_to_char(ldmsd_stream_type_t stream_type) } } +#define SD_FINAL 1 +#define SD_REUSE 0 +/* close f, free fname (if final), and rename fname if spool=1 */ +static void fclose_and_spool(FILE* *f, char* *fname, int final) +{ + fclose(*f); + *f = NULL; + if (spool) { + int mode = 0750; + size_t n = strlen(*fname) + 20; + + char *dbuf = alloca(n); + strcpy(dbuf, *fname); + char *dirn = dirname(dbuf); + + char *bbuf = alloca(n); + strcpy(bbuf, *fname); + char *base = basename(bbuf); + + char *rbuf = alloca(n); + sprintf(rbuf, "%s/spool", dirn); + int err = f_mkdir_p(rbuf, mode); + if (err) { + switch (err) { + case EEXIST: + break; + default: + msglog(LDMSD_LERROR, + "create_outdir: failed to create" + " directory for %s: %s\n", + rbuf, STRERROR(err)); + goto out; + } + } + sprintf(rbuf, "%s/spool/%s", dirn, base); + err = rename(*fname, rbuf); + if (err) { + msglog(LDMSD_LERROR, PNAME + ": rename_output: failed rename(%s, %s):" + " %s\n", *fname, rbuf, STRERROR(err)); + } else { + msglog(LDMSD_LDEBUG, PNAME + ": renamed: %s to %s\n", + *fname, rbuf); + } + } +out: + if (final == SD_FINAL) { + free(*fname); + *fname = NULL; + } +} + + +static void reset_paths(stream_data_t sd) +{ + if (!sd) + return; + if (sd->timingfile) { + fclose_and_spool(&sd->timingfile, &sd->timingfile_name, SD_REUSE); + } + if (sd->typefile) { + fclose_and_spool(&sd->typefile, &sd->typefile_name, SD_REUSE); + } + if (sd->streamfile) { + fclose_and_spool(&sd->streamfile, &sd->streamfile_name, SD_REUSE); + } + if (sd->offsetfile) { + fclose_and_spool(&sd->offsetfile, &sd->offsetfile_name, SD_REUSE); + } + sd->ws = WS_NEW; +} + /* open, if not open or already closed, and write to stream files. */ static int stream_cb(ldmsd_stream_client_t c, void *ctxt, ldmsd_stream_type_t stream_type, @@ -162,6 +291,9 @@ static int stream_cb(ldmsd_stream_client_t c, void *ctxt, } pthread_mutex_lock(&sd->write_lock); + if (sd->ws == WS_REOPEN) { + reset_paths(sd); + } if (sd->ws == WS_NEW) { stream_data_open(sd); } @@ -202,7 +334,9 @@ static int stream_cb(ldmsd_stream_client_t c, void *ctxt, } } rc = fwrite(msg, 1, msg_len, sd->streamfile); + sd->store_count++; sd->offset += rc; + sd->byte_count += rc; if (rc != msg_len) { int ferr = ferror(sd->streamfile); msglog(LDMSD_LERROR, PNAME ": short write starting at %s:%ld: %s\n", @@ -239,10 +373,10 @@ static void stream_data_open(stream_data_t sd) return; } time_t t = time(NULL); - size_t end = strlen(sd->offsetfile_name); - sprintf(sd->offsetfile_name + end, "%ld", (long)t); - end = strlen(sd->streamfile_name); - sprintf(sd->streamfile_name + end, "%ld", (long)t); + char *end = strrchr(sd->offsetfile_name, '.'); + sprintf(end, ".%ld", (long)t); + end = strrchr(sd->streamfile_name, '.'); + sprintf(end, ".%ld", (long)t); sd->offsetfile = fopen_perm(sd->offsetfile_name, "w", 0640); sd->streamfile = fopen_perm(sd->streamfile_name, "w", 0640); @@ -253,8 +387,8 @@ static void stream_data_open(stream_data_t sd) return; } if (sd->timingfile_name) { - end = strlen(sd->timingfile_name); - sprintf(sd->timingfile_name + end, "%ld", (long)t); + end = strrchr(sd->timingfile_name, '.'); + sprintf(end, ".%ld", (long)t); sd->timingfile = fopen_perm(sd->timingfile_name, "w", 0640); if (!sd->timingfile) { msglog(LDMSD_LERROR, PNAME ": Error '%s' opening the file %s.\n", @@ -264,8 +398,8 @@ static void stream_data_open(stream_data_t sd) } } if (sd->typefile_name) { - end = strlen(sd->typefile_name); - sprintf(sd->typefile_name + end, "%ld", (long)t); + end = strrchr(sd->typefile_name, '.'); + sprintf(end, ".%ld", (long)t); sd->typefile = fopen_perm(sd->typefile_name, "w", 0640); if (!sd->typefile) { msglog(LDMSD_LERROR, PNAME ": Error '%s' opening the file %s.\n", @@ -314,79 +448,12 @@ static void stream_data_open(stream_data_t sd) } } -/* close f, free fname, and rename fname if spool=1 */ -static void fclose_and_spool(FILE* *f, char* *fname) -{ - fclose(*f); - *f = NULL; - if (spool) { - int mode = 0750; - size_t n = strlen(*fname) + 20; - - char *dbuf = alloca(n); - strcpy(dbuf, *fname); - char *dirn = dirname(dbuf); - - char *bbuf = alloca(n); - strcpy(bbuf, *fname); - char *base = basename(bbuf); - - char *rbuf = alloca(n); - sprintf(rbuf, "%s/spool", dirn); - int err = f_mkdir_p(rbuf, mode); - if (err) { - switch (err) { - case EEXIST: - break; - default: - msglog(LDMSD_LERROR, - "create_outdir: failed to create" - " directory for %s: %s\n", - rbuf, STRERROR(err)); - goto out; - } - } - sprintf(rbuf, "%s/spool/%s", dirn, base); - err = rename(*fname, rbuf); - if (err) { - msglog(LDMSD_LERROR, PNAME - ": rename_output: failed rename(%s, %s):" - " %s\n", *fname, rbuf, STRERROR(err)); - } else { - msglog(LDMSD_LDEBUG, PNAME - ": renamed: %s to %s\n", - *fname, rbuf); - } - } -out: - free(*fname); - *fname = NULL; -} - - -static void reset_paths(stream_data_t sd) -{ - if (!sd) - return; - if (sd->timingfile) { - fclose_and_spool(&sd->timingfile, &sd->timingfile_name); - } - if (sd->typefile) { - fclose_and_spool(&sd->typefile, &sd->typefile_name); - } - if (sd->streamfile) { - fclose_and_spool(&sd->streamfile, &sd->streamfile_name); - } - if (sd->offsetfile) { - fclose_and_spool(&sd->offsetfile, &sd->offsetfile_name); - } - sd->ws = WS_NEW; -} /* base directory path name space for: * path/container/stream.OFFSET.time()_at_open * time substring must be reset at open. * write_lock must be held when this is called. + * All values set must end in '.' so time value can be reset at that location. */ static int set_paths(stream_data_t sd) { @@ -406,6 +473,7 @@ static int set_paths(stream_data_t sd) } if (timing) { + free(sd->timingfile_name); sd->timingfile_name = malloc(pathlen); if (!sd->timingfile_name) { sd->ws = WS_ERR; @@ -416,6 +484,7 @@ static int set_paths(stream_data_t sd) } if (types) { + free(sd->typefile_name); sd->typefile_name = malloc(pathlen); if (!sd->typefile_name) { sd->ws = WS_ERR; @@ -425,6 +494,7 @@ static int set_paths(stream_data_t sd) container, sd->stream_name); } + free(sd->streamfile_name); sd->streamfile_name = malloc(pathlen); if (!sd->streamfile_name) { sd->ws = WS_ERR; @@ -432,7 +502,7 @@ static int set_paths(stream_data_t sd) } snprintf(sd->streamfile_name, pathlen, "%s/%s/%s.DAT.", root_path, container, sd->stream_name); - + free(sd->offsetfile_name); sd->offsetfile_name = malloc(pathlen); if (!sd->offsetfile_name) { sd->ws = WS_ERR; @@ -475,6 +545,8 @@ static int add_stream(const char *stream) return 0; } +static void* rolloverThreadInit(void* m); + /** * \brief Configuration */ @@ -482,6 +554,13 @@ static int config(struct ldmsd_plugin *self, struct attr_value_list *kwl, struct { char* s; int rc; + int rollmethod = DEFAULT_ROLLTYPE; + static const char *attributes[] = { + "container", "spool", "stream", "debug", "timing", "types", "path", + "rollagain", "rollover", "rolltype", "rollempty", + NULL + }; + static const char *keywords[] = { NULL }; if (!self || !avl) return EINVAL; @@ -491,6 +570,100 @@ static int config(struct ldmsd_plugin *self, struct attr_value_list *kwl, struct return EINVAL; } + pa = ldmsd_plugattr_create(NULL, PNAME, avl, kwl, + NULL, NULL, NULL, KEY_PLUG_ATTR); + + rc = ldmsd_plugattr_config_check(attributes, keywords, avl, kwl, NULL, PNAME); + if (rc != 0) { + int warnon = (ldmsd_loglevel_get() > LDMSD_LWARNING); + msglog(LDMSD_LERROR, PNAME " config arguments unexpected.%s\n", + (warnon ? " Enable log level WARNING for details." : "")); + return EINVAL; + } + + if (rolltype == -1) { + int ragain = 0; + int roll = -1; + int cvt; + cvt = ldmsd_plugattr_s32(pa, "rollagain", NULL, &ragain); + if (!cvt) { + if (ragain < 0) { + msglog(LDMSD_LERROR, PNAME + ": bad rollagain= value %d\n", ragain); + rc = EINVAL; + goto out; + } + } + if (cvt == ENOTSUP) { + msglog(LDMSD_LERROR, PNAME ": improper rollagain= input.\n"); + rc = EINVAL; + goto out; + } + + cvt = ldmsd_plugattr_s32(pa, "rollover", NULL, &roll); + if (!cvt) { + if (roll < 0) { + msglog(LDMSD_LERROR, PNAME + ": Error: bad rollover value %d\n", roll); + rc = EINVAL; + goto out; + } + } + if (cvt == ENOTSUP) { + msglog(LDMSD_LERROR, PNAME ": improper rollover= input.\n"); + rc = EINVAL; + goto out; + } + + cvt = ldmsd_plugattr_s32(pa, "rolltype", NULL, &rollmethod); + if (!cvt) { + if (roll < 0) { + /* rolltype not valid without rollover also */ + msglog(LDMSD_LERROR, PNAME + ": rolltype given without rollover.\n"); + rc = EINVAL; + goto out; + } + if (rollmethod < MINROLLTYPE || rollmethod > MAXROLLTYPE) { + msglog(LDMSD_LERROR, PNAME + ": rolltype out of range.\n"); + rc = EINVAL; + goto out; + } + if (rollmethod == 5 && (roll < 0 || ragain < roll || + ragain < MIN_ROLL_1)) { + msglog(LDMSD_LERROR, PNAME + ": rollagain=%d rollover=%d\n", + roll, ragain); + msglog(LDMSD_LERROR, PNAME ": rolltype=5 needs" + " rollagain > max(rollover,10)\n"); + rc = EINVAL; + goto out; + } + } + if (cvt == ENOTSUP) { + msglog(LDMSD_LERROR, PNAME + ": improper rolltype= input.\n"); + rc = EINVAL; + goto out; + } + cvt = ldmsd_plugattr_bool(pa, "rollempty", NULL, &rollempty); + if (cvt == -1) { + msglog(LDMSD_LERROR, PNAME + ": expected boole for rollempty= input.\n"); + rc = EINVAL; + goto out; + } + if (rollmethod >= MINROLLTYPE && !rothread_used) { + rolltype = rollmethod; + rollover = roll; + rollagain = ragain; + pthread_create(&rothread, NULL, rolloverThreadInit, NULL); + rothread_used = 1; + } + } + + spool = 0; s = av_value(avl, "spool"); if (s) { @@ -574,9 +747,11 @@ static int config(struct ldmsd_plugin *self, struct attr_value_list *kwl, struct pthread_mutex_unlock(&sd->write_lock); } + out: pthread_mutex_unlock(&cfg_lock); + dump_config(); return rc; } @@ -586,16 +761,16 @@ static void stream_data_close( stream_data_t sd ) return; pthread_mutex_lock(&sd->write_lock); if (sd->timingfile) { - fclose_and_spool(&sd->timingfile, &sd->timingfile_name); + fclose_and_spool(&sd->timingfile, &sd->timingfile_name, SD_FINAL); } if (sd->typefile) { - fclose_and_spool(&sd->typefile, &sd->typefile_name); + fclose_and_spool(&sd->typefile, &sd->typefile_name, SD_FINAL); } if (sd->streamfile) { - fclose_and_spool(&sd->streamfile, &sd->streamfile_name); + fclose_and_spool(&sd->streamfile, &sd->streamfile_name, SD_FINAL); } if (sd->offsetfile) { - fclose_and_spool(&sd->offsetfile, &sd->offsetfile_name); + fclose_and_spool(&sd->offsetfile, &sd->offsetfile_name, SD_FINAL); } free(sd->stream_name); sd->stream_name = NULL; @@ -627,10 +802,140 @@ static void term(struct ldmsd_plugin *self) return; } +static void roll_stream_files(stream_data_t sd) +{ + if (!sd) + return; + switch (rolltype) { + case 1: + case 2: + case 5: + if (!sd->store_count && !rollempty) + /* skip rollover of empty files */ + return; + break; + case 3: + if (sd->store_count < rollover) { + return; + } else { + sd->store_count = 0; + } + break; + case 4: + if (sd->byte_count < rollover) { + return; + } else { + sd->byte_count = 0; + } + break; + default: + msglog(LDMSD_LDEBUG, PNAME ": Error: unexpected rolltype in store(%d)\n", + rolltype); + break; + } + + sd->store_count = 0; + int rc = add_stream(sd->stream_name); /* forces reset */ + if (rc) { + msglog(LDMSD_LERROR, PNAME ": failed to readd" + " stream %s.\n", sd->stream_name); + } + +} + +/* Time-based rolltypes will always roll the files when this +function is called. +Volume-based rolltypes must check and shortcircuit within this +function. +*/ +static int handleRollover(void *cps){ + + pthread_mutex_lock(&cfg_lock); + + stream_data_t sd = NULL; + LIST_FOREACH(sd, &data_list, entry) { + roll_stream_files(sd); + } + + pthread_mutex_unlock(&cfg_lock); + + return 0; + +} + +static void* rolloverThreadInit(void* m){ + + while(1){ + int tsleep; + switch (rolltype) { + case 1: + tsleep = (rollover < MIN_ROLL_1) ? + MIN_ROLL_1 : rollover; + break; + case 2: { + time_t rawtime; + struct tm info; + + time( &rawtime ); + localtime_r( &rawtime, &info ); + int secSinceMidnight = info.tm_hour*3600 + + info.tm_min*60 + info.tm_sec; + tsleep = 86400 - secSinceMidnight + rollover; + if (tsleep < MIN_ROLL_1){ + /* if we just did a roll then skip this one */ + tsleep+=86400; + } + } + break; + case 3: + if (rollover < MIN_ROLL_RECORDS) + rollover = MIN_ROLL_RECORDS; + tsleep = ROLL_LIMIT_INTERVAL; + break; + case 4: + if (rollover < MIN_ROLL_BYTES) + rollover = MIN_ROLL_BYTES; + tsleep = ROLL_LIMIT_INTERVAL; + break; + case 5: { + time_t rawtime; + struct tm info; + + time( &rawtime ); + localtime_r( &rawtime, &info ); + int secSinceMidnight = info.tm_hour*3600 + + info.tm_min*60 + info.tm_sec; + + if (secSinceMidnight < rollover) { + tsleep = rollover - secSinceMidnight; + } else { + int y = secSinceMidnight - rollover; + int z = y / rollagain; + tsleep = (z + 1)*rollagain + rollover - secSinceMidnight; + } + if (tsleep < MIN_ROLL_1) { + tsleep += rollagain; + } + } + break; + default: + tsleep = 60; + break; + } + sleep(tsleep); + int oldstate = 0; + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate); + handleRollover(NULL); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); + } + + return NULL; +} static const char *usage(struct ldmsd_plugin *self) { return " config name=blob_stream_writer path= container= stream= \n" " timing=1 types=1 debug=1 spool=1\n" + " [rollover= rolltype= rollempty= rollagain=\n" " - Set the root path for the storage of csvs and some default parameters\n" " - path The path to the root of the csv directory\n" " - container The directory under the path\n" @@ -640,6 +945,11 @@ static const char *usage(struct ldmsd_plugin *self) " - types=1 Enabling TYPES output file\n" " - spool=1 Roll output to //spool/\n" " - debug=1 Enabling certain debug statements.\n" + " - rollover Greater than or equal to zero; enables file rollover and sets interval\n" + " - rollempty 0/1; 0 suppresses rollover of empty files, 1 allows (default)\n" + " - rollagain Repeat interval for rolltype == 5.\n" + " - rolltype [1-n] Defines the policy used to schedule rollover events.\n" + ROLLTYPES ; } @@ -681,5 +991,10 @@ static void blob_stream_writer_init() static void __attribute__ ((destructor)) blob_stream_writer_fini(void); static void blob_stream_writer_fini() { + if (rothread_used) { + void * dontcare = NULL; + pthread_cancel(rothread); + pthread_join(rothread, &dontcare); + } pthread_mutex_destroy(&cfg_lock); }