diff --git a/ldms/src/store/avro_kafka/store_avro_kafka.c b/ldms/src/store/avro_kafka/store_avro_kafka.c index e78c25d81..1edddb9cf 100644 --- a/ldms/src/store/avro_kafka/store_avro_kafka.c +++ b/ldms/src/store/avro_kafka/store_avro_kafka.c @@ -107,7 +107,8 @@ serdes_schema_find(aks_handle_t sh, char *schema_name, ldms_schema_t lschema, ldmsd_row_t row) { struct rbn *rbn; - serdes_schema_t *sschema = NULL; + serdes_schema_t *previous_schema = NULL; + serdes_schema_t *current_schema = NULL; struct schema_entry *entry; char *json_buf = NULL; size_t json_len; @@ -115,40 +116,64 @@ serdes_schema_find(aks_handle_t sh, char *schema_name, int rc; pthread_mutex_lock(&schema_rbt_lock); - /* Check if the schema is already cached */ + /* Check if the schema is already cached in this plugin */ rbn = rbt_find(&schema_tree, schema_name); if (rbn) { entry = container_of(rbn, struct schema_entry, rbn); - sschema = entry->serdes_schema; + current_schema = entry->serdes_schema; goto out; } entry = calloc(1, sizeof(*entry)); if (!entry) goto out; - /* Check if the schema is already present in the registry */ - sschema = serdes_schema_get(sh->serdes, schema_name, -1, - errstr, sizeof(errstr)); - if (sschema) - /* Yes, cache it */ - goto cache; + /* Look up the schema by name in the registry. + Name alone does not tell us if the schema matches this row, so we + will still need to continue on and try pushing an updated schema. + The registry should handle duplicates or schema evolution testing. + */ + previous_schema = serdes_schema_get(sh->serdes, + schema_name, -1, + errstr, sizeof(errstr)); - /* Create a new schema from the row specification and LDMS schema */ + /* Generate a new schema from the row specification and LDMS schema */ rc = ldmsd_row_to_json_avro_schema(row, &json_buf, &json_len); if (rc) goto out; - sschema = - serdes_schema_add(sh->serdes, - schema_name, -1, - json_buf, json_len, - errstr, sizeof(errstr)); - if (!sschema) { + + /* Push the generated schema to the registry */ + current_schema = serdes_schema_add(sh->serdes, + schema_name, -1, + json_buf, json_len, + errstr, sizeof(errstr)); + if (!current_schema) { LOG_ERROR("%s\n", json_buf); LOG_ERROR("Error '%s' creating schema '%s'\n", errstr, schema_name); goto out; } -cache: - entry->serdes_schema = sschema; + + /* Log information about which schema was used */ + if (previous_schema != NULL) { + if (serdes_schema_id(current_schema) + == serdes_schema_id(previous_schema)) { + LOG_INFO("Using existing id %d for schema name '%s'\n", + serdes_schema_id(current_schema), + schema_name); + } else { + LOG_WARN("Using replacement id %d for schema name '%s' (previous id %d)\n", + serdes_schema_id(current_schema), + schema_name, + serdes_schema_id(previous_schema)); + serdes_schema_destroy(previous_schema); + } + } else { + LOG_INFO("Using brand new id %d for schema name '%s'\n", + serdes_schema_id(current_schema), + schema_name); + } + + /* Cache the schema in this plugin */ + entry->serdes_schema = current_schema; entry->ldms_schema = lschema; entry->schema_name = strdup(schema_name); rbn_init(&entry->rbn, entry->schema_name); @@ -157,7 +182,7 @@ serdes_schema_find(aks_handle_t sh, char *schema_name, pthread_mutex_unlock(&schema_rbt_lock); if (json_buf) free(json_buf); - return sschema; + return current_schema; } pthread_mutex_t sk_lock = PTHREAD_MUTEX_INITIALIZER;