-
Notifications
You must be signed in to change notification settings - Fork 84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dvc][fc][tc] Lazily register client-side metrics #1104
base: main
Are you sure you want to change the base?
Changes from all commits
430d0ed
a39aab1
3bb215f
70343be
0e4efb8
ee1f1cc
6e01889
e5a1184
cae4dac
b9f02e8
08d6416
37328cb
a642740
237fb50
a7746f6
b83eab8
d17737b
1650f93
9a13271
e744219
b1de07a
fc87595
8b0540f
de408b2
99a13da
159128b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
import com.linkedin.davinci.kafka.consumer.StoreIngestionTask; | ||
import com.linkedin.venice.stats.LongAdderRateGauge; | ||
import com.linkedin.venice.utils.RegionUtils; | ||
import com.linkedin.venice.utils.lazy.Lazy; | ||
import io.tehuti.metrics.MetricConfig; | ||
import io.tehuti.metrics.MetricsRepository; | ||
import io.tehuti.metrics.Sensor; | ||
|
@@ -93,9 +94,9 @@ public class IngestionStats { | |
private final WritePathLatencySensor consumedRecordEndToEndProcessingLatencySensor; | ||
private final WritePathLatencySensor nearlineProducerToLocalBrokerLatencySensor; | ||
private final WritePathLatencySensor nearlineLocalBrokerToReadyToServeLatencySensor; | ||
private WritePathLatencySensor transformerLatencySensor; | ||
private WritePathLatencySensor transformerLifecycleStartLatencySensor; | ||
private WritePathLatencySensor transformerLifecycleEndLatencySensor; | ||
private final WritePathLatencySensor transformerLatencySensor; | ||
private final WritePathLatencySensor transformerLifecycleStartLatencySensor; | ||
private final WritePathLatencySensor transformerLifecycleEndLatencySensor; | ||
private final WritePathLatencySensor producerCallBackLatency; | ||
private final WritePathLatencySensor leaderPreprocessingLatency; | ||
private final WritePathLatencySensor internalPreprocessingLatency; | ||
|
@@ -109,14 +110,14 @@ public class IngestionStats { | |
|
||
/** Record a version-level offset rewind events for VTs across all stores. */ | ||
private final Count versionTopicEndOffsetRewindCount = new Count(); | ||
private final Sensor versionTopicEndOffsetRewindSensor; | ||
private final Lazy<Sensor> versionTopicEndOffsetRewindSensor; | ||
private final MetricsRepository localMetricRepository; | ||
|
||
// Measure the max idle time among partitions for a given the store on this host | ||
private final LongAdderRateGauge idleTimeSensor = new LongAdderRateGauge(); | ||
|
||
private Count transformerErrorCount = new Count(); | ||
private Sensor transformerErrorSensor; | ||
private final Count transformerErrorCount = new Count(); | ||
private final Lazy<Sensor> transformerErrorSensor; | ||
|
||
public IngestionStats(VeniceServerConfig serverConfig) { | ||
|
||
|
@@ -171,8 +172,11 @@ public IngestionStats(VeniceServerConfig serverConfig) { | |
registerSensor(localMetricRepository, LEADER_RECORDS_PRODUCED_METRIC_NAME, leaderRecordsProducedSensor); | ||
registerSensor(localMetricRepository, LEADER_BYTES_PRODUCED_METRIC_NAME, leaderBytesProducedSensor); | ||
|
||
versionTopicEndOffsetRewindSensor = localMetricRepository.sensor(VERSION_TOPIC_END_OFFSET_REWIND_COUNT); | ||
versionTopicEndOffsetRewindSensor.add(VERSION_TOPIC_END_OFFSET_REWIND_COUNT, versionTopicEndOffsetRewindCount); | ||
versionTopicEndOffsetRewindSensor = Lazy.of(() -> { | ||
Sensor versionTopicEndOffsetRewindSensor = localMetricRepository.sensor(VERSION_TOPIC_END_OFFSET_REWIND_COUNT); | ||
versionTopicEndOffsetRewindSensor.add(VERSION_TOPIC_END_OFFSET_REWIND_COUNT, versionTopicEndOffsetRewindCount); | ||
return versionTopicEndOffsetRewindSensor; | ||
}); | ||
|
||
producerSourceBrokerLatencySensor = | ||
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, "producer_to_source_broker_latency"); | ||
|
@@ -207,6 +211,17 @@ public IngestionStats(VeniceServerConfig serverConfig) { | |
registerSensor(localMetricRepository, OFFSET_REGRESSION_DCR_ERROR, offsetRegressionDCRErrorSensor); | ||
registerSensor(localMetricRepository, TOMBSTONE_CREATION_DCR, tombstoneCreationDCRSensor); | ||
registerSensor(localMetricRepository, IDLE_TIME, idleTimeSensor); | ||
|
||
transformerLatencySensor = new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LATENCY); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm confused here. Why are some of these lazy and some not? When do these get registered? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some metrics are required to be registered by some unit/integration tests by asserting that their value is 0. Should I modify the tests to assert that the sensor be null instead? The underlying sensor inside There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's ok to edit tests if the behavior no longer matches up. I think we should be uniform here unless theres a valid reason to not do so. |
||
transformerLifecycleStartLatencySensor = | ||
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_START_LATENCY); | ||
transformerLifecycleEndLatencySensor = | ||
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_END_LATENCY); | ||
transformerErrorSensor = Lazy.of(() -> { | ||
Sensor transformerErrorSensor = localMetricRepository.sensor(TRANSFORMER_ERROR_COUNT); | ||
transformerErrorSensor.add(TRANSFORMER_ERROR_COUNT, transformerErrorCount); | ||
return transformerErrorSensor; | ||
}); | ||
} | ||
|
||
private void registerSensor(MetricsRepository localMetricRepository, String sensorName, LongAdderRateGauge gauge) { | ||
|
@@ -376,7 +391,7 @@ public void recordInternalPreprocessingLatency(double value, long currentTimeMs) | |
} | ||
|
||
public void recordVersionTopicEndOffsetRewind() { | ||
versionTopicEndOffsetRewindSensor.record(); | ||
versionTopicEndOffsetRewindSensor.get().record(); | ||
} | ||
|
||
public double getVersionTopicEndOffsetRewindCount() { | ||
|
@@ -568,55 +583,25 @@ public void recordNearlineLocalBrokerToReadyToServeLatency(double value, long cu | |
} | ||
|
||
public void recordTransformerError(double value, long currentTimeMs) { | ||
transformerErrorSensor.record(value, currentTimeMs); | ||
} | ||
|
||
public void registerTransformerErrorSensor() { | ||
if (transformerErrorSensor == null) { | ||
transformerErrorSensor = localMetricRepository.sensor(TRANSFORMER_ERROR_COUNT); | ||
transformerErrorSensor.add(TRANSFORMER_ERROR_COUNT, transformerErrorCount); | ||
} | ||
transformerErrorSensor.get().record(value, currentTimeMs); | ||
} | ||
|
||
public double getTransformerErrorCount() { | ||
if (transformerErrorCount != null) { | ||
return transformerErrorCount.measure(METRIC_CONFIG, System.currentTimeMillis()); | ||
} | ||
return 0; | ||
return transformerErrorCount.measure(METRIC_CONFIG, System.currentTimeMillis()); | ||
} | ||
|
||
public void recordTransformerLatency(double value, long currentTimeMs) { | ||
transformerLatencySensor.record(value, currentTimeMs); | ||
} | ||
|
||
public void registerTransformerLatencySensor() { | ||
if (transformerLatencySensor == null) { | ||
transformerLatencySensor = new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LATENCY); | ||
} | ||
} | ||
|
||
public void recordTransformerLifecycleStartLatency(double value, long currentTimeMs) { | ||
transformerLifecycleStartLatencySensor.record(value, currentTimeMs); | ||
} | ||
|
||
public void registerTransformerLifecycleStartLatencySensor() { | ||
if (transformerLifecycleStartLatencySensor == null) { | ||
transformerLifecycleStartLatencySensor = | ||
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_START_LATENCY); | ||
} | ||
} | ||
|
||
public void recordTransformerLifecycleEndLatency(double value, long currentTimeMs) { | ||
transformerLifecycleEndLatencySensor.record(value, currentTimeMs); | ||
} | ||
|
||
public void registerTransformerLifecycleEndLatencySensor() { | ||
if (transformerLifecycleEndLatencySensor == null) { | ||
transformerLifecycleEndLatencySensor = | ||
new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LIFECYCLE_END_LATENCY); | ||
} | ||
} | ||
|
||
public void recordIdleTime(long value) { | ||
idleTimeSensor.record(value); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to do this one too?