Capture number of snapshots created per day as a metric #149

Open · wants to merge 2 commits into base: main
@@ -4,7 +4,12 @@

import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
@@ -114,16 +119,42 @@ protected static IcebergTableStats populateStatsForSnapshots(
.min(Long::compareTo)
.orElse(null);

Map<String, Long> snapshotCountByDay =
getSnapshotDistributionPerDay(table, spark, MetadataTableType.SNAPSHOTS);

return stats
.toBuilder()
.currentSnapshotId(currentSnapshotId)
.currentSnapshotTimestamp(currentSnapshotTimestamp)
.oldestSnapshotTimestamp(oldestSnapshotTimestamp)
.numCurrentSnapshotReferencedDataFiles(countOfDataFiles)
.totalCurrentSnapshotReferencedDataFilesSizeInBytes(sumOfFileSizeBytes)
.snapshotCountByDay(snapshotCountByDay)
.build();
}

/** Get snapshot distribution for a given table by date. */
private static Map<String, Long> getSnapshotDistributionPerDay(
Table table, SparkSession spark, MetadataTableType metadataTableType) {
Dataset<Row> snapShotDistribution =
Review comment (Collaborator): Suggested change: rename snapShotDistribution to snapshotDistribution.
SparkTableUtil.loadMetadataTable(spark, table, metadataTableType)
Review comment (Collaborator): Will this get all snapshots committed since the beginning of the table? If yes, should it also have a filter criterion so we count only snapshots from the last X days?
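The bounded-window idea this comment raises can be sketched in plain Java, independent of Spark; the 3-day window and the timestamps below are illustrative assumptions, not part of the patch:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SnapshotWindowSketch {
    // Keep only commit timestamps (epoch millis) within the last `days` days.
    // The window length would be a configurable parameter in a real implementation.
    public static List<Long> withinWindow(List<Long> committedAtMillis, long nowMillis, int days) {
        long cutoff = nowMillis - Duration.ofDays(days).toMillis();
        return committedAtMillis.stream()
                .filter(ts -> ts >= cutoff)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long now = Instant.parse("2024-01-10T00:00:00Z").toEpochMilli();
        long old = now - Duration.ofDays(10).toMillis();    // outside a 3-day window
        long recent = now - Duration.ofDays(1).toMillis();  // inside the window
        System.out.println(withinWindow(Arrays.asList(old, recent), now, 3).size()); // prints 1
    }
}
```

In the actual Spark path, the equivalent bound would be a filter on the `committed_at` column before aggregating.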

.selectExpr(new String[] {"snapshot_id", "committed_at"})
.dropDuplicates("snapshot_id", "committed_at");

// Create the formatter once rather than per row.
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
Map<String, Long> snapshotCountByDay =
    snapShotDistribution.collectAsList().stream()
        .collect(
            Collectors.toMap(
                row -> formatter.format(new Date(row.getTimestamp(1).getTime())),
                row -> 1L,
                Long::sum,
                LinkedHashMap::new));
Review comment (Collaborator, on lines +147 to +154): Would it be better to implement this before collectAsList?

return snapshotCountByDay;
}
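The day-bucketing merge above can be exercised in isolation over raw epoch-millis timestamps; a minimal sketch mirroring the Collectors.toMap merge in the patch (the UTC zone here is an assumption for reproducibility; the patch itself uses the JVM default zone):

```java
import java.text.SimpleDateFormat;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.Date;

public class SnapshotCountSketch {
    // Bucket commit timestamps (epoch millis) into per-day counts, using the same
    // toMap(..., 1L, Long::sum, LinkedHashMap::new) shape as the patch.
    public static Map<String, Long> countByDay(List<Long> committedAtMillis) {
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
        formatter.setTimeZone(TimeZone.getTimeZone("UTC")); // fixed zone so results are stable
        return committedAtMillis.stream()
                .collect(Collectors.toMap(
                        ts -> formatter.format(new Date(ts)),
                        ts -> 1L,
                        Long::sum,
                        LinkedHashMap::new));
    }
}
```

Two timestamps on the same UTC day merge into a single entry with count 2, which is the behavior the metric relies on.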

/**
* Collect storage stats for a given fully-qualified table name.
*
@@ -9,9 +9,12 @@
import com.linkedin.openhouse.tables.client.model.Retention;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import io.opentelemetry.api.metrics.Meter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -566,6 +569,18 @@ public void testCollectTableStats() throws Exception {
+ stats.getNumExistingMetadataJsonFiles()
+ stats.getNumReferencedManifestFiles()
+ stats.getNumReferencedManifestLists());
AtomicLong snapShotCount = new AtomicLong();
table.snapshots().forEach(snapshot -> snapShotCount.getAndIncrement());

SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
Assertions.assertEquals(
stats.getSnapshotCountByDay().get(formatter.format(new Date())), snapShotCount.get());
}
}
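The AtomicLong-plus-forEach counter in the test could also be written as a stream count over the Iterable returned by table.snapshots() (StreamSupport is already imported in the main change); a self-contained sketch, with a plain list standing in for the snapshots:

```java
import java.util.Arrays;
import java.util.stream.StreamSupport;

public class SnapshotStreamCount {
    // Count the elements of any Iterable without a mutable accumulator.
    public static long count(Iterable<?> snapshots) {
        return StreamSupport.stream(snapshots.spliterator(), false).count();
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("s1", "s2", "s3"))); // prints 3
    }
}
```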

@@ -1,5 +1,6 @@
package com.linkedin.openhouse.common.stats.model;

import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -35,4 +36,6 @@ public class IcebergTableStats extends BaseTableMetadata {
private Long numReferencedManifestFiles;

private Long numReferencedManifestLists;

private Map<String, Long> snapshotCountByDay;
Review comment (Collaborator): Would this be a large map, if the keys cover all days?

Review comment (Collaborator): If SE is functioning fine, this should only hold a bounded number of days?

Reply (Author): That is right. It should ideally hold only 3 days' worth of data, but we can consider collecting only the past 2 days, since we should already have the older data from previous runs.

}