diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index f60e13abd2b0..2b0151180499 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -85,10 +85,12 @@ import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.view.View; import java.util.ArrayList; import java.util.Collection; @@ -141,6 +143,7 @@ import static com.facebook.presto.iceberg.IcebergUtil.getPartitions; import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdTimeOperator; import static com.facebook.presto.iceberg.IcebergUtil.getTableComment; +import static com.facebook.presto.iceberg.IcebergUtil.getViewComment; import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName; import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns; import static com.facebook.presto.iceberg.IcebergUtil.tryGetLocation; @@ -217,6 +220,8 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName protected abstract Table getRawIcebergTable(ConnectorSession session, SchemaTableName schemaTableName); + protected abstract View getIcebergView(ConnectorSession session, SchemaTableName schemaTableName); + protected abstract boolean tableExists(ConnectorSession session, SchemaTableName schemaTableName); public abstract void registerTable(ConnectorSession clientSession, SchemaTableName schemaTableName, Path metadataLocation); @@ -386,22 +391,31 @@ protected Optional getIcebergSystemTable(SchemaTableName tableName, public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { IcebergTableHandle icebergTableHandle = (IcebergTableHandle) table; - return getTableMetadata(session, icebergTableHandle.getSchemaTableName(), icebergTableHandle.getIcebergTableName()); + return getTableOrViewMetadata(session, icebergTableHandle.getSchemaTableName(), icebergTableHandle.getIcebergTableName()); } - protected ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table, IcebergTableName icebergTableName) + protected ConnectorTableMetadata getTableOrViewMetadata(ConnectorSession session, SchemaTableName table, IcebergTableName icebergTableName) { - Table icebergTable = getIcebergTable(session, new SchemaTableName(table.getSchemaName(), icebergTableName.getTableName())); - ImmutableList.Builder columns = ImmutableList.builder(); - columns.addAll(getColumnMetadatas(icebergTable)); - if (icebergTableName.getTableType() == CHANGELOG) { - return ChangelogUtil.getChangelogTableMeta(table, typeManager, columns.build()); + try { + Table icebergTable = getIcebergTable(session, new SchemaTableName(table.getSchemaName(), icebergTableName.getTableName())); + ImmutableList.Builder columns = ImmutableList.builder(); + columns.addAll(getColumnMetadatas(icebergTable)); + if (icebergTableName.getTableType() == CHANGELOG) { + return ChangelogUtil.getChangelogTableMeta(table, typeManager, columns.build()); + } + else { + columns.add(PATH_COLUMN_METADATA); + columns.add(DATA_SEQUENCE_NUMBER_COLUMN_METADATA); + } + return new ConnectorTableMetadata(table, columns.build(), createMetadataProperties(icebergTable), getTableComment(icebergTable)); } - else { - columns.add(PATH_COLUMN_METADATA); - columns.add(DATA_SEQUENCE_NUMBER_COLUMN_METADATA); + catch (NoSuchTableException e) { + // Considering that the Iceberg library does not provide an efficient way to determine whether + // it's a view or a table without loading it, we first try to load it as a table directly, and then + // try to load it as a view when getting an `NoSuchTableException`. This will be more efficient. + View icebergView = getIcebergView(session, new SchemaTableName(table.getSchemaName(), icebergTableName.getTableName())); + return new ConnectorTableMetadata(table, getColumnMetadatas(icebergView), createViewMetadataProperties(icebergView), getViewComment(icebergView)); } - return new ConnectorTableMetadata(table, columns.build(), createMetadataProperties(icebergTable), getTableComment(icebergTable)); } @Override @@ -414,7 +428,7 @@ public Map> listTableColumns(ConnectorSess try { IcebergTableName tableName = IcebergTableName.from(table.getTableName()); if (!tableName.isSystemTable()) { - columns.put(table, getTableMetadata(session, table, tableName).getColumns()); + columns.put(table, getTableOrViewMetadata(session, table, tableName).getColumns()); } } catch (TableNotFoundException e) { @@ -546,6 +560,18 @@ protected List getColumnMetadatas(Table table) .collect(toImmutableList()); } + protected List getColumnMetadatas(View view) + { + return view.schema().columns().stream() + .map(column -> ColumnMetadata.builder() + .setName(column.name()) + .setType(toPrestoType(column.type(), typeManager)) + .setComment(Optional.ofNullable(column.doc())) + .setHidden(false) + .build()) + .collect(toImmutableList()); + } + private static String columnExtraInfo(List partitionTransforms) { if (partitionTransforms.size() == 1 && partitionTransforms.get(0).equals("identity")) { @@ -579,6 +605,16 @@ protected ImmutableMap createMetadataProperties(Table icebergTab return properties.build(); } + protected ImmutableMap createViewMetadataProperties(View view) + { + ImmutableMap.Builder properties = ImmutableMap.builder(); + if (view.properties() != null) { + view.properties().entrySet().stream() + .forEach(entry -> properties.put(entry.getKey(), entry.getValue())); + } + return properties.build(); + } + public static Schema toIcebergSchema(List columns) { List icebergColumns = new ArrayList<>(); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index ea610a503708..d9998e6b772f 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -73,6 +73,7 @@ import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableOperations; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.view.View; import org.joda.time.DateTimeZone; import java.io.IOException; @@ -181,6 +182,12 @@ protected org.apache.iceberg.Table getRawIcebergTable(ConnectorSession session, return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOeprationsConfig, session, schemaTableName); } + @Override + protected View getIcebergView(ConnectorSession session, SchemaTableName schemaTableName) + { + throw new PrestoException(NOT_SUPPORTED, "Iceberg Hive catalog support native Iceberg views."); + } + @Override protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTableName) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index 3277fd220929..4d55876171ae 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -24,31 +24,46 @@ import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.ConnectorViewDefinition; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.plan.FilterStatsCalculatorService; import com.facebook.presto.spi.relation.RowExpressionService; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.fs.Path; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.ViewCatalog; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.view.View; +import org.apache.iceberg.view.ViewBuilder; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat; import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning; import static com.facebook.presto.iceberg.IcebergTableType.DATA; +import static com.facebook.presto.iceberg.IcebergUtil.VIEW_OWNER; +import static com.facebook.presto.iceberg.IcebergUtil.createIcebergViewProperties; import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable; +import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergView; import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties; import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported; import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields; @@ -68,9 +83,11 @@ public class IcebergNativeMetadata extends IcebergAbstractMetadata { private static final String INFORMATION_SCHEMA = "information_schema"; + private static final String VIEW_DIALECT = "presto"; private final IcebergNativeCatalogFactory catalogFactory; private final CatalogType catalogType; + private final ConcurrentMap icebergViews = new ConcurrentHashMap<>(); public IcebergNativeMetadata( IcebergNativeCatalogFactory catalogFactory, @@ -94,6 +111,14 @@ protected Table getRawIcebergTable(ConnectorSession session, SchemaTableName sch return getNativeIcebergTable(catalogFactory, session, schemaTableName); } + @Override + protected View getIcebergView(ConnectorSession session, SchemaTableName schemaTableName) + { + return icebergViews.computeIfAbsent( + schemaTableName, + ignored -> getNativeIcebergView(catalogFactory, session, schemaTableName)); + } + @Override protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTableName) { @@ -158,6 +183,106 @@ public void renameSchema(ConnectorSession session, String source, String target) throw new PrestoException(NOT_SUPPORTED, format("Iceberg %s catalog does not support rename namespace", catalogType.name())); } + @Override + public void createView(ConnectorSession session, ConnectorTableMetadata viewMetadata, String viewData, boolean replace) + { + Catalog catalog = catalogFactory.getCatalog(session); + if (catalog instanceof ViewCatalog) { + Schema schema = toIcebergSchema(viewMetadata.getColumns()); + ViewBuilder viewBuilder = ((ViewCatalog) catalog).buildView(TableIdentifier.of(viewMetadata.getTable().getSchemaName(), viewMetadata.getTable().getTableName())) + .withSchema(schema) + .withDefaultNamespace(Namespace.of(viewMetadata.getTable().getSchemaName())) + .withQuery(VIEW_DIALECT, viewData) + .withProperties(createIcebergViewProperties(session, nodeVersion.toString())); + if (replace) { + viewBuilder.createOrReplace(); + } + else { + viewBuilder.create(); + } + } + else { + throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating views"); + } + } + + @Override + public List listViews(ConnectorSession session, Optional schemaName) + { + ImmutableList.Builder tableNames = ImmutableList.builder(); + Catalog catalog = catalogFactory.getCatalog(session); + if (catalog instanceof ViewCatalog) { + for (String schema : listSchemas(session, schemaName.orElse(null))) { + try { + for (TableIdentifier tableIdentifier : ((ViewCatalog) catalog).listViews(Namespace.of(schema))) { + tableNames.add(new SchemaTableName(schema, tableIdentifier.name())); + } + } + catch (NoSuchNamespaceException e) { + // ignore + } + } + } + return tableNames.build(); + } + + private List listSchemas(ConnectorSession session, String schemaNameOrNull) + { + if (schemaNameOrNull == null) { + return listSchemaNames(session); + } + return ImmutableList.of(schemaNameOrNull); + } + + @Override + public Map getViews(ConnectorSession session, SchemaTablePrefix prefix) + { + ImmutableMap.Builder views = ImmutableMap.builder(); + Catalog catalog = catalogFactory.getCatalog(session); + if (catalog instanceof ViewCatalog) { + List tableNames; + if (prefix.getTableName() != null) { + tableNames = ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); + } + else { + tableNames = listViews(session, Optional.of(prefix.getSchemaName())); + } + + for (SchemaTableName schemaTableName : tableNames) { + try { + if (((ViewCatalog) catalog).viewExists(TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName()))) { + View view = ((ViewCatalog) catalog).loadView(TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName())); + verifyAndPopulateViews(view, schemaTableName, view.sqlFor(VIEW_DIALECT).sql(), views); + } + } + catch (IllegalArgumentException e) { + // Ignore illegal view names + } + } + } + return views.build(); + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName viewName) + { + Catalog catalog = catalogFactory.getCatalog(session); + if (catalog instanceof ViewCatalog) { + ((ViewCatalog) catalog).dropView(TableIdentifier.of(viewName.getSchemaName(), viewName.getTableName())); + } + else { + throw new PrestoException(NOT_SUPPORTED, "This connector does not support dropping views"); + } + } + + private void verifyAndPopulateViews(View view, SchemaTableName schemaTableName, String viewData, ImmutableMap.Builder views) + { + views.put(schemaTableName, new ConnectorViewDefinition( + schemaTableName, + Optional.ofNullable(view.properties().get(VIEW_OWNER)), + viewData)); + } + @Override public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout) { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 73f251f7f76d..7d17c02c0be4 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -69,6 +69,8 @@ import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.ViewCatalog; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.hive.HiveSchemaUtil; import org.apache.iceberg.io.CloseableIterable; @@ -78,6 +80,7 @@ import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.view.View; import java.io.IOException; import java.io.UncheckedIOException; @@ -214,6 +217,8 @@ public final class IcebergUtil public static final int REAL_NEGATIVE_ZERO = 0x80000000; public static final int REAL_NEGATIVE_INFINITE = 0xff800000; + protected static final String VIEW_OWNER = "view_owner"; + private IcebergUtil() {} public static boolean isIcebergTable(com.facebook.presto.hive.metastore.Table table) @@ -252,6 +257,15 @@ public static Table getNativeIcebergTable(IcebergNativeCatalogFactory catalogFac return catalogFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table)); } + public static View getNativeIcebergView(IcebergNativeCatalogFactory catalogFactory, ConnectorSession session, SchemaTableName table) + { + Catalog catalog = catalogFactory.getCatalog(session); + if (catalog instanceof ViewCatalog) { + return ((ViewCatalog) catalog).loadView(toIcebergTableIdentifier(table)); + } + throw new PrestoException(NOT_SUPPORTED, "This connector does not support get views"); + } + public static List getPartitionKeyColumnHandles(IcebergTableHandle tableHandle, Table table, TypeManager typeManager) { Set partitionSpecs = tableHandle.getIcebergTableName().getSnapshotId() @@ -375,6 +389,11 @@ public static Optional getTableComment(Table table) return Optional.ofNullable(table.properties().get(TABLE_COMMENT)); } + public static Optional getViewComment(View view) + { + return Optional.ofNullable(view.properties().get(TABLE_COMMENT)); + } + private static String quotedTableName(SchemaTableName name) { return quotedName(name.getSchemaName()) + "." + quotedName(name.getTableName()); @@ -453,6 +472,7 @@ public static Map createIcebergViewProperties(ConnectorSession s .put(PRESTO_VERSION_NAME, prestoVersion) .put(PRESTO_QUERY_ID_NAME, session.getQueryId()) .put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE) + .put(VIEW_OWNER, session.getUser()) .build(); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergNessieCatalogDistributedQueries.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergNessieCatalogDistributedQueries.java index 2c75718c80d0..cb238c0aa044 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergNessieCatalogDistributedQueries.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergNessieCatalogDistributedQueries.java @@ -62,6 +62,6 @@ protected QueryRunner createQueryRunner() @Override protected boolean supportsViews() { - return false; + return true; } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java index d9edd97ad81d..4baf4560b28c 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java @@ -70,6 +70,7 @@ public static TestingHttpServer getRestServer(String location) .put(WAREHOUSE_LOCATION, location) .put("jdbc.username", "user") .put("jdbc.password", "password") + .put("jdbc.schema-version", "V1") .build(); backingCatalog.initialize("rest_jdbc_backend", properties); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergRestCatalogDistributedQueries.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergRestCatalogDistributedQueries.java index 33dd865b65a0..ab76a96a77da 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergRestCatalogDistributedQueries.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergRestCatalogDistributedQueries.java @@ -86,6 +86,6 @@ protected QueryRunner createQueryRunner() @Override protected boolean supportsViews() { - return false; + return true; } }