Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support avro schema evolution #1305

Merged
merged 2 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions ldms/src/ldmsd/ldmsd_decomp.c
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,41 @@ static const char *col_type_str(enum ldms_value_type type)
return type_str[type];
}

/**
 * Return the JSON text emitted as the Avro "default" for a column type.
 *
 * The returned string is spliced verbatim after a "default": key when the
 * Avro schema for a row is generated, so every entry must already be valid
 * JSON for the corresponding Avro type: a quoted string for character
 * types, a number for scalars and timestamps, and an empty array for the
 * numeric array types.
 *
 * List and record types have no default; they map to the sentinel "nosup".
 * The same sentinel is returned for any value that has no table entry
 * (an enumerator not listed below, or a value outside the table), so the
 * result is always a valid string and never NULL.
 *
 * \param type  LDMS value type of the column.
 * \return Pointer to a static, read-only string; the caller must not
 *         modify or free it.
 */
static const char *col_default(enum ldms_value_type type)
{
	/* Read-only lookup table; slots not named here are implicitly NULL. */
	static const char *const default_str[] = {
		[LDMS_V_CHAR] = "\" \"",
		[LDMS_V_U8] = "0",
		[LDMS_V_S8] = "0",
		[LDMS_V_U16] = "0",
		[LDMS_V_S16] = "0",
		[LDMS_V_U32] = "0",
		[LDMS_V_S32] = "0",
		[LDMS_V_U64] = "0",
		[LDMS_V_S64] = "0",
		[LDMS_V_F32] = "0.0",
		[LDMS_V_D64] = "0.0",
		[LDMS_V_CHAR_ARRAY] = "\"\"",
		[LDMS_V_U8_ARRAY] = "[]",
		[LDMS_V_S8_ARRAY] = "[]",
		[LDMS_V_U16_ARRAY] = "[]",
		[LDMS_V_S16_ARRAY] = "[]",
		[LDMS_V_U32_ARRAY] = "[]",
		[LDMS_V_S32_ARRAY] = "[]",
		[LDMS_V_U64_ARRAY] = "[]",
		[LDMS_V_S64_ARRAY] = "[]",
		[LDMS_V_F32_ARRAY] = "[]",
		[LDMS_V_D64_ARRAY] = "[]",
		[LDMS_V_LIST] = "nosup",
		[LDMS_V_LIST_ENTRY] = "nosup",
		[LDMS_V_RECORD_TYPE] = "nosup",
		[LDMS_V_RECORD_INST] = "nosup",
		[LDMS_V_RECORD_ARRAY] = "nosup",
		[LDMS_V_TIMESTAMP] = "0"
	};
	static const char unsupported[] = "nosup";
	const size_t count = sizeof(default_str) / sizeof(default_str[0]);

	/*
	 * The cast folds negative values into the "too large" case. The NULL
	 * test covers gaps left by the designated initializers; handing NULL
	 * to a %s conversion would be undefined behaviour.
	 */
	if ((size_t)type >= count || !default_str[type])
		return unsupported;

	return default_str[type];
}

