Skip to content

Commit

Permalink
Add an option to enable/disable storage stats collection as part of TableStatsCollector (#186)
Browse files Browse the repository at this point in the history

## Summary

Storage stats computation can be expensive, as it may involve a
recursive directory walk in the file system. We therefore make the
storage stats computation optional, so that it can be invoked only as needed.

## Changes

- [ ] Client-facing API Changes
- [X] Internal API Changes
- [ ] Bug Fixes
- [ ] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [X] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
  • Loading branch information
maluchari authored Sep 9, 2024
1 parent 896320a commit b37e2ac
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,15 @@ static String partitionToString(StructLike partition) {
* Collect and publish table stats for a given fully-qualified table name.
*
* @param fqtn fully-qualified table name
* @param skipStorageStatsCollection whether to skip storage stats collection
*/
public IcebergTableStats collectTableStats(String fqtn) {
public IcebergTableStats collectTableStats(String fqtn, Boolean skipStorageStatsCollection) {
Table table = getTable(fqtn);

TableStatsCollector tableStatsCollector;
try {
tableStatsCollector = new TableStatsCollector(fs(), spark, fqtn, table);
tableStatsCollector =
new TableStatsCollector(fs(), spark, fqtn, table, skipStorageStatsCollection);
} catch (IOException e) {
log.error("Unable to initialize file system for table stats collection", e);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,24 @@
* Class with main entry point to collect Iceberg stats for a table.
*
* <p>Example of invocation: com.linkedin.openhouse.jobs.spark.TableStatsCollectionSparkApp
* --tableName db.testTable
 * --tableName db.testTable --skipStorageStats
*/
@Slf4j
public class TableStatsCollectionSparkApp extends BaseTableSparkApp {

public TableStatsCollectionSparkApp(String jobId, StateManager stateManager, String fqtn) {
private final Boolean skipStorageStatsCollection;

public TableStatsCollectionSparkApp(
String jobId, StateManager stateManager, String fqtn, Boolean skipStorageStatsCollection) {
super(jobId, stateManager, fqtn);
this.skipStorageStatsCollection = skipStorageStatsCollection;
}

@Override
protected void runInner(Operations ops) {
log.info("Running TableStatsCollectorApp for table {}", fqtn);

IcebergTableStats icebergTableStats = ops.collectTableStats(fqtn);
IcebergTableStats icebergTableStats = ops.collectTableStats(fqtn, skipStorageStatsCollection);
publishStats(icebergTableStats);
}

Expand All @@ -43,10 +47,16 @@ protected void publishStats(IcebergTableStats icebergTableStats) {
public static void main(String[] args) {
List<Option> extraOptions = new ArrayList<>();
extraOptions.add(new Option("t", "tableName", true, "Fully-qualified table name"));
extraOptions.add(
new Option("s", "skipStorageStats", false, "Whether to skip storage stats collection"));

CommandLine cmdLine = createCommandLine(args, extraOptions);
TableStatsCollectionSparkApp app =
new TableStatsCollectionSparkApp(
getJobId(cmdLine), createStateManager(cmdLine), cmdLine.getOptionValue("tableName"));
getJobId(cmdLine),
createStateManager(cmdLine),
cmdLine.getOptionValue("tableName"),
cmdLine.hasOption("skipStorageStats"));
app.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class TableStatsCollector {
private SparkSession spark;
String fqtn;
Table table;
Boolean skipStorageStatsCollection;

/** Collect table stats. */
public IcebergTableStats collectTableStats() {
Expand All @@ -30,6 +31,10 @@ public IcebergTableStats collectTableStats() {
TableStatsCollectorUtil.populateStatsForSnapshots(
fqtn, table, spark, statsWithReferenceFiles);

if (skipStorageStatsCollection) {
return statsWithCurrentSnapshot;
}

IcebergTableStats tableStats =
TableStatsCollectorUtil.populateStorageStats(fqtn, table, fs, statsWithCurrentSnapshot);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,15 +533,15 @@ public void testCollectTableStats() throws Exception {
final int numInserts = 3;
try (Operations ops = Operations.withCatalog(getSparkSession(), meter)) {
prepareTable(ops, tableName);
IcebergTableStats stats = ops.collectTableStats(tableName);
IcebergTableStats stats = ops.collectTableStats(tableName, true);

// Validate empty data files case
Assertions.assertEquals(stats.getNumReferencedDataFiles(), 0);
Assertions.assertEquals(stats.getNumExistingMetadataJsonFiles(), 1);
long modifiedTimeStamp = System.currentTimeMillis();

populateTable(ops, tableName, 1);
stats = ops.collectTableStats(tableName);
stats = ops.collectTableStats(tableName, true);
Assertions.assertEquals(stats.getNumReferencedDataFiles(), 1);
Assertions.assertTrue(stats.getTableLastUpdatedTimestamp() >= modifiedTimeStamp);

Expand All @@ -553,13 +553,15 @@ public void testCollectTableStats() throws Exception {
populateTable(ops, tableName, numInserts);
table = ops.getTable(tableName);
log.info("Loaded table {}, location {}", table.name(), table.location());
stats = ops.collectTableStats(tableName);
stats = ops.collectTableStats(tableName, true);
Assertions.assertEquals(stats.getCurrentSnapshotId(), table.currentSnapshot().snapshotId());
Assertions.assertEquals(stats.getNumReferencedDataFiles(), numInserts + 1);
Assertions.assertEquals(stats.getNumExistingMetadataJsonFiles(), numInserts + 2);
Assertions.assertEquals(
stats.getCurrentSnapshotTimestamp(), table.currentSnapshot().timestampMillis());
Assertions.assertEquals(stats.getOldestSnapshotTimestamp(), oldestSnapshot);
Assertions.assertEquals(stats.getNumObjectsInDirectory(), null);
stats = ops.collectTableStats(tableName, false);
Assertions.assertEquals(
stats.getNumObjectsInDirectory(),
stats.getNumReferencedDataFiles()
Expand Down

0 comments on commit b37e2ac

Please sign in to comment.