diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 3345651351..7916d70c64 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -446,12 +446,6 @@ public StoreIngestionTask( getRecordTransformer != null ? getRecordTransformer.apply(store.getCurrentVersion()) : null; this.recordTransformer = clientRecordTransformer != null ? new BlockingDaVinciRecordTransformer(clientRecordTransformer) : null; - if (this.recordTransformer != null) { - versionedIngestionStats.registerTransformerLatencySensor(storeName, versionNumber); - versionedIngestionStats.registerTransformerLifecycleStartLatency(storeName, versionNumber); - versionedIngestionStats.registerTransformerLifecycleEndLatency(storeName, versionNumber); - versionedIngestionStats.registerTransformerErrorSensor(storeName, versionNumber); - } this.localKafkaServer = this.kafkaProps.getProperty(KAFKA_BOOTSTRAP_SERVERS); this.localKafkaServerSingletonSet = Collections.singleton(localKafkaServer); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java index 6301af96fb..abaa4a0b6d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java @@ -235,24 +235,4 @@ public void recordTransformerError(String storeName, int version, double value, public void recordMaxIdleTime(String storeName, int version, long idleTimeMs) { getStats(storeName, version).recordIdleTime(idleTimeMs); } - - public void registerTransformerLatencySensor(String storeName, int version) { - getStats(storeName, version).registerTransformerLatencySensor(); - getTotalStats(storeName).registerTransformerLatencySensor(); - } - - public void registerTransformerLifecycleStartLatency(String storeName, int version) { - getStats(storeName, version).registerTransformerLifecycleStartLatencySensor(); - getTotalStats(storeName).registerTransformerLifecycleStartLatencySensor(); - } - - public void registerTransformerLifecycleEndLatency(String storeName, int version) { - getStats(storeName, version).registerTransformerLifecycleEndLatencySensor(); - getTotalStats(storeName).registerTransformerLifecycleEndLatencySensor(); - } - - public void registerTransformerErrorSensor(String storeName, int version) { - getStats(storeName, version).registerTransformerErrorSensor(); - getTotalStats(storeName).registerTransformerErrorSensor(); - } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java index 7816f47010..05c9ce20e8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java @@ -7,6 +7,7 @@ import com.linkedin.davinci.kafka.consumer.StoreIngestionTask; import com.linkedin.venice.stats.LongAdderRateGauge; import com.linkedin.venice.utils.RegionUtils; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MetricConfig; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; @@ -93,9 +94,9 @@ public class IngestionStats { private final WritePathLatencySensor consumedRecordEndToEndProcessingLatencySensor; private final WritePathLatencySensor nearlineProducerToLocalBrokerLatencySensor; private final WritePathLatencySensor nearlineLocalBrokerToReadyToServeLatencySensor; - private WritePathLatencySensor transformerLatencySensor; - private WritePathLatencySensor transformerLifecycleStartLatencySensor; - private WritePathLatencySensor transformerLifecycleEndLatencySensor; + private final WritePathLatencySensor transformerLatencySensor; + private final WritePathLatencySensor transformerLifecycleStartLatencySensor; + private final WritePathLatencySensor transformerLifecycleEndLatencySensor; private final WritePathLatencySensor producerCallBackLatency; private final WritePathLatencySensor leaderPreprocessingLatency; private final WritePathLatencySensor internalPreprocessingLatency; @@ -109,14 +110,14 @@ public class IngestionStats { /** Record a version-level offset rewind events for VTs across all stores. */ private final Count versionTopicEndOffsetRewindCount = new Count(); - private final Sensor versionTopicEndOffsetRewindSensor; + private final Lazy versionTopicEndOffsetRewindSensor; private final MetricsRepository localMetricRepository; // Measure the max idle time among partitions for a given the store on this host private final LongAdderRateGauge idleTimeSensor = new LongAdderRateGauge(); - private Count transformerErrorCount = new Count(); - private Sensor transformerErrorSensor; + private final Count transformerErrorCount = new Count(); + private final Lazy transformerErrorSensor; public IngestionStats(VeniceServerConfig serverConfig) { @@ -171,8 +172,11 @@ public IngestionStats(VeniceServerConfig serverConfig) { registerSensor(localMetricRepository, LEADER_RECORDS_PRODUCED_METRIC_NAME, leaderRecordsProducedSensor); registerSensor(localMetricRepository, LEADER_BYTES_PRODUCED_METRIC_NAME, leaderBytesProducedSensor); - versionTopicEndOffsetRewindSensor = localMetricRepository.sensor(VERSION_TOPIC_END_OFFSET_REWIND_COUNT); - versionTopicEndOffsetRewindSensor.add(VERSION_TOPIC_END_OFFSET_REWIND_COUNT, versionTopicEndOffsetRewindCount); + versionTopicEndOffsetRewindSensor = Lazy.of(() -> { + Sensor versionTopicEndOffsetRewindSensor = localMetricRepository.sensor(VERSION_TOPIC_END_OFFSET_REWIND_COUNT); + versionTopicEndOffsetRewindSensor.add(VERSION_TOPIC_END_OFFSET_REWIND_COUNT, versionTopicEndOffsetRewindCount); + return versionTopicEndOffsetRewindSensor; + }); producerSourceBrokerLatencySensor = new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, "producer_to_source_broker_latency"); @@ -207,6 +211,17 @@ public IngestionStats(VeniceServerConfig serverConfig) { registerSensor(localMetricRepository, OFFSET_REGRESSION_DCR_ERROR, offsetRegressionDCRErrorSensor); registerSensor(localMetricRepository, TOMBSTONE_CREATION_DCR, tombstoneCreationDCRSensor); registerSensor(localMetricRepository, IDLE_TIME, idleTimeSensor); + + transformerLatencySensor = new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LATENCY); + transformerLifecycleStartLatencySensor = + new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_START_LATENCY); + transformerLifecycleEndLatencySensor = + new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_END_LATENCY); + transformerErrorSensor = Lazy.of(() -> { + Sensor transformerErrorSensor = localMetricRepository.sensor(TRANSFORMER_ERROR_COUNT); + transformerErrorSensor.add(TRANSFORMER_ERROR_COUNT, transformerErrorCount); + return transformerErrorSensor; + }); } private void registerSensor(MetricsRepository localMetricRepository, String sensorName, LongAdderRateGauge gauge) { @@ -376,7 +391,7 @@ public void recordInternalPreprocessingLatency(double value, long currentTimeMs) } public void recordVersionTopicEndOffsetRewind() { - versionTopicEndOffsetRewindSensor.record(); + versionTopicEndOffsetRewindSensor.get().record(); } public double getVersionTopicEndOffsetRewindCount() { @@ -568,55 +583,25 @@ public void recordNearlineLocalBrokerToReadyToServeLatency(double value, long cu } public void recordTransformerError(double value, long currentTimeMs) { - transformerErrorSensor.record(value, currentTimeMs); - } - - public void registerTransformerErrorSensor() { - if (transformerErrorSensor == null) { - transformerErrorSensor = localMetricRepository.sensor(TRANSFORMER_ERROR_COUNT); - transformerErrorSensor.add(TRANSFORMER_ERROR_COUNT, transformerErrorCount); - } + transformerErrorSensor.get().record(value, currentTimeMs); } public double getTransformerErrorCount() { - if (transformerErrorCount != null) { - return transformerErrorCount.measure(METRIC_CONFIG, System.currentTimeMillis()); - } - return 0; + return transformerErrorCount.measure(METRIC_CONFIG, System.currentTimeMillis()); } public void recordTransformerLatency(double value, long currentTimeMs) { transformerLatencySensor.record(value, currentTimeMs); } - public void registerTransformerLatencySensor() { - if (transformerLatencySensor == null) { - transformerLatencySensor = new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LATENCY); - } - } - public void recordTransformerLifecycleStartLatency(double value, long currentTimeMs) { transformerLifecycleStartLatencySensor.record(value, currentTimeMs); } - public void registerTransformerLifecycleStartLatencySensor() { - if (transformerLifecycleStartLatencySensor == null) { - transformerLifecycleStartLatencySensor = - new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_START_LATENCY); - } - } - public void recordTransformerLifecycleEndLatency(double value, long currentTimeMs) { transformerLifecycleEndLatencySensor.record(value, currentTimeMs); } - public void registerTransformerLifecycleEndLatencySensor() { - if (transformerLifecycleEndLatencySensor == null) { - transformerLifecycleEndLatencySensor = - new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_END_LATENCY); - } - } - public void recordIdleTime(long value) { idleTimeSensor.record(value); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IsolatedIngestionProcessHeartbeatStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IsolatedIngestionProcessHeartbeatStats.java index a0419bc6de..bc2f8ba0e0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IsolatedIngestionProcessHeartbeatStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IsolatedIngestionProcessHeartbeatStats.java @@ -1,6 +1,7 @@ package com.linkedin.davinci.stats; import com.linkedin.venice.stats.AbstractVeniceStats; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; import io.tehuti.metrics.stats.Gauge; @@ -10,17 +11,17 @@ public class IsolatedIngestionProcessHeartbeatStats extends AbstractVeniceStats { private static final String METRICS_PREFIX = "ingestion_isolation_heartbeat"; // Delay in millis since last successful heartbeat query. - private final Sensor heartbeatAgeSensor; + private final Lazy heartbeatAgeSensor; private final Sensor forkedProcessRestartSensor; public IsolatedIngestionProcessHeartbeatStats(MetricsRepository metricsRepository) { super(metricsRepository, METRICS_PREFIX); - heartbeatAgeSensor = registerSensor("heartbeat_age", new Gauge()); + heartbeatAgeSensor = registerLazySensor("heartbeat_age", new Gauge()); forkedProcessRestartSensor = registerSensor("forked_process_restart", new OccurrenceRate()); } public void recordHeartbeatAge(long heartbeatAgeInMs) { - heartbeatAgeSensor.record(heartbeatAgeInMs); + heartbeatAgeSensor.get().record(heartbeatAgeInMs); } public void recordForkedProcessRestart() { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/KafkaConsumerServiceStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/KafkaConsumerServiceStats.java index c68240a398..d0c2ab427b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/KafkaConsumerServiceStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/KafkaConsumerServiceStats.java @@ -3,6 +3,7 @@ import com.linkedin.venice.stats.AbstractVeniceStats; import com.linkedin.venice.stats.LongAdderRateGauge; import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; import io.tehuti.metrics.stats.AsyncGauge; @@ -24,25 +25,25 @@ public class KafkaConsumerServiceStats extends AbstractVeniceStats { private final LongAdderRateGauge pollRequestSensor; - private final Sensor pollRequestLatencySensor; + private final Lazy pollRequestLatencySensor; private final Sensor pollResultNumSensor; private final LongAdderRateGauge pollNonZeroResultNumSensor; private final Sensor pollRequestError; - private final Sensor consumerRecordsProducingToWriterBufferLatencySensor; - private final Sensor detectedDeletedTopicNumSensor; - private final Sensor detectedNoRunningIngestionTopicPartitionNumSensor; - private final Sensor delegateSubscribeLatencySensor; - private final Sensor updateCurrentAssignmentLatencySensor; - private final Sensor maxPartitionsPerConsumer; - private final Sensor minPartitionsPerConsumer; - private final Sensor avgPartitionsPerConsumer; + private final Lazy consumerRecordsProducingToWriterBufferLatencySensor; + private final Lazy detectedDeletedTopicNumSensor; + private final Lazy detectedNoRunningIngestionTopicPartitionNumSensor; + private final Lazy delegateSubscribeLatencySensor; + private final Lazy updateCurrentAssignmentLatencySensor; + private final Lazy maxPartitionsPerConsumer; + private final Lazy minPartitionsPerConsumer; + private final Lazy avgPartitionsPerConsumer; private final Sensor getOffsetLagIsAbsentSensor; private final Sensor getOffsetLagIsPresentSensor; private final Sensor getLatestOffsetIsAbsentSensor; private final Sensor getLatestOffsetIsPresentSensor; private final Sensor byteSizeSensor; - private final Sensor idleTimeSensor; + private final Lazy idleTimeSensor; public KafkaConsumerServiceStats( MetricsRepository metricsRepository, @@ -71,12 +72,12 @@ public KafkaConsumerServiceStats( */ // the consumer idle time - idleTimeSensor = registerSensor("idle_time", new Max()); + idleTimeSensor = registerLazySensor("idle_time", new Max()); // the number of poll requests pollRequestSensor = registerOnlyTotalRate("consumer_poll_request", totalStats, () -> totalStats.pollRequestSensor, time); // Notice that "pollRequestLatencySensor" only reports correct data when consumer task threads are not stuck - pollRequestLatencySensor = registerSensor("consumer_poll_request_latency", new Avg(), new Max()); + pollRequestLatencySensor = registerLazySensor("consumer_poll_request_latency", new Avg(), new Max()); pollNonZeroResultNumSensor = registerOnlyTotalRate( "consumer_poll_non_zero_result_num", totalStats, @@ -96,16 +97,17 @@ public KafkaConsumerServiceStats( pollRequestError = registerSensor("consumer_poll_error", new OccurrenceRate()); // To measure 'put' latency of consumer records blocking queue consumerRecordsProducingToWriterBufferLatencySensor = - registerSensor("consumer_records_producing_to_write_buffer_latency", new Avg(), new Max()); - detectedDeletedTopicNumSensor = registerSensor("detected_deleted_topic_num", new Total()); + registerLazySensor("consumer_records_producing_to_write_buffer_latency", new Avg(), new Max()); + detectedDeletedTopicNumSensor = registerLazySensor("detected_deleted_topic_num", new Total()); detectedNoRunningIngestionTopicPartitionNumSensor = - registerSensor("detected_no_running_ingestion_topic_partition_num", new Total()); - delegateSubscribeLatencySensor = registerSensor("delegate_subscribe_latency", new Avg(), new Max()); - updateCurrentAssignmentLatencySensor = registerSensor("update_current_assignment_latency", new Avg(), new Max()); + registerLazySensor("detected_no_running_ingestion_topic_partition_num", new Total()); + delegateSubscribeLatencySensor = registerLazySensor("delegate_subscribe_latency", new Avg(), new Max()); + updateCurrentAssignmentLatencySensor = + registerLazySensor("update_current_assignment_latency", new Avg(), new Max()); - minPartitionsPerConsumer = registerSensor("min_partitions_per_consumer", new Gauge()); - maxPartitionsPerConsumer = registerSensor("max_partitions_per_consumer", new Gauge()); - avgPartitionsPerConsumer = registerSensor("avg_partitions_per_consumer", new Gauge()); + minPartitionsPerConsumer = registerLazySensor("min_partitions_per_consumer", new Gauge()); + maxPartitionsPerConsumer = registerLazySensor("max_partitions_per_consumer", new Gauge()); + avgPartitionsPerConsumer = registerLazySensor("avg_partitions_per_consumer", new Gauge()); Sensor getOffsetLagSensor = registerSensor("getOffsetLag", new OccurrenceRate()); Sensor[] offsetLagParent = new Sensor[] { getOffsetLagSensor }; @@ -122,7 +124,7 @@ public KafkaConsumerServiceStats( public void recordPollRequestLatency(double latency) { pollRequestSensor.record(); - pollRequestLatencySensor.record(latency); + pollRequestLatencySensor.get().record(latency); } public void recordPollResultNum(int count) { @@ -134,7 +136,7 @@ public void recordNonZeroPollResultNum(int count) { } public void recordConsumerRecordsProducingToWriterBufferLatency(double latency) { - consumerRecordsProducingToWriterBufferLatencySensor.record(latency); + consumerRecordsProducingToWriterBufferLatencySensor.get().record(latency); } public void recordPollError() { @@ -142,31 +144,31 @@ public void recordPollError() { } public void recordDetectedDeletedTopicNum(int count) { - detectedDeletedTopicNumSensor.record(count); + detectedDeletedTopicNumSensor.get().record(count); } public void recordDetectedNoRunningIngestionTopicPartitionNum(int count) { - detectedNoRunningIngestionTopicPartitionNumSensor.record(count); + detectedNoRunningIngestionTopicPartitionNumSensor.get().record(count); } public void recordDelegateSubscribeLatency(double value) { - delegateSubscribeLatencySensor.record(value); + delegateSubscribeLatencySensor.get().record(value); } public void recordUpdateCurrentAssignmentLatency(double value) { - updateCurrentAssignmentLatencySensor.record(value); + updateCurrentAssignmentLatencySensor.get().record(value); } public void recordMinPartitionsPerConsumer(int count) { - minPartitionsPerConsumer.record(count); + minPartitionsPerConsumer.get().record(count); } public void recordMaxPartitionsPerConsumer(int count) { - maxPartitionsPerConsumer.record(count); + maxPartitionsPerConsumer.get().record(count); } public void recordAvgPartitionsPerConsumer(int count) { - avgPartitionsPerConsumer.record(count); + avgPartitionsPerConsumer.get().record(count); } public void recordOffsetLagIsAbsent() { @@ -190,6 +192,6 @@ public void recordByteSizePerPoll(double count) { } public void recordConsumerIdleTime(double time) { - idleTimeSensor.record(time); + idleTimeSensor.get().record(time); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/MetadataUpdateStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/MetadataUpdateStats.java index ef4a1d2b4b..08d9c6065e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/MetadataUpdateStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/MetadataUpdateStats.java @@ -2,6 +2,7 @@ import com.linkedin.davinci.ingestion.main.MainIngestionStorageMetadataService; import com.linkedin.venice.stats.AbstractVeniceStats; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; import io.tehuti.metrics.stats.Gauge; @@ -14,23 +15,23 @@ public class MetadataUpdateStats extends AbstractVeniceStats { private static final String METRICS_PREFIX = "ingestion_isolation_metadata_updates"; // Number of remaining elements inside metadata update queue. - private final Sensor metadataUpdateQueueLengthSensor; + private final Lazy metadataUpdateQueueLengthSensor; // If we encountered unknown exception during metadata update, we will set the Gauge value to 1 - private final Sensor metadataUpdateQueueErrorSensor; + private final Lazy metadataUpdateQueueErrorSensor; public MetadataUpdateStats(MetricsRepository metricsRepository) { super(metricsRepository, METRICS_PREFIX); - metadataUpdateQueueLengthSensor = registerSensor("queue_length", new Gauge()); - metadataUpdateQueueErrorSensor = registerSensor("queue_update_error", new Gauge()); + metadataUpdateQueueLengthSensor = registerLazySensor("queue_length", new Gauge()); + metadataUpdateQueueErrorSensor = registerLazySensor("queue_update_error", new Gauge()); // Reset metadata update queue error Gauge. recordMetadataQueueUpdateError(0.0); } public void recordMetadataUpdateQueueLength(int queueLength) { - metadataUpdateQueueLengthSensor.record(queueLength); + metadataUpdateQueueLengthSensor.get().record(queueLength); } public void recordMetadataQueueUpdateError(double value) { - metadataUpdateQueueErrorSensor.record(value); + metadataUpdateQueueErrorSensor.get().record(value); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ParticipantStoreConsumptionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ParticipantStoreConsumptionStats.java index 23e40fa8d0..a6f4109633 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ParticipantStoreConsumptionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ParticipantStoreConsumptionStats.java @@ -1,6 +1,7 @@ package com.linkedin.davinci.stats; import com.linkedin.venice.stats.AbstractVeniceStats; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; import io.tehuti.metrics.stats.Avg; @@ -32,35 +33,35 @@ public class ParticipantStoreConsumptionStats extends AbstractVeniceStats { private static final String HEARTBEAT = "heartbeat"; - private final Sensor killPushJobLatencySensor; - private final Sensor killedPushJobsSensor; - private final Sensor failedInitializationSensor; - private final Sensor killPushJobFailedConsumption; + private final Lazy killPushJobLatencySensor; + private final Lazy killedPushJobsSensor; + private final Lazy failedInitializationSensor; + private final Lazy killPushJobFailedConsumption; private final Sensor heartbeatSensor; public ParticipantStoreConsumptionStats(MetricsRepository metricsRepository, String clusterName) { super(metricsRepository, clusterName + NAME_SUFFIX); - killPushJobLatencySensor = registerSensorIfAbsent(KILL_PUSH_JOB_LATENCY, new Avg(), new Max()); - killedPushJobsSensor = registerSensorIfAbsent(KILLED_PUSH_JOBS, new Count()); - failedInitializationSensor = registerSensorIfAbsent(FAILED_INITIALIZATION, new Count()); - killPushJobFailedConsumption = registerSensorIfAbsent(KILL_PUSH_JOB_FAILED_CONSUMPTION, new Count()); + killPushJobLatencySensor = registerLazySensorIfAbsent(KILL_PUSH_JOB_LATENCY, new Avg(), new Max()); + killedPushJobsSensor = registerLazySensorIfAbsent(KILLED_PUSH_JOBS, new Count()); + failedInitializationSensor = registerLazySensorIfAbsent(FAILED_INITIALIZATION, new Count()); + killPushJobFailedConsumption = registerLazySensorIfAbsent(KILL_PUSH_JOB_FAILED_CONSUMPTION, new Count()); heartbeatSensor = registerSensorIfAbsent(HEARTBEAT, new OccurrenceRate()); } public void recordKillPushJobLatency(double latencyInMs) { - killPushJobLatencySensor.record(latencyInMs); + killPushJobLatencySensor.get().record(latencyInMs); } public void recordKilledPushJobs() { - killedPushJobsSensor.record(); + killedPushJobsSensor.get().record(); } public void recordFailedInitialization() { - failedInitializationSensor.record(); + failedInitializationSensor.get().record(); } public void recordKillPushJobFailedConsumption() { - killPushJobFailedConsumption.record(); + killPushJobFailedConsumption.get().record(); } public void recordHeartbeat() { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/StoreBufferServiceStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/StoreBufferServiceStats.java index 3f587f9105..94d3fbe1ae 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/StoreBufferServiceStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/StoreBufferServiceStats.java @@ -1,6 +1,7 @@ package com.linkedin.davinci.stats; import com.linkedin.venice.stats.AbstractVeniceStats; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; import io.tehuti.metrics.stats.AsyncGauge; @@ -15,8 +16,8 @@ public class StoreBufferServiceStats extends AbstractVeniceStats { private final Sensor totalRemainingMemorySensor; private final Sensor maxMemoryUsagePerWriterSensor; private final Sensor minMemoryUsagePerWriterSensor; - private final Sensor internalProcessingLatencySensor; - private final Sensor internalProcessingErrorSensor; + private final Lazy internalProcessingLatencySensor; + private final Lazy internalProcessingErrorSensor; public StoreBufferServiceStats( MetricsRepository metricsRepository, @@ -38,15 +39,15 @@ public StoreBufferServiceStats( (ignored, ignored2) -> minMemoryUsagePerDrainerSupplier.getAsLong(), "min_memory_usage_per_writer")); - internalProcessingLatencySensor = registerSensor("internal_processing_latency", new Avg(), new Max()); - internalProcessingErrorSensor = registerSensor("internal_processing_error", new OccurrenceRate()); + internalProcessingLatencySensor = registerLazySensor("internal_processing_latency", new Avg(), new Max()); + internalProcessingErrorSensor = registerLazySensor("internal_processing_error", new OccurrenceRate()); } public void recordInternalProcessingError() { - internalProcessingErrorSensor.record(); + internalProcessingErrorSensor.get().record(); } public void recordInternalProcessingLatency(long latency) { - internalProcessingLatencySensor.record(latency); + internalProcessingLatencySensor.get().record(latency); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/WritePathLatencySensor.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/WritePathLatencySensor.java index bb82fd4edd..536d9cca5c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/WritePathLatencySensor.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/WritePathLatencySensor.java @@ -1,5 +1,6 @@ package com.linkedin.davinci.stats; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MetricConfig; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; @@ -13,16 +14,19 @@ * information than just max and average, e.g. 50/99 percentile numbers. */ public class WritePathLatencySensor { - private final Sensor sensor; + private final Lazy sensor; private final MetricConfig metricConfig; private final Avg avgStat = new Avg(); private final Max maxStat = new Max(); public WritePathLatencySensor(MetricsRepository metricsRepo, MetricConfig metricConfig, String sensorName) { this.metricConfig = metricConfig; - this.sensor = metricsRepo.sensor(sensorName); - this.sensor.add(sensorName + Avg.class.getSimpleName(), avgStat); - this.sensor.add(sensorName + Max.class.getSimpleName(), maxStat); + this.sensor = Lazy.of(() -> { + Sensor sensor = metricsRepo.sensor(sensorName); + sensor.add(sensorName + Avg.class.getSimpleName(), avgStat); + sensor.add(sensorName + Max.class.getSimpleName(), maxStat); + return sensor; + }); } /** @@ -43,6 +47,6 @@ public double getMax() { * Record the latency value. */ public void record(double value, long currentTimeMs) { - sensor.record(value, currentTimeMs); + sensor.get().record(value, currentTimeMs); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/venice/stats/LeakedResourceCleanerStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/venice/stats/LeakedResourceCleanerStats.java index c09c336112..c5a86f078f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/venice/stats/LeakedResourceCleanerStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/venice/stats/LeakedResourceCleanerStats.java @@ -1,6 +1,7 @@ package com.linkedin.venice.stats; import com.linkedin.venice.cleaner.LeakedResourceCleaner; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; import io.tehuti.metrics.stats.Total; @@ -10,15 +11,15 @@ * {@code LeakedResourceCleanerStats} records the occurrences of store resources get removed by {@link LeakedResourceCleaner}. */ public class LeakedResourceCleanerStats extends AbstractVeniceStats { - private final Sensor leakedVersionTotalSensor; + private final Lazy leakedVersionTotalSensor; public LeakedResourceCleanerStats(MetricsRepository metricsRepository) { super(metricsRepository, "LeakedResourceCleaner"); - this.leakedVersionTotalSensor = registerSensor("leaked_version", new Total()); + this.leakedVersionTotalSensor = registerLazySensor("leaked_version", new Total()); } public void recordLeakedVersion() { - leakedVersionTotalSensor.record(); + leakedVersionTotalSensor.get().record(); } } diff --git a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/stats/FastClientStats.java b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/stats/FastClientStats.java index 7ddc844124..97da012781 100644 --- a/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/stats/FastClientStats.java +++ b/clients/venice-client/src/main/java/com/linkedin/venice/fastclient/stats/FastClientStats.java @@ -5,6 +5,7 @@ import com.linkedin.venice.stats.StatsUtils; import com.linkedin.venice.stats.TehutiUtils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.Metric; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; @@ -30,11 +31,11 @@ public class FastClientStats extends com.linkedin.venice.client.stats.ClientStat private final String storeName; private final Sensor noAvailableReplicaRequestCountSensor; - private final Sensor dualReadFastClientSlowerRequestCountSensor; + private final Lazy dualReadFastClientSlowerRequestCountSensor; private final Sensor dualReadFastClientSlowerRequestRatioSensor; - private final Sensor dualReadFastClientErrorThinClientSucceedRequestCountSensor; + private final Lazy dualReadFastClientErrorThinClientSucceedRequestCountSensor; private final Sensor dualReadFastClientErrorThinClientSucceedRequestRatioSensor; - private final Sensor dualReadThinClientFastClientLatencyDeltaSensor; + private final Lazy dualReadThinClientFastClientLatencyDeltaSensor; private final Sensor leakedRequestCountSensor; @@ -67,14 +68,14 @@ private FastClientStats(MetricsRepository metricsRepository, String storeName, R Rate requestRate = getRequestRate(); Rate fastClientSlowerRequestRate = new OccurrenceRate(); this.dualReadFastClientSlowerRequestCountSensor = - registerSensor("dual_read_fastclient_slower_request_count", fastClientSlowerRequestRate); + registerLazySensor("dual_read_fastclient_slower_request_count", fastClientSlowerRequestRate); this.dualReadFastClientSlowerRequestRatioSensor = registerSensor( new TehutiUtils.SimpleRatioStat( fastClientSlowerRequestRate, requestRate, "dual_read_fastclient_slower_request_ratio")); Rate fastClientErrorThinClientSucceedRequestRate = new OccurrenceRate(); - this.dualReadFastClientErrorThinClientSucceedRequestCountSensor = registerSensor( + this.dualReadFastClientErrorThinClientSucceedRequestCountSensor = registerLazySensor( "dual_read_fastclient_error_thinclient_succeed_request_count", fastClientErrorThinClientSucceedRequestRate); this.dualReadFastClientErrorThinClientSucceedRequestRatioSensor = registerSensor( @@ -82,8 +83,10 @@ private FastClientStats(MetricsRepository metricsRepository, String storeName, R fastClientErrorThinClientSucceedRequestRate, requestRate, "dual_read_fastclient_error_thinclient_succeed_request_ratio")); - this.dualReadThinClientFastClientLatencyDeltaSensor = - registerSensorWithDetailedPercentiles("dual_read_thinclient_fastclient_latency_delta", new Max(), new Avg()); + this.dualReadThinClientFastClientLatencyDeltaSensor = registerLazySensorWithDetailedPercentiles( + "dual_read_thinclient_fastclient_latency_delta", + new Max(), + new Avg()); this.leakedRequestCountSensor = registerSensor("leaked_request_count", new OccurrenceRate()); this.longTailRetryRequestSensor = registerSensor("long_tail_retry_request", new OccurrenceRate()); this.errorRetryRequestSensor = registerSensor("error_retry_request", new OccurrenceRate()); @@ -103,15 +106,15 @@ public void recordNoAvailableReplicaRequest() { } public void recordFastClientSlowerRequest() { - dualReadFastClientSlowerRequestCountSensor.record(); + dualReadFastClientSlowerRequestCountSensor.get().record(); } public void recordFastClientErrorThinClientSucceedRequest() { - dualReadFastClientErrorThinClientSucceedRequestCountSensor.record(); + dualReadFastClientErrorThinClientSucceedRequestCountSensor.get().record(); } public void recordThinClientFastClientLatencyDelta(double latencyDelta) { - dualReadThinClientFastClientLatencyDeltaSensor.record(latencyDelta); + dualReadThinClientFastClientLatencyDeltaSensor.get().record(latencyDelta); } private RouteStats getRouteStats(String instanceUrl) { @@ -213,60 +216,61 @@ public List getMetricValues(String sensorName, String... stats) { * Per-route request metrics. */ private static class RouteStats extends AbstractVeniceStats { - private final Sensor requestCountSensor; - private final Sensor responseWaitingTimeSensor; - private final Sensor healthyRequestCountSensor; - private final Sensor quotaExceededRequestCountSensor; - private final Sensor internalServerErrorRequestCountSensor; - private final Sensor serviceUnavailableRequestCountSensor; - private final Sensor leakedRequestCountSensor; - private final Sensor otherErrorRequestCountSensor; + private final Lazy requestCountSensor; + private final Lazy responseWaitingTimeSensor; + private final Lazy healthyRequestCountSensor; + private final Lazy quotaExceededRequestCountSensor; + private final Lazy internalServerErrorRequestCountSensor; + private final Lazy serviceUnavailableRequestCountSensor; + private final Lazy leakedRequestCountSensor; + private final Lazy otherErrorRequestCountSensor; public RouteStats(MetricsRepository metricsRepository, String storeName, String instanceName) { super(metricsRepository, storeName + "." + StatsUtils.convertHostnameToMetricName(instanceName)); - this.requestCountSensor = registerSensor("request_count", new OccurrenceRate()); - this.responseWaitingTimeSensor = - registerSensor("response_waiting_time", TehutiUtils.getPercentileStat(getName(), "response_waiting_time")); - this.healthyRequestCountSensor = registerSensor("healthy_request_count", new OccurrenceRate()); - this.quotaExceededRequestCountSensor = registerSensor("quota_exceeded_request_count", new OccurrenceRate()); + this.requestCountSensor = registerLazySensor("request_count", new OccurrenceRate()); + this.responseWaitingTimeSensor = registerLazySensor( + "response_waiting_time", + TehutiUtils.getPercentileStat(getName(), "response_waiting_time")); + this.healthyRequestCountSensor = registerLazySensor("healthy_request_count", new OccurrenceRate()); + this.quotaExceededRequestCountSensor = registerLazySensor("quota_exceeded_request_count", new OccurrenceRate()); this.internalServerErrorRequestCountSensor = - registerSensor("internal_server_error_request_count", new OccurrenceRate()); + registerLazySensor("internal_server_error_request_count", new OccurrenceRate()); this.serviceUnavailableRequestCountSensor = - registerSensor("service_unavailable_request_count", new OccurrenceRate()); - this.leakedRequestCountSensor = registerSensor("leaked_request_count", new OccurrenceRate()); - this.otherErrorRequestCountSensor = registerSensor("other_error_request_count", new OccurrenceRate()); + registerLazySensor("service_unavailable_request_count", new OccurrenceRate()); + this.leakedRequestCountSensor = registerLazySensor("leaked_request_count", new OccurrenceRate()); + this.otherErrorRequestCountSensor = registerLazySensor("other_error_request_count", new OccurrenceRate()); } public void recordRequest() { - requestCountSensor.record(); + requestCountSensor.get().record(); } public void recordResponseWaitingTime(double latency) { - responseWaitingTimeSensor.record(latency); + responseWaitingTimeSensor.get().record(latency); } public void recordHealthyRequest() { - healthyRequestCountSensor.record(); + healthyRequestCountSensor.get().record(); } public void recordQuotaExceededRequest() { - quotaExceededRequestCountSensor.record(); + quotaExceededRequestCountSensor.get().record(); } public void recordInternalServerErrorRequest() { - internalServerErrorRequestCountSensor.record(); + internalServerErrorRequestCountSensor.get().record(); } public void recordServiceUnavailableRequest() { - serviceUnavailableRequestCountSensor.record(); + serviceUnavailableRequestCountSensor.get().record(); } public void recordLeakedRequest() { - leakedRequestCountSensor.record(); + leakedRequestCountSensor.get().record(); } public void recordOtherErrorRequest() { - otherErrorRequestCountSensor.record(); + otherErrorRequestCountSensor.get().record(); } } } diff --git a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/stats/ClientStats.java b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/stats/ClientStats.java index 3fd6321235..eaf7960b65 100644 --- a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/stats/ClientStats.java +++ b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/stats/ClientStats.java @@ -4,6 +4,7 @@ import com.linkedin.venice.read.RequestType; import com.linkedin.venice.stats.TehutiUtils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.Sensor; import io.tehuti.metrics.stats.Avg; @@ -19,18 +20,18 @@ public class ClientStats extends BasicClientStats { private final Map httpStatusSensorMap = new VeniceConcurrentHashMap<>(); private final Sensor requestRetryCountSensor; private final Sensor successRequestDuplicateKeyCountSensor; - private final Sensor requestSerializationTime; - private final Sensor requestSubmissionToResponseHandlingTime; + private final Lazy requestSerializationTime; + private final Lazy requestSubmissionToResponseHandlingTime; private final Sensor responseDeserializationTime; - private final Sensor responseDecompressionTimeSensor; - private final Sensor streamingResponseTimeToReceiveFirstRecord; - private final Sensor streamingResponseTimeToReceive50PctRecord; - private final Sensor streamingResponseTimeToReceive90PctRecord; - private final Sensor streamingResponseTimeToReceive95PctRecord; - private final Sensor streamingResponseTimeToReceive99PctRecord; + private final Lazy responseDecompressionTimeSensor; + private final Lazy streamingResponseTimeToReceiveFirstRecord; + private final Lazy streamingResponseTimeToReceive50PctRecord; + private final Lazy streamingResponseTimeToReceive90PctRecord; + private final Lazy streamingResponseTimeToReceive95PctRecord; + private final Lazy streamingResponseTimeToReceive99PctRecord; private final Sensor appTimedOutRequestSensor; - private final Sensor appTimedOutRequestResultRatioSensor; - private final Sensor clientFutureTimeoutSensor; + private final Lazy appTimedOutRequestResultRatioSensor; + private final Lazy clientFutureTimeoutSensor; private final Sensor retryRequestKeyCountSensor; private final Sensor retryRequestSuccessKeyCountSensor; private final Sensor retryKeySuccessRatioSensor; @@ -66,13 +67,13 @@ protected ClientStats(MetricsRepository metricsRepository, String storeName, Req * on the caller's thread. */ requestSerializationTime = - registerSensorWithDetailedPercentiles("request_serialization_time", new Avg(), new Max()); + registerLazySensorWithDetailedPercentiles("request_serialization_time", new Avg(), new Max()); /** * The time it took between sending the request to the router and beginning to process the response. */ requestSubmissionToResponseHandlingTime = - registerSensorWithDetailedPercentiles("request_submission_to_response_handling_time", new Avg(), new Max()); + registerLazySensorWithDetailedPercentiles("request_submission_to_response_handling_time", new Avg(), new Max()); /** * The total time it took to process the response. @@ -81,16 +82,16 @@ protected ClientStats(MetricsRepository metricsRepository, String storeName, Req registerSensorWithDetailedPercentiles("response_deserialization_time", new Avg(), new Max()); responseDecompressionTimeSensor = - registerSensorWithDetailedPercentiles("response_decompression_time", new Avg(), new Max()); + registerLazySensorWithDetailedPercentiles("response_decompression_time", new Avg(), new Max()); /** * Metrics to track the latency of each proportion of results received. */ - streamingResponseTimeToReceiveFirstRecord = registerSensorWithDetailedPercentiles("response_ttfr", new Avg()); - streamingResponseTimeToReceive50PctRecord = registerSensorWithDetailedPercentiles("response_tt50pr", new Avg()); - streamingResponseTimeToReceive90PctRecord = registerSensorWithDetailedPercentiles("response_tt90pr", new Avg()); - streamingResponseTimeToReceive95PctRecord = registerSensorWithDetailedPercentiles("response_tt95pr", new Avg()); - streamingResponseTimeToReceive99PctRecord = registerSensorWithDetailedPercentiles("response_tt99pr", new Avg()); + streamingResponseTimeToReceiveFirstRecord = registerLazySensorWithDetailedPercentiles("response_ttfr", new Avg()); + streamingResponseTimeToReceive50PctRecord = registerLazySensorWithDetailedPercentiles("response_tt50pr", new Avg()); + streamingResponseTimeToReceive90PctRecord = registerLazySensorWithDetailedPercentiles("response_tt90pr", new Avg()); + streamingResponseTimeToReceive95PctRecord = registerLazySensorWithDetailedPercentiles("response_tt95pr", new Avg()); + streamingResponseTimeToReceive99PctRecord = registerLazySensorWithDetailedPercentiles("response_tt99pr", new Avg()); /** * Metrics to track the timed-out requests. @@ -101,9 +102,12 @@ protected ClientStats(MetricsRepository metricsRepository, String storeName, Req * This timeout behavior could actually happen before the D2 timeout, which is specified/configured in a different way. */ appTimedOutRequestSensor = registerSensor("app_timed_out_request", new OccurrenceRate()); - appTimedOutRequestResultRatioSensor = - registerSensorWithDetailedPercentiles("app_timed_out_request_result_ratio", new Avg(), new Min(), new Max()); - clientFutureTimeoutSensor = registerSensor("client_future_timeout", new Avg(), new Min(), new Max()); + appTimedOutRequestResultRatioSensor = registerLazySensorWithDetailedPercentiles( + "app_timed_out_request_result_ratio", + new Avg(), + new Min(), + new Max()); + clientFutureTimeoutSensor = registerLazySensor("client_future_timeout", new Avg(), new Min(), new Max()); /* Metrics relevant to track long tail retry efficacy for batch get*/ Rate retryRequestKeyCount = new Rate(); retryRequestKeyCountSensor = registerSensor("retry_request_key_count", retryRequestKeyCount, new Avg(), new Max()); @@ -137,11 +141,11 @@ public void recordSuccessDuplicateRequestKeyCount(int duplicateKeyCount) { } public void recordRequestSerializationTime(double latency) { - requestSerializationTime.record(latency); + requestSerializationTime.get().record(latency); } public void recordRequestSubmissionToResponseHandlingTime(double latency) { - requestSubmissionToResponseHandlingTime.record(latency); + requestSubmissionToResponseHandlingTime.get().record(latency); } public void recordResponseDeserializationTime(double latency) { @@ -149,27 +153,27 @@ public void recordResponseDeserializationTime(double latency) { } public void recordResponseDecompressionTime(double latency) { - responseDecompressionTimeSensor.record(latency); + responseDecompressionTimeSensor.get().record(latency); } public void recordStreamingResponseTimeToReceiveFirstRecord(double latency) { - streamingResponseTimeToReceiveFirstRecord.record(latency); + streamingResponseTimeToReceiveFirstRecord.get().record(latency); } public void recordStreamingResponseTimeToReceive50PctRecord(double latency) { - streamingResponseTimeToReceive50PctRecord.record(latency); + streamingResponseTimeToReceive50PctRecord.get().record(latency); } public void recordStreamingResponseTimeToReceive90PctRecord(double latency) { - streamingResponseTimeToReceive90PctRecord.record(latency); + streamingResponseTimeToReceive90PctRecord.get().record(latency); } public void recordStreamingResponseTimeToReceive95PctRecord(double latency) { - streamingResponseTimeToReceive95PctRecord.record(latency); + streamingResponseTimeToReceive95PctRecord.get().record(latency); } public void recordStreamingResponseTimeToReceive99PctRecord(double latency) { - streamingResponseTimeToReceive99PctRecord.record(latency); + streamingResponseTimeToReceive99PctRecord.get().record(latency); } public void recordAppTimedOutRequest() { @@ -177,11 +181,11 @@ public void recordAppTimedOutRequest() { } public void recordAppTimedOutRequestResultRatio(double ratio) { - appTimedOutRequestResultRatioSensor.record(ratio); + appTimedOutRequestResultRatioSensor.get().record(ratio); } public void recordClientFutureTimeout(long clientFutureTimeout) { - clientFutureTimeoutSensor.record(clientFutureTimeout); + clientFutureTimeoutSensor.get().record(clientFutureTimeout); } public void recordRetryRequestKeyCount(int numberOfKeysSentInRetryRequest) { diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceHttpStats.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceHttpStats.java index 669d552401..4c2a444a85 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceHttpStats.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceHttpStats.java @@ -1,6 +1,7 @@ package com.linkedin.venice.stats; import com.linkedin.venice.read.RequestType; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MeasurableStat; import io.tehuti.metrics.MetricsRepository; import io.tehuti.metrics.NamedMeasurableStat; @@ -34,6 +35,11 @@ protected Sensor registerSensor(String sensorName, MeasurableStat... stats) { return super.registerSensor(getFullMetricName(sensorName), null, stats); } + @Override + protected Lazy registerLazySensor(String sensorName, MeasurableStat... stats) { + return Lazy.of(() -> registerSensor(getFullMetricName(sensorName), stats)); + } + protected Sensor registerSensor(String sensorName, Sensor[] parents, MeasurableStat... stats) { return super.registerSensor(getFullMetricName(sensorName), parents, stats); } @@ -59,4 +65,8 @@ protected Sensor registerSensorWithDetailedPercentiles(String sensorName, Measur newStats[stats.length] = TehutiUtils.getPercentileStatForNetworkLatency(getName(), getFullMetricName(sensorName)); return registerSensor(sensorName, newStats); } + + protected Lazy registerLazySensorWithDetailedPercentiles(String sensorName, MeasurableStat... stats) { + return Lazy.of(() -> registerSensorWithDetailedPercentiles(sensorName, stats)); + } } diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java index 1aad9bc3b8..9cd9f4f253 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java @@ -4,6 +4,7 @@ import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.utils.lazy.Lazy; import io.tehuti.metrics.MeasurableStat; import io.tehuti.metrics.MetricConfig; import io.tehuti.metrics.MetricsRepository; @@ -57,6 +58,10 @@ protected Sensor registerSensor(String sensorName, MeasurableStat... stats) { return registerSensor(getSensorFullName(getName(), sensorName), null, null, stats); } + protected Lazy registerLazySensor(String sensorName, MeasurableStat... stats) { + return Lazy.of(() -> registerSensor(sensorName, stats)); + } + protected Sensor registerSensor(NamedMeasurableStat... stats) { if (stats.length == 0) { throw new IllegalArgumentException("At least one stat must be provided"); @@ -227,6 +232,10 @@ protected Sensor registerSensorIfAbsent(String sensorName, MeasurableStat... sta return registerSensorIfAbsent(getName(), sensorName, null, null, stats); } + protected Lazy registerLazySensorIfAbsent(String sensorName, MeasurableStat... stats) { + return Lazy.of(() -> registerSensorIfAbsent(sensorName, stats)); + } + protected Sensor registerSensorIfAbsent(NamedMeasurableStat... stats) { if (stats.length == 0) { throw new IllegalArgumentException("At least one stat must be provided");