static char get_avro_char(char c)
{
if (isalnum(c))
Expand Down Expand Up @@ -844,6 +879,12 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len)
}
switch (col->type) {
case LDMS_V_TIMESTAMP:
rc = strbuf_printf(&h,
"{\"name\":\"%s\",\"type\":%s}",
avro_name, col_type_str(col->type));
if (rc)
goto err_0;
break;
case LDMS_V_CHAR:
case LDMS_V_U8:
case LDMS_V_S8:
Expand All @@ -856,8 +897,11 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len)
case LDMS_V_F32:
case LDMS_V_D64:
case LDMS_V_CHAR_ARRAY:
rc = strbuf_printf(&h, "{\"name\":\"%s\",\"type\":%s}",
avro_name, col_type_str(col->type));
rc = strbuf_printf(&h,
"{\"name\":\"%s\",\"type\":%s,"
"\"default\":%s}",
avro_name, col_type_str(col->type),
col_default(col->type));
if (rc)
goto err_0;
break;
Expand All @@ -873,8 +917,10 @@ int ldmsd_row_to_json_avro_schema(ldmsd_row_t row, char **str, size_t *len)
case LDMS_V_D64_ARRAY:
rc = strbuf_printf(&h,
"{\"name\":\"%s\","
"\"type\":{ \"type\" : \"array\", \"items\": %s }}",
avro_name, col_type_str(col->type));
"\"type\":{ \"type\" : \"array\", \"items\": %s },"
"\"default\":%s}",
avro_name, col_type_str(col->type),
col_default(col->type));
if (rc)
goto err_0;
break;
Expand Down
63 changes: 44 additions & 19 deletions ldms/src/store/avro_kafka/store_avro_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,48 +107,73 @@ serdes_schema_find(aks_handle_t sh, char *schema_name,
ldms_schema_t lschema, ldmsd_row_t row)
{
struct rbn *rbn;
serdes_schema_t *sschema = NULL;
serdes_schema_t *previous_schema = NULL;
serdes_schema_t *current_schema = NULL;
struct schema_entry *entry;
char *json_buf = NULL;
size_t json_len;
char errstr[512];
int rc;

pthread_mutex_lock(&schema_rbt_lock);
/* Check if the schema is already cached */
/* Check if the schema is already cached in this plugin */
rbn = rbt_find(&schema_tree, schema_name);
if (rbn) {
entry = container_of(rbn, struct schema_entry, rbn);
sschema = entry->serdes_schema;
current_schema = entry->serdes_schema;
goto out;
}
entry = calloc(1, sizeof(*entry));
if (!entry)
goto out;

/* Check if the schema is already present in the registry */
sschema = serdes_schema_get(sh->serdes, schema_name, -1,
errstr, sizeof(errstr));
if (sschema)
/* Yes, cache it */
goto cache;
/* Look up the schema by name in the registry.
Name alone does not tell us if the schema matches this row, so we
will still need to continue on and try pushing an updated schema.
The registry should handle duplicates or schema evolution testing.
*/
previous_schema = serdes_schema_get(sh->serdes,
schema_name, -1,
errstr, sizeof(errstr));

/* Create a new schema from the row specification and LDMS schema */
/* Generate a new schema from the row specification and LDMS schema */
rc = ldmsd_row_to_json_avro_schema(row, &json_buf, &json_len);
if (rc)
goto out;
sschema =
serdes_schema_add(sh->serdes,
schema_name, -1,
json_buf, json_len,
errstr, sizeof(errstr));
if (!sschema) {

/* Push the generated schema to the registry */
current_schema = serdes_schema_add(sh->serdes,
schema_name, -1,
json_buf, json_len,
errstr, sizeof(errstr));
if (!current_schema) {
LOG_ERROR("%s\n", json_buf);
LOG_ERROR("Error '%s' creating schema '%s'\n", errstr, schema_name);
goto out;
}
cache:
entry->serdes_schema = sschema;

/* Log information about which schema was used */
if (previous_schema != NULL) {
if (serdes_schema_id(current_schema)
== serdes_schema_id(previous_schema)) {
LOG_INFO("Using existing id %d for schema name '%s'\n",
serdes_schema_id(current_schema),
schema_name);
} else {
LOG_WARN("Using replacement id %d for schema name '%s' (previous id %d)\n",
serdes_schema_id(current_schema),
schema_name,
serdes_schema_id(previous_schema));
serdes_schema_destroy(previous_schema);
}
} else {
LOG_INFO("Using brand new id %d for schema name '%s'\n",
serdes_schema_id(current_schema),
schema_name);
}

/* Cache the schema in this plugin */
entry->serdes_schema = current_schema;
entry->ldms_schema = lschema;
entry->schema_name = strdup(schema_name);
rbn_init(&entry->rbn, entry->schema_name);
Expand All @@ -157,7 +182,7 @@ serdes_schema_find(aks_handle_t sh, char *schema_name,
pthread_mutex_unlock(&schema_rbt_lock);
if (json_buf)
free(json_buf);
return sschema;
return current_schema;
}

pthread_mutex_t sk_lock = PTHREAD_MUTEX_INITIALIZER;
Expand Down