Skip to content

Commit

Permalink
Favor composition in FlinkMetricsContainer implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
jto committed Sep 27, 2023
1 parent b332eeb commit 3f2c5e7
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,6 +40,7 @@ public class FlinkMetricContainer extends FlinkMetricContainerBase {
private final RuntimeContext runtimeContext;

public FlinkMetricContainer(RuntimeContext runtimeContext) {
super(runtimeContext.getMetricGroup());
this.runtimeContext = runtimeContext;
}

Expand All @@ -65,8 +65,4 @@ public void registerMetricsForPipelineResult() {
metricsAccumulator.add(metricsContainers);
}

@Override
protected MetricGroup getMetricGroup() {
return runtimeContext.getMetricGroup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,19 @@ abstract class FlinkMetricContainerBase {
private final Map<String, Counter> flinkCounterCache;
private final Map<String, FlinkDistributionGauge> flinkDistributionGaugeCache;
private final Map<String, FlinkGauge> flinkGaugeCache;
private final MetricGroup metricGroup;

public FlinkMetricContainerBase() {
public FlinkMetricContainerBase(MetricGroup metricGroup) {
this.flinkCounterCache = new HashMap<>();
this.flinkDistributionGaugeCache = new HashMap<>();
this.flinkGaugeCache = new HashMap<>();
this.metricsContainers = new MetricsContainerStepMap();
this.metricGroup = metricGroup;
}

protected abstract MetricGroup getMetricGroup();
public MetricGroup getMetricGroup() {
return metricGroup;
}

public MetricsContainerImpl getMetricsContainer(String stepName) {
return metricsContainers.getContainer(stepName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,7 @@
* @see FlinkMetricContainer
*/
public class FlinkMetricContainerWithoutAccumulator extends FlinkMetricContainerBase {
private final MetricGroup metricGroup;

public FlinkMetricContainerWithoutAccumulator(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
}

@Override
protected MetricGroup getMetricGroup() {
return metricGroup;
super(metricGroup);
}
}

0 comments on commit 3f2c5e7

Please sign in to comment.