Skip to content

Commit

Permalink
Change quick stats classes to use ExtendedHiveMetastore
Browse files Browse the repository at this point in the history
  • Loading branch information
aaneja committed Nov 4, 2024
1 parent fc9131c commit c6762eb
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public S3SelectTestHelper(String host,
new HivePartitionStats(),
new HiveFileRenamer(),
columnConverterProvider,
new QuickStatsProvider(hdfsEnvironment, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new QuickStatsProvider(metastoreClient, hdfsEnvironment, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new HiveTableWritabilityChecker(config));
transactionManager = new HiveTransactionManager();
splitManager = new HiveSplitManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.hive.cache.HiveCachingHdfsConfiguration;
import com.facebook.presto.hive.datasink.DataSinkFactory;
import com.facebook.presto.hive.datasink.OutputStreamDataSinkFactory;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HiveMetastoreCacheStats;
import com.facebook.presto.hive.metastore.HivePartitionMutator;
import com.facebook.presto.hive.metastore.MetastoreCacheStats;
Expand Down Expand Up @@ -377,15 +378,18 @@ public ParquetMetadataSource createParquetMetadataSource(ParquetCacheConfig parq

@Singleton
@Provides
public QuickStatsProvider createQuickStatsProvider(HdfsEnvironment hdfsEnvironment,
public QuickStatsProvider createQuickStatsProvider(
ExtendedHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
DirectoryLister directoryLister,
HiveClientConfig hiveClientConfig,
NamenodeStats nameNodeStats,
FileFormatDataSourceStats fileFormatDataSourceStats,
MBeanExporter exporter)
{
ParquetQuickStatsBuilder parquetQuickStatsBuilder = new ParquetQuickStatsBuilder(fileFormatDataSourceStats, hdfsEnvironment, hiveClientConfig);
QuickStatsProvider quickStatsProvider = new QuickStatsProvider(hdfsEnvironment,
QuickStatsProvider quickStatsProvider = new QuickStatsProvider(metastore,
hdfsEnvironment,
directoryLister,
hiveClientConfig,
nameNodeStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private Map<String, PartitionStatistics> getPartitionsStatistics(ConnectorSessio
PartitionStatistics tableStatistics = metastore.getTableStatistics(metastoreContext, table.getSchemaName(), table.getTableName());
if (isQuickStatsEnabled(session) &&
(tableStatistics.equals(empty()) || tableStatistics.getColumnStatistics().isEmpty())) {
tableStatistics = quickStatsProvider.getQuickStats(session, metastore, table, metastoreContext, UNPARTITIONED_ID.getPartitionName());
tableStatistics = quickStatsProvider.getQuickStats(session, table, metastoreContext, UNPARTITIONED_ID.getPartitionName());
}
return ImmutableMap.of(UNPARTITIONED_ID.getPartitionName(), tableStatistics);
}
Expand All @@ -156,7 +156,7 @@ private Map<String, PartitionStatistics> getPartitionsStatistics(ConnectorSessio
.map(Map.Entry::getKey)
.collect(toImmutableList());

Map<String, PartitionStatistics> partitionQuickStats = quickStatsProvider.getQuickStats(session, metastore, table, metastoreContext, partitionsWithNoStats);
Map<String, PartitionStatistics> partitionQuickStats = quickStatsProvider.getQuickStats(session, table, metastoreContext, partitionsWithNoStats);

HashMap<String, PartitionStatistics> mergedMap = new HashMap<>(partitionStatistics);
mergedMap.putAll(partitionQuickStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import com.facebook.presto.hive.HiveFileContext;
import com.facebook.presto.hive.HiveFileInfo;
import com.facebook.presto.hive.PartitionNameWithVersion;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.parquet.ParquetDataSource;
Expand Down Expand Up @@ -283,7 +283,7 @@ public ThreadPoolExecutorMBean getExecutor()
}

@Override
public PartitionQuickStats buildQuickStats(ConnectorSession session, SemiTransactionalHiveMetastore metastore,
public PartitionQuickStats buildQuickStats(ConnectorSession session, ExtendedHiveMetastore metastore,
SchemaTableName table, MetastoreContext metastoreContext, String partitionId, Iterator<HiveFileInfo> files)
{
requireNonNull(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package com.facebook.presto.hive.statistics;

import com.facebook.presto.hive.HiveFileInfo;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;

Expand All @@ -25,6 +25,6 @@
/**
 * A single strategy for building {@link PartitionQuickStats} for one partition of a table.
 * <p>
 * Declared as a functional interface with exactly one method. {@code QuickStatsProvider}
 * accepts a {@code List<QuickStatsBuilder>} through its constructor, and
 * {@code ParquetQuickStatsBuilder} overrides {@code buildQuickStats}.
 */
@FunctionalInterface
public interface QuickStatsBuilder
{
// NOTE(review): the two consecutive lines below that both open the buildQuickStats declaration
// appear to be the before/after lines of the commit diff (SemiTransactionalHiveMetastore replaced
// by ExtendedHiveMetastore, per the commit title and the import changes) — only one of them
// belongs in the real file; confirm against the repository.
/**
 * Builds quick stats for the partition identified by {@code partitionId}.
 *
 * @param session the connector session the request is made under
 * @param metastore the metastore handle made available to the builder
 * @param table schema-qualified name of the table
 * @param metastoreContext context to pass along on metastore calls
 * @param partitionId identifier of the partition; presumably the unpartitioned-table
 *        partition name for unpartitioned tables — TODO confirm against callers
 * @param files iterator over {@link HiveFileInfo} entries; presumably the files that
 *        belong to the partition — TODO confirm against callers
 * @return the quick stats built for the partition
 */
PartitionQuickStats buildQuickStats(ConnectorSession session, SemiTransactionalHiveMetastore metastore, SchemaTableName table,
PartitionQuickStats buildQuickStats(ConnectorSession session, ExtendedHiveMetastore metastore, SchemaTableName table,
MetastoreContext metastoreContext, String partitionId, Iterator<HiveFileInfo> files);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import com.facebook.presto.hive.NamenodeStats;
import com.facebook.presto.hive.PartitionNameWithVersion;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
Expand Down Expand Up @@ -101,10 +101,16 @@ public class QuickStatsProvider
private final Cache<String, PartitionStatistics> partitionToStatsCache;
private final NamenodeStats nameNodeStats;
private final TimeStat buildDuration = new TimeStat(MILLISECONDS);
private final ExtendedHiveMetastore metastore;

public QuickStatsProvider(HdfsEnvironment hdfsEnvironment, DirectoryLister directoryLister, HiveClientConfig hiveClientConfig, NamenodeStats nameNodeStats,
public QuickStatsProvider(ExtendedHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
DirectoryLister directoryLister,
HiveClientConfig hiveClientConfig,
NamenodeStats nameNodeStats,
List<QuickStatsBuilder> statsBuilderStrategies)
{
this.metastore = metastore;
this.hdfsEnvironment = hdfsEnvironment;
this.directoryLister = directoryLister;
this.recursiveDirWalkerEnabled = hiveClientConfig.getRecursiveDirWalkerEnabled();
Expand Down Expand Up @@ -151,7 +157,7 @@ public Map<String, Instant> getInProgressBuildsSnapshot()
return inProgressBuilds.entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, v -> v.getValue().getBuildStart()));
}

public Map<String, PartitionStatistics> getQuickStats(ConnectorSession session, SemiTransactionalHiveMetastore metastore, SchemaTableName table,
public Map<String, PartitionStatistics> getQuickStats(ConnectorSession session, SchemaTableName table,
MetastoreContext metastoreContext, List<String> partitionIds)
{
if (!isQuickStatsEnabled(session)) {
Expand All @@ -161,7 +167,7 @@ public Map<String, PartitionStatistics> getQuickStats(ConnectorSession session,
CompletableFuture<PartitionStatistics>[] partitionQuickStatCompletableFutures = new CompletableFuture[partitionIds.size()];
for (int counter = 0; counter < partitionIds.size(); counter++) {
String partitionId = partitionIds.get(counter);
partitionQuickStatCompletableFutures[counter] = supplyAsync(() -> getQuickStats(session, metastore, table, metastoreContext, partitionId), backgroundFetchExecutor);
partitionQuickStatCompletableFutures[counter] = supplyAsync(() -> getQuickStats(session, table, metastoreContext, partitionId), backgroundFetchExecutor);
}

try {
Expand Down Expand Up @@ -203,7 +209,7 @@ public Map<String, PartitionStatistics> getQuickStats(ConnectorSession session,
return result.build();
}

public PartitionStatistics getQuickStats(ConnectorSession session, SemiTransactionalHiveMetastore metastore, SchemaTableName table,
public PartitionStatistics getQuickStats(ConnectorSession session, SchemaTableName table,
MetastoreContext metastoreContext, String partitionId)
{
if (!isQuickStatsEnabled(session)) {
Expand Down Expand Up @@ -236,7 +242,7 @@ public PartitionStatistics getQuickStats(ConnectorSession session, SemiTransacti
// If not, atomically initiate a call to build quick stats in a background thread
AtomicReference<CompletableFuture<PartitionStatistics>> partitionStatisticsCompletableFuture = new AtomicReference<>();
inProgressBuilds.computeIfAbsent(partitionKey, (key) -> {
CompletableFuture<PartitionStatistics> fetchFuture = supplyAsync(() -> buildQuickStats(partitionKey, partitionId, session, metastore, table, metastoreContext), backgroundFetchExecutor);
CompletableFuture<PartitionStatistics> fetchFuture = supplyAsync(() -> buildQuickStats(partitionKey, partitionId, session, table, metastoreContext), backgroundFetchExecutor);
partitionStatisticsCompletableFuture.set(fetchFuture);

return new InProgressBuildInfo(fetchFuture, Instant.now());
Expand Down Expand Up @@ -282,7 +288,7 @@ public PartitionStatistics getQuickStats(ConnectorSession session, SemiTransacti
else {
// The quick stats inline fetch was pre-empted by another thread
// We get the up-to-date value by calling getQuickStats again
return getQuickStats(session, metastore, table, metastoreContext, partitionId);
return getQuickStats(session, table, metastoreContext, partitionId);
}
}

Expand Down Expand Up @@ -311,7 +317,7 @@ private long getQuickStatsInlineBuildTimeoutMillis(ConnectorSession session)
return getQuickStatsInlineBuildTimeout(session).toMillis();
}

private PartitionStatistics buildQuickStats(String partitionKey, String partitionId, ConnectorSession session, SemiTransactionalHiveMetastore metastore, SchemaTableName table,
private PartitionStatistics buildQuickStats(String partitionKey, String partitionId, ConnectorSession session, SchemaTableName table,
MetastoreContext metastoreContext)
{
Table resolvedTable = metastore.getTable(metastoreContext, table.getSchemaName(), table.getTableName()).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,7 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
new HivePartitionStats(),
new HiveFileRenamer(),
DEFAULT_COLUMN_CONVERTER_PROVIDER,
new QuickStatsProvider(HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new QuickStatsProvider(metastoreClient, HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new HiveTableWritabilityChecker(false));

transactionManager = new HiveTransactionManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ protected void setup(String host, int port, String databaseName, BiFunction<Hive
new HivePartitionStats(),
new HiveFileRenamer(),
columnConverterProvider,
new QuickStatsProvider(HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new QuickStatsProvider(metastoreClient, HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new HiveTableWritabilityChecker(config));

transactionManager = new HiveTransactionManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ private HiveMetadata getHiveMetadata(TestingExtendedHiveMetastore metastore, Hiv
new HivePartitionStats(),
new HiveFileRenamer(),
HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER,
new QuickStatsProvider(HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new QuickStatsProvider(metastore, HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new HiveTableWritabilityChecker(false));
return hiveMetadataFactory.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void setup()
new HivePartitionStats(),
new HiveFileRenamer(),
HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER,
new QuickStatsProvider(HDFS_ENVIRONMENT, HiveTestUtils.DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new QuickStatsProvider(metastore, HDFS_ENVIRONMENT, HiveTestUtils.DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new HiveTableWritabilityChecker(false));

metastore.createDatabase(METASTORE_CONTEXT, Database.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,9 @@ private void assertRedundantColumnDomains(Range predicateRange, PartitionStatist
new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, new MetastoreClientConfig()), ImmutableSet.of(), hiveClientConfig),
new MetastoreClientConfig(),
new NoHdfsAuthentication());
TestingExtendedHiveMetastore metastore = new TestingExtendedHiveMetastore(TEST_TABLE, partitionWithStatistics);
HiveMetadataFactory metadataFactory = new HiveMetadataFactory(
new TestingExtendedHiveMetastore(TEST_TABLE, partitionWithStatistics),
metastore,
hdfsEnvironment,
new HivePartitionManager(FUNCTION_AND_TYPE_MANAGER, hiveClientConfig),
DateTimeZone.forOffsetHours(1),
Expand Down Expand Up @@ -531,7 +532,7 @@ private void assertRedundantColumnDomains(Range predicateRange, PartitionStatist
new HivePartitionStats(),
new HiveFileRenamer(),
HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER,
new QuickStatsProvider(HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new QuickStatsProvider(metastore, HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new HiveTableWritabilityChecker(false));

HiveSplitManager splitManager = new HiveSplitManager(
Expand Down Expand Up @@ -646,8 +647,9 @@ public void testEncryptionInformation()
new NoHdfsAuthentication());
HiveEncryptionInformationProvider encryptionInformationProvider = new HiveEncryptionInformationProvider(ImmutableList.of(new TestDwrfEncryptionInformationSource()));

TestingExtendedHiveMetastore metastore = new TestingExtendedHiveMetastore(testTable, partitionWithStatistics);
HiveMetadataFactory metadataFactory = new HiveMetadataFactory(
new TestingExtendedHiveMetastore(testTable, partitionWithStatistics),
metastore,
hdfsEnvironment,
new HivePartitionManager(FUNCTION_AND_TYPE_MANAGER, hiveClientConfig),
DateTimeZone.forOffsetHours(1),
Expand Down Expand Up @@ -678,7 +680,7 @@ public void testEncryptionInformation()
new HivePartitionStats(),
new HiveFileRenamer(),
HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER,
new QuickStatsProvider(HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new QuickStatsProvider(metastore, HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new HiveTableWritabilityChecker(false));

HiveSplitManager splitManager = new HiveSplitManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.facebook.presto.hive.OrcFileWriterConfig;
import com.facebook.presto.hive.ParquetFileWriterConfig;
import com.facebook.presto.hive.PartitionNameWithVersion;
import com.facebook.presto.hive.TestingExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.DateStatistics;
import com.facebook.presto.hive.metastore.DecimalStatistics;
import com.facebook.presto.hive.metastore.DoubleStatistics;
Expand Down Expand Up @@ -107,7 +108,7 @@ public class TestMetastoreHiveStatisticsProvider
private static final HiveColumnHandle PARTITION_COLUMN_1 = new HiveColumnHandle("p1", HIVE_STRING, VARCHAR.getTypeSignature(), 0, PARTITION_KEY, Optional.empty(), Optional.empty());
private static final HiveColumnHandle PARTITION_COLUMN_2 = new HiveColumnHandle("p2", HIVE_LONG, BIGINT.getTypeSignature(), 1, PARTITION_KEY, Optional.empty(), Optional.empty());

private static final QuickStatsProvider quickStatsProvider = new QuickStatsProvider(HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of());
private static final QuickStatsProvider quickStatsProvider = new QuickStatsProvider(new TestingExtendedHiveMetastore(), HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of());

@Test
public void testGetPartitionsSample()
Expand Down
Loading

0 comments on commit c6762eb

Please sign in to comment.