diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java index 5f612547fc5..860f3ea5874 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GaaSJobObservabilityEventProducer.java @@ -76,6 +76,7 @@ public abstract class GaaSJobObservabilityEventProducer implements Closeable { public static final String ISSUES_READ_FAILED_METRIC_NAME = GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount"; public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = GAAS_JOB_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics"; public static final String GAAS_OBSERVABILITY_JOB_SUCCEEDED_METRIC_NAME = "jobSucceeded"; + private static final String DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE = "-"; protected MetricContext metricContext; protected State state; @@ -138,16 +139,31 @@ public void emitObservabilityEvent(final State jobState) { } public Attributes getEventAttributes(GaaSJobObservabilityEvent event) { - Attributes tags = Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, event.getFlowName()) - .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, event.getFlowGroup()) - .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, event.getJobName()) + Attributes tags = Attributes.builder().put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, getOrDefault(event.getFlowName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE)) + .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, getOrDefault(event.getFlowGroup(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE)) + .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, getOrDefault(event.getJobName(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE)) .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, event.getFlowExecutionId()) - .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, event.getExecutorId()) - .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, event.getFlowEdgeId()) + .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, getOrDefault(event.getExecutorId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE)) + .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, getOrDefault(event.getFlowEdgeId(), DEFAULT_OPENTELEMETRY_ATTRIBUTE_VALUE)) .build(); return tags; } + /** + * Returns the given string value if it is not empty (i.e., not null and not empty). + * Otherwise, returns the specified default value. + * + *

This method utilizes {@link org.apache.commons.lang3.StringUtils#isNotEmpty(CharSequence)} + * to check if the string is non-empty.

+ * + * @param value the string to check + * @param defaultValue the default value to return if the provided string is empty or null + * @return the original string if it is not empty; otherwise, the provided default value + */ + private String getOrDefault(String value, String defaultValue) { + return StringUtils.isNotEmpty(value) ? value : defaultValue; + } + /** * Emits the GaaSJobObservabilityEvent with the mechanism that the child class is built upon e.g. Kafka * @param event