Skip to content

Commit

Permalink
Add support for Avro arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
tom95858 committed Mar 30, 2023
1 parent 7f51672 commit 21baf08
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 55 deletions.
64 changes: 20 additions & 44 deletions ldms/src/ldmsd/ldmsd_decomp.c
Original file line number Diff line number Diff line change
Expand Up @@ -752,33 +752,29 @@ static const char *col_type_str(enum ldms_value_type type)
[LDMS_V_S8] = "int",
[LDMS_V_U16] = "int",
[LDMS_V_S16] = "int",
[LDMS_V_U32] = "int",
[LDMS_V_U32] = "long",
[LDMS_V_S32] = "int",
[LDMS_V_U64] = "long",
[LDMS_V_S64] = "long",
[LDMS_V_F32] = "float",
[LDMS_V_D64] = "double",
[LDMS_V_CHAR_ARRAY] = "string",
[LDMS_V_U8_ARRAY] = "string",
[LDMS_V_S8_ARRAY] = "string",
[LDMS_V_U16_ARRAY] = "array",
[LDMS_V_S16_ARRAY] = "array",
[LDMS_V_U32_ARRAY] = "array",
[LDMS_V_S32_ARRAY] = "array",
[LDMS_V_U64_ARRAY] = "array",
[LDMS_V_S64_ARRAY] = "array",
[LDMS_V_F32_ARRAY] = "array",
[LDMS_V_D64_ARRAY] = "array",
[LDMS_V_LIST] = "array",
[LDMS_V_LIST_ENTRY] = "null",
[LDMS_V_RECORD_TYPE] = "map",
[LDMS_V_RECORD_INST] = "null",
[LDMS_V_RECORD_ARRAY] = "array",
#if 0
[LDMS_V_TIMESTAMP] = "record"
#else
[LDMS_V_TIMESTAMP] = "int"
#endif
[LDMS_V_U8_ARRAY] = "int",
[LDMS_V_S8_ARRAY] = "int",
[LDMS_V_U16_ARRAY] = "int",
[LDMS_V_S16_ARRAY] = "int",
[LDMS_V_U32_ARRAY] = "long",
[LDMS_V_S32_ARRAY] = "int",
[LDMS_V_U64_ARRAY] = "long",
[LDMS_V_S64_ARRAY] = "long",
[LDMS_V_F32_ARRAY] = "float",
[LDMS_V_D64_ARRAY] = "double",
[LDMS_V_LIST] = "nosup",
[LDMS_V_LIST_ENTRY] = "nosup",
[LDMS_V_RECORD_TYPE] = "nosup",
[LDMS_V_RECORD_INST] = "nosup",
[LDMS_V_RECORD_ARRAY] = "nosup",
[LDMS_V_TIMESTAMP] = "long"
};
return type_str[type];
}
Expand Down Expand Up @@ -807,28 +803,13 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len)
}
switch (col->type) {
case LDMS_V_TIMESTAMP:
#if 0
rc = strbuf_printf(&h,
"{\"name\":\"%s\",\"type\":{"
"\"type\":\"record\","
"\"name\":\"timestamp\","
"\"doc\":\"An LDMS timestamp encoded as { seconds, micro-seconds }\","
"\"fields\":["
"{\"name\":\"sec\",\"type\":\"int\"},"
"{\"name\":\"usec\",\"type\":\"int\"}"
"]}}",
col->name);
if (rc)
goto err_0;
#else
rc = strbuf_printf(&h,
"{\"name\":\"%s\",\"type\":\"long\","
"\"logicalType\":\"timestamp-millis\""
"}",
col->name);
if (rc)
goto err_0;
#endif
break;
case LDMS_V_CHAR:
case LDMS_V_U8:
Expand All @@ -841,19 +822,14 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len)
case LDMS_V_S64:
case LDMS_V_F32:
case LDMS_V_D64:
case LDMS_V_CHAR_ARRAY:
rc = strbuf_printf(&h, "{\"name\":\"%s\",\"type\":\"%s\"}",
col->name, col_type_str(col->type));
if (rc)
goto err_0;
break;
case LDMS_V_CHAR_ARRAY:
case LDMS_V_U8_ARRAY:
case LDMS_V_S8_ARRAY:
rc = strbuf_printf(&h, "{\"name\":\"%s\",\"type\":\"%s\"}",
col->name, col_type_str(col->type));
if (rc)
goto err_0;
break;
case LDMS_V_U16_ARRAY:
case LDMS_V_S16_ARRAY:
case LDMS_V_U32_ARRAY:
Expand All @@ -864,8 +840,7 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len)
case LDMS_V_D64_ARRAY:
rc = strbuf_printf(&h,
"{\"name\":\"%s\","
"\"type\":\"array\","
"\"items\":\"%s\",\"default\":[]}",
"\"type\":{ \"type\" : \"array\", \"items\": \"%s\" }}",
col->name, col_type_str(col->type));
if (rc)
goto err_0;
Expand All @@ -876,6 +851,7 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len)
case LDMS_V_RECORD_INST:
case LDMS_V_RECORD_ARRAY:
default:
rc = EINVAL;
goto err_0;
}
}
Expand Down
3 changes: 3 additions & 0 deletions ldms/src/sampler/examples/test_sampler/test_sampler.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include <time.h>
#include <pthread.h>
#include <assert.h>
#include <ctype.h>
#include "ldms.h"
#include "ovis_json/ovis_json.h"
#include "ldmsd.h"
Expand Down Expand Up @@ -1122,6 +1123,8 @@ static int config_add_set(struct attr_value_list *avl)
union ldms_value v;
char *endptr;
if (compid) {
/* Skip non isdigit prefix */
while (*compid != '\0' && !isdigit(*compid)) compid++;
v.v_u64 = strtoull(compid, &endptr, 0);
if (*endptr != '\0') {
msglog(LDMSD_LERROR, "test_sampler: invalid "
Expand Down
102 changes: 99 additions & 3 deletions ldms/src/store/avro_kafka/Plugin_avro_kafka_store.man
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ The set parameter's \fIschema\fR name.
.IP "\fB%I \fR" 1c
The instance name of the set, e.g. "orion-01/meminfo".
.IP "\fB%P \fR" 1c
The set parameter's \fIproducer\fR name.
The set parameter's \fIproducer\fR name, e.g. "orion-01".
.IP "\fB%u \fR" 1c
The user-name string for the owner of the set.
If the user-name is not known on the system, the user-id is used.
Expand Down Expand Up @@ -101,11 +101,107 @@ have no affect on how data is stored. If the \fIcontainer\fR parameter is set to
value other than an empty string, it will override the bootstrap.servers Kafka configuration
parameter in the kafka_conf file if present.
.PP
.SH "BUGS "
.SH "JSON Mode"
.PP
JSON mode encodes messages as self describing text objects. Each message is a JSON dictionary
based on the following template:
.RS 4
.nf
{
"<attr-name-1>" : <attr-value-1>,
"<attr-name-2>" : <attr-value-2>,
...
}
.fi
.RE
.PP
Each row in the decomposition is encoded as shown. The \fBattr-value\fR types are mapped to either
quoted strings, floating-point, or integers as defined by the source metric type in the LDMS
metric set. The mapping is as follows:
.TS
tab(@) allbox;
l l l .
\fBMetric Type\fR@\fBFormat Specifier\fR@\fBDescription\fR
LDMS_V_TIMESTAMP@%u.%06u@Floating point number in seconds
LDMS_V_U8@%hhu@Unsigned integer
LDMS_V_S8@%hhd@Signed integer
LDMS_V_U16@%hu@Unsigned integer
LDMS_V_S16@%hd@Signed integer
LDMS_V_U32@%u@Unsigned integer
LDMS_V_S32@%d@Signed integer
LDMS_V_U64@%lu@Unsigned integer
LDMS_V_S64@%ld@Signed integer
LDMS_V_F32@%.9g@Floating point
LDMS_V_D64@%.17g@Floating point
LDMS_V_CHAR_ARRAY@"%s"@Double quoted string
LDMS_V_xxx_ARRAY@[ v0, v1, ... ]@Comma separated value list surrounded by '[]'
.TE
.SS "Example JSON Object"
.RS 4
.nf
{"timestamp":1679682808.001751,"component_id":8,"dev_name":"veth1709f8b","rx_packets":0,"rx_err_packets":0,"rx_drop_packets":0,"tx_packets":858,"tx_err_packets":0,"tx_drop_packets":0}
.fi
.RE
.PP
.SH "Avro Mode"
.PP
In Avro mode, LDMS metric set values are first converted to Avro values. The table below
describes how each LDMS metric set value is represented by an Avro value.
.PP
Each row in the decomposition is encoded as a sequence of Avro values. The target
Avro type is governed by the Avro schema. The mapping is as follows:
.TS
tab(@) allbox;
l l l .
\fBMetric Type\fR@\fBAvro Type\fR@\fBDescription\fR
LDMS_V_TIMESTAMP@AVRO_INT32@Seconds portion of timestamp value is stored in the Avro integer
LDMS_V_TIMESTAMP@AVRO_INT64@Milliseconds since the epoch (tv_sec * 1000 + tv_usec / 1000) are stored in the Avro long integer
LDMS_V_TIMESTAMP@AVRO_RECORD@Seconds portion is stored in seconds portion of record, usecs is stored in the micro-seconds portion of the record
LDMS_V_U8@AVRO_INT32@avro_value_set_int
LDMS_V_S8@AVRO_INT32@avro_value_set_int
LDMS_V_U16@AVRO_INT32@avro_value_set_int
LDMS_V_S16@AVRO_INT32@avro_value_set_int
LDMS_V_U32@AVRO_INT64@avro_value_set_long
LDMS_V_S32@AVRO_INT32@avro_value_set_int
LDMS_V_U64@AVRO_INT64@avro_value_set_long
LDMS_V_S64@AVRO_INT64@avro_value_set_long
LDMS_V_F32@AVRO_FLOAT@avro_value_set_float
LDMS_V_D64@AVRO_DOUBLE@avro_value_set_double
LDMS_V_CHAR_ARRAY@AVRO_STRING@avro_value_set_string
LDMS_V_xxx_ARRAY@AVRO_ARRAY@Comma separated value list of primitive type surrounded by '[]'
.TE
.SS "Schema Creation"
.PP
Each row in the LDMS metric set presented for storage is used to generate an
Avro schema definition. The table above shows the Avro types that are used
to store each LDMS metric type. Note that currently, all LDMS_V_TIMESTAMP values in
a metric set are stored as the Avro logical type "timestamp-millis" and encoded
as an Avro long.
.PP
Unsigned types are currently encoded as signed types. The case that could cause issues
is LDMS_V_U64, which when encoded as AVRO_LONG will result in a negative number for
values larger than the maximum signed 64-bit integer. One way
to deal with this is to encode these as AVRO_BYTES[8] and let the consumer perform
the appropriate cast. This, however, seems identical to simply encoding it as a signed
long and allowing the consumer to cast the signed long to an unsigned long.
.SS "Schema Registration"
.PP
Avro schemas are generated from the row instances presented to the
commit() storage strategy routine. The \fBschema_name\fR that is contained in the
row instance is used to search for a serdes schema. This name is first searched for
in a local RBT and if not found, the Avro Schema Registry is consulted. If the
schema is not present in the registry, a new Avro schema is constructed per the
table above, registered with the schema registry and stored in the local cache.
.PP
.SS "Encoding"
.PP
No known bugs\&.
After the schema is located, constructed, and/or registered for the row, the schema
in conjunction with libserdes is used to binary encode the Avro values for
each column in the row. Once encoded, the message is submitted to Kafka.
.SS "Client Side Decoding"
.PP
Consumers of topics encoded with libserdes will need to perform the above procedure
in reverse. The message received via Kafka will have the schema-id present
in the message header. The client will use this schema-id to query the Schema
registry for a schema. Once found, the client will construct a serdes from the
schema definition and use this serdes to decode the message into Avro values.
.SH "EXAMPLES "
.PP
.PP
Expand Down
94 changes: 86 additions & 8 deletions ldms/src/store/avro_kafka/store_avro_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,11 @@ static aks_handle_t __handle_new(ldmsd_strgp_t strgp)
static int set_avro_value_from_col(avro_value_t *col_value,
ldmsd_col_t col)
{
int rc;
int rc, idx;
struct ldms_timestamp ts;
avro_value_t val;
avro_value_t val, item_val;
long ms;
size_t item_idx;
enum avro_type_t t = avro_value_get_type(col_value);
switch (col->type) {
case LDMS_V_TIMESTAMP:
Expand Down Expand Up @@ -579,30 +580,107 @@ static int set_avro_value_from_col(avro_value_t *col_value,
ldms_mval_get_double(col->mval));
break;
case LDMS_V_CHAR_ARRAY:
case LDMS_V_U8_ARRAY:
case LDMS_V_S8_ARRAY:
rc = avro_value_set_string(col_value,
ldms_mval_array_get_str(col->mval));
break;
case LDMS_V_S8_ARRAY:
for (idx = 0; idx < col->array_len; idx++) {
rc = avro_value_append(col_value, &item_val, &item_idx);
if (rc)
break;
rc = avro_value_set_int(&item_val,
(int)ldms_mval_array_get_s8(col->mval, idx));
}
break;
case LDMS_V_U8_ARRAY:
for (idx = 0; idx < col->array_len; idx++) {
rc = avro_value_append(col_value, &item_val, &item_idx);
if (rc)
break;
rc = avro_value_set_int(&item_val,
(unsigned int)ldms_mval_array_get_u8(col->mval, idx));
}
break;
case LDMS_V_U16_ARRAY:
for (idx = 0; idx < col->array_len; idx++) {
rc = avro_value_append(col_value, &item_val, &item_idx);
if (rc)
break;
rc = avro_value_set_int(&item_val,
(unsigned int)ldms_mval_array_get_u16(col->mval, idx));
}
break;
case LDMS_V_S16_ARRAY:
for (idx = 0; idx < col->array_len; idx++) {
rc = avro_value_append(col_value, &item_val, &item_idx);
if (rc)
break;
rc = avro_value_set_int(&item_val,
(int)ldms_mval_array_get_s16(col->mval, idx));
}
break;
case LDMS_V_U32_ARRAY:
for (idx = 0; idx < col->array_len; idx++) {
rc = avro_value_append(col_value, &item_val, &item_idx);
if (rc)
break;
rc = avro_value_set_long(&item_val,
(long)ldms_mval_array_get_u32(col->mval, idx));
}
break;
case LDMS_V_S32_ARRAY:
for (idx = 0; idx < col->array_len; idx++) {
rc = avro_value_append(col_value, &item_val, &item_idx);
if (rc)
break;
rc = avro_value_set_int(&item_val,
ldms_mval_array_get_s64(col->mval, idx));
}
break;
case LDMS_V_U64_ARRAY:
for (idx = 0; idx < col->array_len; idx++) {
rc = avro_value_append(col_value, &item_val, &item_idx);
if (rc)
break;
rc = avro_value_set_long(&item_val,
(long)ldms_mval_array_get_u64(col->mval, idx));
}
break;
case LDMS_V_S64_ARRAY:
for (idx = 0; idx < col->array_len; idx++) {
rc = avro_value_append(col_value, &item_val, &item_idx);
if (rc)
break;
rc = avro_value_set_long(&item_val,
(long)ldms_mval_array_get_u64(col->mval, idx));
}
break;
case LDMS_V_F32_ARRAY:
for (idx = 0; idx < col->array_len; idx++) {
rc = avro_value_append(col_value, &item_val, &item_idx);
if (rc)
break;
rc = avro_value_set_float(&item_val,
ldms_mval_array_get_float(col->mval, idx));
}
break;
case LDMS_V_D64_ARRAY:
assert(0 == "Not yet");
for (idx = 0; idx < col->array_len; idx++) {
rc = avro_value_append(col_value, &item_val, &item_idx);
if (rc)
break;
rc = avro_value_set_double(&item_val,
ldms_mval_array_get_double(col->mval, idx));
}
break;
case LDMS_V_LIST:
case LDMS_V_LIST_ENTRY:
assert(0 == "Not Yet");
break;
case LDMS_V_RECORD_TYPE:
case LDMS_V_RECORD_INST:
case LDMS_V_RECORD_ARRAY:
default:
assert(0 == "Not yet");
rc = ENOTSUP;
break;
}
return rc;
}
Expand Down

0 comments on commit 21baf08

Please sign in to comment.