From 14463d1a0f2ecc850ab9fc0fede616e784baa1e0 Mon Sep 17 00:00:00 2001 From: PeterZh6 Date: Wed, 9 Oct 2024 14:23:04 +0800 Subject: [PATCH] [INLONG-11201][Sort] Enhanced sink metric instrumentation for Flink StarRocks Connector (#11206) --- .github/workflows/ci_ut.yml | 2 +- .../apache/inlong/sort/base/Constants.java | 6 + .../sort/base/metric/SinkExactlyMetric.java | 172 ++++++++++++++++++ .../table/StarRocksDynamicSinkFunctionV2.java | 45 ++++- 4 files changed, 222 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml index 49bad2e2b67..8d8587f488d 100644 --- a/.github/workflows/ci_ut.yml +++ b/.github/workflows/ci_ut.yml @@ -138,4 +138,4 @@ jobs: if-no-files-found: ignore - name: Clean up build packages - run: mvn clean + run: mvn clean \ No newline at end of file diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 219ef02dcaf..c13c47f66ee 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -81,6 +81,12 @@ public final class Constants { public static final String SNAPSHOT_TO_CHECKPOINT_TIME_LAG = "snapshotToCheckpointTimeLag"; + public static final String NUM_SERIALIZE_SUCCESS = "numSerializeSuccess"; + + public static final String NUM_SERIALIZE_ERROR = "numSerializeError"; + + public static final String SERIALIZE_TIME_LAG = "serializeTimeLag"; + /** * Timestamp when the read phase changed */ diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java index cb3d20f389b..c9b9ace9f40 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java @@ -21,6 +21,7 @@ import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; @@ -39,6 +40,13 @@ import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND; +import static org.apache.inlong.sort.base.Constants.NUM_SERIALIZE_ERROR; +import static org.apache.inlong.sort.base.Constants.NUM_SERIALIZE_SUCCESS; +import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_COMPLETE; +import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_CREATE; +import static org.apache.inlong.sort.base.Constants.NUM_SNAPSHOT_ERROR; +import static org.apache.inlong.sort.base.Constants.SERIALIZE_TIME_LAG; +import static org.apache.inlong.sort.base.Constants.SNAPSHOT_TO_CHECKPOINT_TIME_LAG; import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize; /** @@ -59,11 +67,24 @@ public class SinkExactlyMetric implements MetricData, Serializable { private Counter numBytesOutForMeter; private Counter dirtyRecordsOut; private Counter dirtyBytesOut; + private Counter numSerializeSuccess; + private Counter numSerializeError; + private Gauge serializeTimeLag; + /** number of attempts to create a snapshot, i.e. snapshotState() method call */ + private Counter numSnapshotCreate; + /** number of errors when creating a snapshot */ + + private Counter numSnapshotError; + /** number of successful snapshot completions, i.e. notifyCheckpointComplete method call */ + private Counter numSnapshotComplete; + private Gauge snapshotToCheckpointTimeLag; private Meter numRecordsOutPerSecond; private Meter numBytesOutPerSecond; private List auditKeys; private Long currentCheckpointId = 0L; private Long lastCheckpointId = 0L; + private Long snapshotToCheckpointDelay = 0L; + private Long serializeDelay = 0L; public SinkExactlyMetric(MetricOption option, MetricGroup metricGroup) { this.metricGroup = metricGroup; @@ -78,6 +99,8 @@ public SinkExactlyMetric(MetricOption option, MetricGroup metricGroup) { case DIRTY: registerMetricsForDirtyBytesOut(new ThreadSafeCounter()); registerMetricsForDirtyRecordsOut(new ThreadSafeCounter()); + registerMetricForNumSerializeError(new ThreadSafeCounter()); + registerMetricForNumSnapshotError(new ThreadSafeCounter()); break; case NORMAL: recordsOutCounter.inc(option.getInitRecords()); @@ -88,6 +111,11 @@ public SinkExactlyMetric(MetricOption option, MetricGroup metricGroup) { registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter()); registerMetricsForNumBytesOutPerSecond(); registerMetricsForNumRecordsOutPerSecond(); + registerMetricForSnapshotToCheckpointTimeLag(); + registerMetricForSerializeTimeLag(); + registerMetricForNumSerializeSuccess(new ThreadSafeCounter()); + registerMetricForNumSnapshotCreate(new ThreadSafeCounter()); + registerMetricForNumSnapshotComplete(new ThreadSafeCounter()); break; default: recordsOutCounter.inc(option.getInitRecords()); @@ -102,6 +130,13 @@ public SinkExactlyMetric(MetricOption option, MetricGroup metricGroup) { registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter()); registerMetricsForNumBytesOutPerSecond(); registerMetricsForNumRecordsOutPerSecond(); + registerMetricForSnapshotToCheckpointTimeLag(); + registerMetricForSerializeTimeLag(); + registerMetricForNumSerializeSuccess(new ThreadSafeCounter()); + registerMetricForNumSerializeError(new ThreadSafeCounter()); + registerMetricForNumSnapshotCreate(new ThreadSafeCounter()); + registerMetricForNumSnapshotError(new ThreadSafeCounter()); + registerMetricForNumSnapshotComplete(new ThreadSafeCounter()); break; } @@ -184,6 +219,55 @@ public void registerMetricsForDirtyBytesOut(Counter counter) { dirtyBytesOut = registerCounter(DIRTY_BYTES_OUT, counter); } + public void registerMetricForNumSerializeSuccess() { + numSerializeSuccess = registerCounter(NUM_SERIALIZE_SUCCESS, new SimpleCounter()); + } + + public void registerMetricForNumSerializeSuccess(Counter counter) { + numSerializeSuccess = registerCounter(NUM_SERIALIZE_SUCCESS, counter); + } + + public void registerMetricForNumSerializeError() { + numSerializeError = registerCounter(NUM_SERIALIZE_ERROR, new SimpleCounter()); + } + + public void registerMetricForNumSerializeError(Counter counter) { + numSerializeError = registerCounter(NUM_SERIALIZE_ERROR, counter); + } + + public void registerMetricForSerializeTimeLag() { + serializeTimeLag = registerGauge(SERIALIZE_TIME_LAG, (Gauge) this::getSerializeDelay); + } + + public void registerMetricForNumSnapshotCreate() { + numSnapshotCreate = registerCounter(NUM_SNAPSHOT_CREATE, new SimpleCounter()); + } + + public void registerMetricForNumSnapshotCreate(Counter counter) { + numSnapshotCreate = registerCounter(NUM_SNAPSHOT_CREATE, counter); + } + + public void registerMetricForNumSnapshotError() { + numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, new SimpleCounter()); + } + + public void registerMetricForNumSnapshotError(Counter counter) { + numSnapshotError = registerCounter(NUM_SNAPSHOT_ERROR, counter); + } + + public void registerMetricForNumSnapshotComplete() { + numSnapshotComplete = registerCounter(NUM_SNAPSHOT_COMPLETE, new SimpleCounter()); + } + + public void registerMetricForNumSnapshotComplete(Counter counter) { + numSnapshotComplete = registerCounter(NUM_SNAPSHOT_COMPLETE, counter); + } + + public void registerMetricForSnapshotToCheckpointTimeLag() { + snapshotToCheckpointTimeLag = + registerGauge(SNAPSHOT_TO_CHECKPOINT_TIME_LAG, (Gauge) this::getSnapshotToCheckpointDelay); + } + public Counter getNumRecordsOut() { return numRecordsOut; } @@ -208,6 +292,50 @@ public Meter getNumBytesOutPerSecond() { return numBytesOutPerSecond; } + public Counter getNumSerializeSuccess() { + return numSerializeSuccess; + } + + public Counter getNumSerializeError() { + return numSerializeError; + } + + public Counter getNumSnapshotCreate() { + return numSnapshotCreate; + } + + public Counter getNumSnapshotError() { + return numSnapshotError; + } + + public Counter getNumSnapshotComplete() { + return numSnapshotComplete; + } + + public Gauge getSerializeTimeLag() { + return serializeTimeLag; + } + + public Gauge getSnapshotToCheckpointTimeLag() { + return snapshotToCheckpointTimeLag; + } + + public Long getSnapshotToCheckpointDelay() { + return snapshotToCheckpointDelay; + } + + public Long getSerializeDelay() { + return serializeDelay; + } + + public void recordSerializeDelay(Long delay) { + this.serializeDelay = delay; + } + + public void recordSnapshotToCheckpointDelay(Long delay) { + this.snapshotToCheckpointDelay = delay; + } + @Override public MetricGroup getMetricGroup() { return metricGroup; @@ -218,6 +346,36 @@ public Map getLabels() { return labels; } + public void incNumSerializeSuccess() { + if (numSerializeSuccess != null) { + numSerializeSuccess.inc(); + } + } + + public void incNumSerializeError() { + if (numSerializeError != null) { + numSerializeError.inc(); + } + } + + public void incNumSnapshotCreate() { + if (numSnapshotCreate != null) { + numSnapshotCreate.inc(); + } + } + + public void incNumSnapshotError() { + if (numSnapshotError != null) { + numSnapshotError.inc(); + } + } + + public void incNumSnapshotComplete() { + if (numSnapshotComplete != null) { + numSnapshotComplete.inc(); + } + } + public Counter getNumRecordsOutForMeter() { return numRecordsOutForMeter; } @@ -301,6 +459,8 @@ public String toString() { + ", labels=" + labels + ", dirtyRecords=" + dirtyRecordsOut.getCount() + ", dirtyBytes=" + dirtyBytesOut.getCount() + + ", numSerializeError=" + numSerializeError.getCount() + + ", numSnapshotError=" + numSnapshotError.getCount() + '}'; case NORMAL: return "SinkMetricData{" @@ -313,6 +473,11 @@ public String toString() { + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount() + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate() + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate() + + ", serializeTimeLag=" + serializeTimeLag.getValue() + + ", snapshotToCheckpointTimeLag=" + snapshotToCheckpointTimeLag.getValue() + + ", numSerializeSuccess=" + numSerializeSuccess.getCount() + + ", numSnapshotCreate=" + numSnapshotCreate.getCount() + + ", numSnapshotComplete=" + numSnapshotComplete.getCount() + '}'; default: return "SinkMetricData{" @@ -327,6 +492,13 @@ public String toString() { + ", dirtyBytesOut=" + dirtyBytesOut.getCount() + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate() + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate() + + ", serializeTimeLag=" + serializeTimeLag.getValue() + + ", snapshotToCheckpointTimeLag=" + snapshotToCheckpointTimeLag.getValue() + + ", numSerializeSuccess=" + numSerializeSuccess.getCount() + + ", numSnapshotCreate=" + numSnapshotCreate.getCount() + + ", numSnapshotComplete=" + numSnapshotComplete.getCount() + + ", numSerializeError=" + numSerializeError.getCount() + + ", numSnapshotError=" + numSnapshotError.getCount() + '}'; } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java index b4e915eec97..a829ed26b70 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java @@ -101,6 +101,9 @@ public class StarRocksDynamicSinkFunctionV2 extends StarRocksDynamicSinkFunct private SchemaUtils schemaUtils; private String stateKey; + /** The map to store the start time of each checkpoint. */ + private transient Map checkpointStartTimeMap; + public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer rowTransformer, String inlongMetric, @@ -208,12 +211,26 @@ public void invoke(T value, Context context) flushLegacyData(); Object[] data = rowTransformer.transform(value, sinkOptions.supportUpsertDelete()); - + long serializeStartTime = System.currentTimeMillis(); + String serializedData; + try { + serializedData = serializer.serialize(data); + } catch (Exception e) { + log.error("Failed to serialize data", e); + if (sinkExactlyMetric != null) { + sinkExactlyMetric.incNumSerializeError(); + } + return; + } + if (sinkExactlyMetric != null) { + sinkExactlyMetric.incNumSerializeSuccess(); + sinkExactlyMetric.recordSerializeDelay(System.currentTimeMillis() - serializeStartTime); + } sinkManager.write( null, sinkOptions.getDatabaseName(), sinkOptions.getTableName(), - serializer.serialize(schemaUtils.filterOutTimeField(data))); + serializedData); ouputMetrics(value, data); } @@ -243,6 +260,8 @@ public void open(Configuration parameters) { commitTransaction(Long.MAX_VALUE); log.info("Open sink function v2. {}", EnvUtils.getGitInformation()); + + checkpointStartTimeMap = new HashMap<>(); } @Override @@ -266,10 +285,16 @@ public void close() { @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + // record the start time of each checkpoint + checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis()); + updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId()); sinkManager.flush(); if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) { + if (sinkExactlyMetric != null) { + sinkExactlyMetric.incNumSnapshotCreate(); + } return; } @@ -280,8 +305,15 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw snapshotStates.clear(); snapshotStates.add(StarrocksSnapshotState.of(snapshotMap)); + if (sinkExactlyMetric != null) { + sinkExactlyMetric.incNumSnapshotCreate(); + } } else { sinkManager.abort(snapshot); + checkpointStartTimeMap.remove(functionSnapshotContext.getCheckpointId()); + if (sinkExactlyMetric != null) { + sinkExactlyMetric.incNumSnapshotError(); + } throw new RuntimeException("Snapshot state failed by prepare"); } @@ -343,6 +375,15 @@ public void notifyCheckpointComplete(long checkpointId) { commitTransaction(checkpointId); flushAudit(); updateLastCheckpointId(checkpointId); + if (sinkExactlyMetric != null) { + sinkExactlyMetric.incNumSnapshotComplete(); + } + // get the start time of the currently completed checkpoint + Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId); + if (snapShotStartTimeById != null && sinkExactlyMetric != null) { + sinkExactlyMetric + .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById); + } } private void commitTransaction(long checkpointId) {