diff --git a/ldms/src/ldmsd/ldmsd_decomp.c b/ldms/src/ldmsd/ldmsd_decomp.c index 84106d5de..44924a6d4 100644 --- a/ldms/src/ldmsd/ldmsd_decomp.c +++ b/ldms/src/ldmsd/ldmsd_decomp.c @@ -752,33 +752,29 @@ static const char *col_type_str(enum ldms_value_type type) [LDMS_V_S8] = "int", [LDMS_V_U16] = "int", [LDMS_V_S16] = "int", - [LDMS_V_U32] = "int", + [LDMS_V_U32] = "long", [LDMS_V_S32] = "int", [LDMS_V_U64] = "long", [LDMS_V_S64] = "long", [LDMS_V_F32] = "float", [LDMS_V_D64] = "double", [LDMS_V_CHAR_ARRAY] = "string", - [LDMS_V_U8_ARRAY] = "string", - [LDMS_V_S8_ARRAY] = "string", - [LDMS_V_U16_ARRAY] = "array", - [LDMS_V_S16_ARRAY] = "array", - [LDMS_V_U32_ARRAY] = "array", - [LDMS_V_S32_ARRAY] = "array", - [LDMS_V_U64_ARRAY] = "array", - [LDMS_V_S64_ARRAY] = "array", - [LDMS_V_F32_ARRAY] = "array", - [LDMS_V_D64_ARRAY] = "array", - [LDMS_V_LIST] = "array", - [LDMS_V_LIST_ENTRY] = "null", - [LDMS_V_RECORD_TYPE] = "map", - [LDMS_V_RECORD_INST] = "null", - [LDMS_V_RECORD_ARRAY] = "array", -#if 0 - [LDMS_V_TIMESTAMP] = "record" -#else - [LDMS_V_TIMESTAMP] = "int" -#endif + [LDMS_V_U8_ARRAY] = "int", + [LDMS_V_S8_ARRAY] = "int", + [LDMS_V_U16_ARRAY] = "int", + [LDMS_V_S16_ARRAY] = "int", + [LDMS_V_U32_ARRAY] = "long", + [LDMS_V_S32_ARRAY] = "int", + [LDMS_V_U64_ARRAY] = "long", + [LDMS_V_S64_ARRAY] = "long", + [LDMS_V_F32_ARRAY] = "float", + [LDMS_V_D64_ARRAY] = "double", + [LDMS_V_LIST] = "nosup", + [LDMS_V_LIST_ENTRY] = "nosup", + [LDMS_V_RECORD_TYPE] = "nosup", + [LDMS_V_RECORD_INST] = "nosup", + [LDMS_V_RECORD_ARRAY] = "nosup", + [LDMS_V_TIMESTAMP] = "long" }; return type_str[type]; } @@ -807,20 +803,6 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len) } switch (col->type) { case LDMS_V_TIMESTAMP: -#if 0 - rc = strbuf_printf(&h, - "{\"name\":\"%s\",\"type\":{" - "\"type\":\"record\"," - "\"name\":\"timestamp\"," - "\"doc\":\"An LDMS timestamp encoded as { seconds, micro-seconds }\"," - "\"fields\":[" - "{\"name\":\"sec\",\"type\":\"int\"}," - "{\"name\":\"usec\",\"type\":\"int\"}" - "]}}", - col->name); - if (rc) - goto err_0; -#else rc = strbuf_printf(&h, "{\"name\":\"%s\",\"type\":\"long\"," "\"logicalType\":\"timestamp-millis\"" @@ -828,7 +810,6 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len) col->name); if (rc) goto err_0; -#endif break; case LDMS_V_CHAR: case LDMS_V_U8: @@ -841,19 +822,14 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len) case LDMS_V_S64: case LDMS_V_F32: case LDMS_V_D64: + case LDMS_V_CHAR_ARRAY: rc = strbuf_printf(&h, "{\"name\":\"%s\",\"type\":\"%s\"}", col->name, col_type_str(col->type)); if (rc) goto err_0; break; - case LDMS_V_CHAR_ARRAY: case LDMS_V_U8_ARRAY: case LDMS_V_S8_ARRAY: - rc = strbuf_printf(&h, "{\"name\":\"%s\",\"type\":\"%s\"}", - col->name, col_type_str(col->type)); - if (rc) - goto err_0; - break; case LDMS_V_U16_ARRAY: case LDMS_V_S16_ARRAY: case LDMS_V_U32_ARRAY: @@ -864,8 +840,7 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len) case LDMS_V_D64_ARRAY: rc = strbuf_printf(&h, "{\"name\":\"%s\"," - "\"type\":\"array\"," - "\"items\":\"%s\",\"default\":[]}", + "\"type\":{ \"type\" : \"array\", \"items\": \"%s\" }}", col->name, col_type_str(col->type)); if (rc) goto err_0; @@ -876,6 +851,7 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len) case LDMS_V_RECORD_INST: case LDMS_V_RECORD_ARRAY: default: + rc = EINVAL; goto err_0; } } diff --git a/ldms/src/sampler/examples/test_sampler/test_sampler.c b/ldms/src/sampler/examples/test_sampler/test_sampler.c index 1f289942a..75d700ae2 100644 --- a/ldms/src/sampler/examples/test_sampler/test_sampler.c +++ b/ldms/src/sampler/examples/test_sampler/test_sampler.c @@ -59,6 +59,7 @@ #include #include #include +#include #include "ldms.h" #include "ovis_json/ovis_json.h" #include "ldmsd.h" @@ -1122,6 +1123,8 @@ static int config_add_set(struct attr_value_list *avl) union ldms_value v; char *endptr; if (compid) { + /* Skip non isdigit prefix */ + while (*compid != '\0' && !isdigit(*compid)) compid++; v.v_u64 = strtoull(compid, &endptr, 0); if (*endptr != '\0') { msglog(LDMSD_LERROR, "test_sampler: invalid " diff --git a/ldms/src/store/avro_kafka/Plugin_avro_kafka_store.man b/ldms/src/store/avro_kafka/Plugin_avro_kafka_store.man index 5ed6173ac..f6becf2a8 100644 --- a/ldms/src/store/avro_kafka/Plugin_avro_kafka_store.man +++ b/ldms/src/store/avro_kafka/Plugin_avro_kafka_store.man @@ -70,7 +70,7 @@ The set parameter's \fIschema\fR name. .IP "\fB%I \fR" 1c The instance name of the set, e.g. "orion-01/meminfo". .IP "\fB%P \fR" 1c -The set parameter's \fIproducer\fR name. +The set parameter's \fIproducer\fR name, e.g. "orion-01." .IP "\fB%u \fR" 1c The user-name string for the owner of the set. If the user-name is not known on the system, the user-id is used. @@ -101,11 +101,107 @@ have no affect on how data is stored. If the \fIcontainer\fR parameter is set to value other than an empty string, it will override the bootstrap.servers Kafka configuration parameter in the kafka_conf file if present. .PP -.SH "BUGS " +.SH "JSON Mode" .PP +JSON mode encodes messages as self describing text objects. Each message is a JSON dictionary +based on the following template: +RS 4 +.nf +{ + "" : , + "" : , + ... +} +.fi +.RE +.PP +Each row in the decomposition is encoded as shown. The \fBattr-value\fR types are mapped to either +quoted strings, floating-point, or integers as defined by the source metric type in the LDMS +metric set. The mapping is as follows: +.TS +tab(@) allbox; +l l l . +\fBMetric Type\fR@\fBFormat Specifier\fR@\fBDescription\fR +LDMS_V_TIMESTAMP@%u.%06u@Floating point number in seconds +LDMS_V_U8@%hhu@Unsigned integer +LDMS_V_S8@%hhd@Signed integer +LDMS_V_U16@%hu@Unsigned integer +LDMS_V_S16@%hd@Signed integer +LDMS_V_U32@%u@Unsigned integer +LDMS_V_S32@%d@Signed integer +LDMS_V_U64@%lu@Unsigned integer +LDMS_V_S64@%ld@Signed integer +LDMS_V_FLOAT@%.9g@Floating point +LDMS_V_DOUBLE@%.17g@Floating point +LDMS_V_STRING@"%s"@Double quoted string +LDMS_V_ARRAY_xxx@[ v0, v1, ... ]@Comma separated value list surrounding by '[]' +.TE +.SS "Example JSON Object" +{"timestamp":1679682808.001751,"component_id":8,"dev_name":"veth1709f8b","rx_packets":0,"rx_err_packets":0,"rx_drop_packets":0,"tx_packets":858,"tx_err_packets":0,"tx_drop_packets":0} +.fi +.RE +.PP +.SH "Avro Mode" +.PP +In Avro mode, LDMS metric set values are first converted to Avro values. The table below +describes how each LDMS metric set value is represented by an Avro value. +.PP +Each row in the decomposition is encoded as a sequence of Avro values. The target +Avro type is governed by the Avro schema. The mapping is as follows: +.TS +tab(@) allbox; +l l l . +\fBMetric Type\fR@\fBAvro Type\fR@\fBDescription\fR +LDMS_V_TIMESTAMP@AVRO_INT32@Seconds portion of timestamp value is stored in the Avro integer +LDMS_V_TIMESTAMP@AVRO_INT64@tv_secs + 1000 * tv_usecs is stored in Avro long integer +LDMS_V_TIMESTAMP@AVRO_RECORD@Seconds portion is stored in seconds portion of record, usecs is stored in the micro-seconds portion of the record +LDMS_V_U8@AVRO_INT32@avro_value_set_int +LDMS_V_S8@AVRO_INT32@avro_value_set_int +LDMS_V_U16@AVRO_INT32@avro_value_set_int +LDMS_V_S16@AVRO_INT32@avro_value_set_int +LDMS_V_U32@AVRO_INT64@avro_value_set_long +LDMS_V_S32@AVRO_INT32@avro_value_set_int +LDMS_V_U64@AVRO_INT64@avro_value_set_long +LDMS_V_S64@AVRO_INT64@avro_value_set_long +LDMS_V_FLOAT@AVRO_FLOAT@avro_value_set_float +LDMS_V_DOUBLE@AVRO_DOUBLE@avro_value_set_double +LDMS_V_CHAR_ARRAY@AVRO_STRING@avro_value_set_string +LDMS_V_ARRAY_xxx@AVRO_ARRAY@Comma separated value list or primitive type surrounded by '[]' +.TE +.SS "Schema Creation" +.PP +Each row in the LDMS metric set presented for storage is used to generate an +Avro schema definition. The table above shows the Avro types that are used +to store each LDMS metric type. Note that currently, all LDMS_V_TIMESTAMP values in +a metric set are stored as the Avro logical type "timestamp-millis" and encoded +as an Avro long. +.PP +Unsigned types are currently encoded as signed types. The case that could cause issues +is LDMS_V_U64 which when encoded as AVRO_LONG will result in a negative number. One way +to deal with this is to encode these as AVRO_BYTES[8] and let the consumer perform +the appropriate cast. This, however, seems identical to simply encoding it as a signed +long and allow the consumer to cast the signed long to an unsigned long. +.SS "Schema Registration" +.PP +The Avro schema are generated from the row instances presented to the +commit() storage strategy routine. The \fBschema_name\fR that is contained in the +row instance is used to search for a serdes schema. This name is first searched for +in a local RBT and if not found, the Avro Schema Registry is consulted. If the +schema is not present in the registry, a new Avro schema is constructed per the +table above, registered with the schema registry and stored in the local cache. +.PP +.SS "Encoding" .PP -No known bugs\&. +After the schema is located, constructed, and or registered for the row, the schema +in conjunction with libserdes is used to binary encode the Avro values for +each column in the row. Once encoded, the message is submitted to Kafka. +.SS "Client Side Decoding" .PP +Consumers of topics encoded with libserdes will need to perform the above procedure +in reverse. The message received via Kafka will have the schema-id present +in the message header. The client will use this schema-id to query the Schema +registry for a schema. Once found, the client will construct a serdes from the +schema definition and use this serdes to decode the message into Avro values. .SH "EXAMPLES " .PP .PP diff --git a/ldms/src/store/avro_kafka/store_avro_kafka.c b/ldms/src/store/avro_kafka/store_avro_kafka.c index 7a4a286a7..f4e46fe9e 100644 --- a/ldms/src/store/avro_kafka/store_avro_kafka.c +++ b/ldms/src/store/avro_kafka/store_avro_kafka.c @@ -494,10 +494,11 @@ static aks_handle_t __handle_new(ldmsd_strgp_t strgp) static int set_avro_value_from_col(avro_value_t *col_value, ldmsd_col_t col) { - int rc; + int rc, idx; struct ldms_timestamp ts; - avro_value_t val; + avro_value_t val, item_val; long ms; + size_t item_idx; enum avro_type_t t = avro_value_get_type(col_value); switch (col->type) { case LDMS_V_TIMESTAMP: @@ -579,30 +580,107 @@ static int set_avro_value_from_col(avro_value_t *col_value, ldms_mval_get_double(col->mval)); break; case LDMS_V_CHAR_ARRAY: - case LDMS_V_U8_ARRAY: - case LDMS_V_S8_ARRAY: rc = avro_value_set_string(col_value, ldms_mval_array_get_str(col->mval)); break; + case LDMS_V_S8_ARRAY: + for (idx = 0; idx < col->array_len; idx++) { + rc = avro_value_append(col_value, &item_val, &item_idx); + if (rc) + break; + rc = avro_value_set_int(&item_val, + (int)ldms_mval_array_get_s8(col->mval, idx)); + } + break; + case LDMS_V_U8_ARRAY: + for (idx = 0; idx < col->array_len; idx++) { + rc = avro_value_append(col_value, &item_val, &item_idx); + if (rc) + break; + rc = avro_value_set_int(&item_val, + (unsigned int)ldms_mval_array_get_u8(col->mval, idx)); + } + break; case LDMS_V_U16_ARRAY: + for (idx = 0; idx < col->array_len; idx++) { + rc = avro_value_append(col_value, &item_val, &item_idx); + if (rc) + break; + rc = avro_value_set_int(&item_val, + (unsigned int)ldms_mval_array_get_u16(col->mval, idx)); + } + break; case LDMS_V_S16_ARRAY: + for (idx = 0; idx < col->array_len; idx++) { + rc = avro_value_append(col_value, &item_val, &item_idx); + if (rc) + break; + rc = avro_value_set_int(&item_val, + (int)ldms_mval_array_get_s16(col->mval, idx)); + } + break; case LDMS_V_U32_ARRAY: + for (idx = 0; idx < col->array_len; idx++) { + rc = avro_value_append(col_value, &item_val, &item_idx); + if (rc) + break; + rc = avro_value_set_long(&item_val, + (long)ldms_mval_array_get_u32(col->mval, idx)); + } + break; case LDMS_V_S32_ARRAY: + for (idx = 0; idx < col->array_len; idx++) { + rc = avro_value_append(col_value, &item_val, &item_idx); + if (rc) + break; + rc = avro_value_set_int(&item_val, + ldms_mval_array_get_s64(col->mval, idx)); + } + break; case LDMS_V_U64_ARRAY: + for (idx = 0; idx < col->array_len; idx++) { + rc = avro_value_append(col_value, &item_val, &item_idx); + if (rc) + break; + rc = avro_value_set_long(&item_val, + (long)ldms_mval_array_get_u64(col->mval, idx)); + } + break; case LDMS_V_S64_ARRAY: + for (idx = 0; idx < col->array_len; idx++) { + rc = avro_value_append(col_value, &item_val, &item_idx); + if (rc) + break; + rc = avro_value_set_long(&item_val, + (long)ldms_mval_array_get_u64(col->mval, idx)); + } + break; case LDMS_V_F32_ARRAY: + for (idx = 0; idx < col->array_len; idx++) { + rc = avro_value_append(col_value, &item_val, &item_idx); + if (rc) + break; + rc = avro_value_set_float(&item_val, + ldms_mval_array_get_float(col->mval, idx)); + } + break; case LDMS_V_D64_ARRAY: - assert(0 == "Not yet"); + for (idx = 0; idx < col->array_len; idx++) { + rc = avro_value_append(col_value, &item_val, &item_idx); + if (rc) + break; + rc = avro_value_set_double(&item_val, + ldms_mval_array_get_double(col->mval, idx)); + } break; case LDMS_V_LIST: case LDMS_V_LIST_ENTRY: - assert(0 == "Not Yet"); - break; case LDMS_V_RECORD_TYPE: case LDMS_V_RECORD_INST: case LDMS_V_RECORD_ARRAY: default: - assert(0 == "Not yet"); + rc = ENOTSUP; + break; } return rc; }