Skip to content

Commit

Permalink
[Iceberg]Support view on Rest catalog and Nessie catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed Nov 4, 2024
1 parent c7b3abd commit addb7c9
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.view.View;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -141,6 +143,7 @@
import static com.facebook.presto.iceberg.IcebergUtil.getPartitions;
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdTimeOperator;
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
import static com.facebook.presto.iceberg.IcebergUtil.getViewComment;
import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetLocation;
Expand Down Expand Up @@ -217,6 +220,8 @@ protected final Table getIcebergTable(ConnectorSession session, SchemaTableName

protected abstract Table getRawIcebergTable(ConnectorSession session, SchemaTableName schemaTableName);

/**
 * Returns the Iceberg {@link View} identified by {@code schemaTableName}.
 * Each concrete metadata implementation decides how (and whether) views are resolved.
 *
 * @param session the session on whose behalf the view is requested
 * @param schemaTableName schema-qualified name of the view
 * @return the Iceberg view
 */
protected abstract View getIcebergView(ConnectorSession session, SchemaTableName schemaTableName);

protected abstract boolean tableExists(ConnectorSession session, SchemaTableName schemaTableName);

public abstract void registerTable(ConnectorSession clientSession, SchemaTableName schemaTableName, Path metadataLocation);
Expand Down Expand Up @@ -386,22 +391,31 @@ protected Optional<SystemTable> getIcebergSystemTable(SchemaTableName tableName,
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) table;
return getTableMetadata(session, icebergTableHandle.getSchemaTableName(), icebergTableHandle.getIcebergTableName());
return getTableOrViewMetadata(session, icebergTableHandle.getSchemaTableName(), icebergTableHandle.getIcebergTableName());
}

protected ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table, IcebergTableName icebergTableName)
protected ConnectorTableMetadata getTableOrViewMetadata(ConnectorSession session, SchemaTableName table, IcebergTableName icebergTableName)
{
Table icebergTable = getIcebergTable(session, new SchemaTableName(table.getSchemaName(), icebergTableName.getTableName()));
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
columns.addAll(getColumnMetadatas(icebergTable));
if (icebergTableName.getTableType() == CHANGELOG) {
return ChangelogUtil.getChangelogTableMeta(table, typeManager, columns.build());
try {
Table icebergTable = getIcebergTable(session, new SchemaTableName(table.getSchemaName(), icebergTableName.getTableName()));
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
columns.addAll(getColumnMetadatas(icebergTable));
if (icebergTableName.getTableType() == CHANGELOG) {
return ChangelogUtil.getChangelogTableMeta(table, typeManager, columns.build());
}
else {
columns.add(PATH_COLUMN_METADATA);
columns.add(DATA_SEQUENCE_NUMBER_COLUMN_METADATA);
}
return new ConnectorTableMetadata(table, columns.build(), createMetadataProperties(icebergTable), getTableComment(icebergTable));
}
else {
columns.add(PATH_COLUMN_METADATA);
columns.add(DATA_SEQUENCE_NUMBER_COLUMN_METADATA);
catch (NoSuchTableException e) {
// The Iceberg library does not provide an efficient way to determine whether an identifier
// refers to a view or a table without loading it, so we first try to load it as a table and
// fall back to loading it as a view only when a `NoSuchTableException` is thrown. This is more efficient.
View icebergView = getIcebergView(session, new SchemaTableName(table.getSchemaName(), icebergTableName.getTableName()));
return new ConnectorTableMetadata(table, getColumnMetadatas(icebergView), createViewMetadataProperties(icebergView), getViewComment(icebergView));
}
return new ConnectorTableMetadata(table, columns.build(), createMetadataProperties(icebergTable), getTableComment(icebergTable));
}

@Override
Expand All @@ -414,7 +428,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
try {
IcebergTableName tableName = IcebergTableName.from(table.getTableName());
if (!tableName.isSystemTable()) {
columns.put(table, getTableMetadata(session, table, tableName).getColumns());
columns.put(table, getTableOrViewMetadata(session, table, tableName).getColumns());
}
}
catch (TableNotFoundException e) {
Expand Down Expand Up @@ -546,6 +560,18 @@ protected List<ColumnMetadata> getColumnMetadatas(Table table)
.collect(toImmutableList());
}

/**
 * Converts every column of the view's schema into Presto column metadata.
 * Each column keeps its name, has its Iceberg type mapped through {@code toPrestoType},
 * carries the Iceberg doc string (if any) as its comment, and is marked as not hidden.
 *
 * @param view the Iceberg view whose schema is translated
 * @return an immutable list with one {@link ColumnMetadata} per schema column, in schema order
 */
protected List<ColumnMetadata> getColumnMetadatas(View view)
{
    ImmutableList.Builder<ColumnMetadata> viewColumns = ImmutableList.builder();
    for (Types.NestedField field : view.schema().columns()) {
        ColumnMetadata columnMetadata = ColumnMetadata.builder()
                .setName(field.name())
                .setType(toPrestoType(field.type(), typeManager))
                .setComment(Optional.ofNullable(field.doc()))
                .setHidden(false)
                .build();
        viewColumns.add(columnMetadata);
    }
    return viewColumns.build();
}

private static String columnExtraInfo(List<String> partitionTransforms)
{
if (partitionTransforms.size() == 1 && partitionTransforms.get(0).equals("identity")) {
Expand Down Expand Up @@ -579,6 +605,16 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
return properties.build();
}

/**
 * Copies the properties of an Iceberg view into an immutable property map.
 *
 * @param view the Iceberg view to read properties from
 * @return an immutable map holding every view property; empty when the view reports
 *         {@code null} properties
 */
protected ImmutableMap<String, Object> createViewMetadataProperties(View view)
{
    ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
    Map<String, String> viewProperties = view.properties();
    if (viewProperties != null) {
        for (Map.Entry<String, String> property : viewProperties.entrySet()) {
            builder.put(property.getKey(), property.getValue());
        }
    }
    return builder.build();
}

public static Schema toIcebergSchema(List<ColumnMetadata> columns)
{
List<Types.NestedField> icebergColumns = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.view.View;
import org.joda.time.DateTimeZone;

import java.io.IOException;
Expand Down Expand Up @@ -181,6 +182,12 @@ protected org.apache.iceberg.Table getRawIcebergTable(ConnectorSession session,
return getHiveIcebergTable(metastore, hdfsEnvironment, hiveTableOeprationsConfig, session, schemaTableName);
}

/**
 * Native Iceberg views are not available through the Hive catalog, so this
 * implementation always fails.
 *
 * @throws PrestoException always, with error code {@code NOT_SUPPORTED}
 */
@Override
protected View getIcebergView(ConnectorSession session, SchemaTableName schemaTableName)
{
    // The message previously read "support native Iceberg views", which stated the opposite of the failure.
    throw new PrestoException(NOT_SUPPORTED, "Iceberg Hive catalog does not support native Iceberg views.");
}

@Override
protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,46 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.ConnectorViewDefinition;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergUtil.VIEW_OWNER;
import static com.facebook.presto.iceberg.IcebergUtil.createIcebergViewProperties;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergView;
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
Expand All @@ -68,9 +83,11 @@ public class IcebergNativeMetadata
extends IcebergAbstractMetadata
{
private static final String INFORMATION_SCHEMA = "information_schema";
private static final String VIEW_DIALECT = "presto";

private final IcebergNativeCatalogFactory catalogFactory;
private final CatalogType catalogType;
private final ConcurrentMap<SchemaTableName, View> icebergViews = new ConcurrentHashMap<>();

public IcebergNativeMetadata(
IcebergNativeCatalogFactory catalogFactory,
Expand All @@ -94,6 +111,14 @@ protected Table getRawIcebergTable(ConnectorSession session, SchemaTableName sch
return getNativeIcebergTable(catalogFactory, session, schemaTableName);
}

/**
 * Returns the Iceberg view for the given name, loading it through the native catalog
 * only when it is not already present in the {@code icebergViews} map.
 *
 * @param session the session used to obtain the catalog
 * @param schemaTableName schema-qualified name of the view
 * @return the cached or freshly loaded view
 */
@Override
protected View getIcebergView(ConnectorSession session, SchemaTableName schemaTableName)
{
    // The mapping function receives the same key that is being looked up.
    return icebergViews.computeIfAbsent(schemaTableName, viewName -> getNativeIcebergView(catalogFactory, session, viewName));
}

@Override
protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTableName)
{
Expand Down Expand Up @@ -158,6 +183,106 @@ public void renameSchema(ConnectorSession session, String source, String target)
throw new PrestoException(NOT_SUPPORTED, format("Iceberg %s catalog does not support rename namespace", catalogType.name()));
}

/**
 * Creates (or replaces) a native Iceberg view from the given Presto view definition.
 * The view is registered with the {@code presto} SQL dialect, the schema derived from
 * the view's columns, and the properties produced by {@code createIcebergViewProperties}.
 *
 * @param session the session used to obtain the catalog and view properties
 * @param viewMetadata name and columns of the view
 * @param viewData the view definition stored as the view's query text
 * @param replace when {@code true} an existing view is replaced, otherwise plain creation is attempted
 * @throws PrestoException with {@code NOT_SUPPORTED} when the catalog is not a {@link ViewCatalog}
 */
@Override
public void createView(ConnectorSession session, ConnectorTableMetadata viewMetadata, String viewData, boolean replace)
{
    Catalog catalog = catalogFactory.getCatalog(session);
    if (!(catalog instanceof ViewCatalog)) {
        throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating views");
    }

    ViewCatalog viewCatalog = (ViewCatalog) catalog;
    Schema viewSchema = toIcebergSchema(viewMetadata.getColumns());
    SchemaTableName viewName = viewMetadata.getTable();

    ViewBuilder builder = viewCatalog.buildView(TableIdentifier.of(viewName.getSchemaName(), viewName.getTableName()))
            .withSchema(viewSchema)
            .withDefaultNamespace(Namespace.of(viewName.getSchemaName()))
            .withQuery(VIEW_DIALECT, viewData)
            .withProperties(createIcebergViewProperties(session, nodeVersion.toString()));

    if (!replace) {
        builder.create();
        return;
    }
    builder.createOrReplace();
}

/**
 * Lists the views known to the catalog, either for one schema or for every schema.
 *
 * @param session the session used to obtain the catalog
 * @param schemaName the schema to list; when empty, all schemas returned by
 *        {@code listSchemaNames} are scanned
 * @return the names of all views found; empty when the catalog is not a {@link ViewCatalog}
 */
@Override
public List<SchemaTableName> listViews(ConnectorSession session, Optional<String> schemaName)
{
    Catalog catalog = catalogFactory.getCatalog(session);
    if (!(catalog instanceof ViewCatalog)) {
        return ImmutableList.of();
    }

    ViewCatalog viewCatalog = (ViewCatalog) catalog;
    ImmutableList.Builder<SchemaTableName> viewNames = ImmutableList.builder();
    for (String schema : listSchemas(session, schemaName.orElse(null))) {
        try {
            for (TableIdentifier identifier : viewCatalog.listViews(Namespace.of(schema))) {
                viewNames.add(new SchemaTableName(schema, identifier.name()));
            }
        }
        catch (NoSuchNamespaceException e) {
            // ignore
        }
    }
    return viewNames.build();
}

/**
 * Resolves the schemas to scan: the single given schema, or every schema when none is given.
 *
 * @param session the session passed on to {@code listSchemaNames}
 * @param schemaNameOrNull a schema name, or {@code null} to select all schemas
 * @return a one-element list with the given schema, or the result of {@code listSchemaNames}
 */
private List<String> listSchemas(ConnectorSession session, String schemaNameOrNull)
{
    return schemaNameOrNull == null
            ? listSchemaNames(session)
            : ImmutableList.of(schemaNameOrNull);
}

/**
 * Returns the definitions of the views matching the given prefix.
 * When the prefix names a specific table only that name is checked; otherwise every
 * view listed for the prefix's schema is considered. A name is included only if the
 * catalog reports that a view with that name exists.
 *
 * @param session the session used to obtain the catalog
 * @param prefix schema (and optionally table) name selecting the views
 * @return view definitions keyed by view name; empty when the catalog is not a {@link ViewCatalog}
 */
@Override
public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession session, SchemaTablePrefix prefix)
{
    Catalog catalog = catalogFactory.getCatalog(session);
    if (!(catalog instanceof ViewCatalog)) {
        return ImmutableMap.of();
    }

    ViewCatalog viewCatalog = (ViewCatalog) catalog;
    List<SchemaTableName> candidates = prefix.getTableName() != null
            ? ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()))
            : listViews(session, Optional.of(prefix.getSchemaName()));

    ImmutableMap.Builder<SchemaTableName, ConnectorViewDefinition> definitions = ImmutableMap.builder();
    for (SchemaTableName candidate : candidates) {
        try {
            TableIdentifier identifier = TableIdentifier.of(candidate.getSchemaName(), candidate.getTableName());
            if (!viewCatalog.viewExists(identifier)) {
                continue;
            }
            View view = viewCatalog.loadView(identifier);
            verifyAndPopulateViews(view, candidate, view.sqlFor(VIEW_DIALECT).sql(), definitions);
        }
        catch (IllegalArgumentException e) {
            // Ignore illegal view names
        }
    }
    return definitions.build();
}

/**
 * Drops the named view through the catalog.
 *
 * @param session the session used to obtain the catalog
 * @param viewName schema-qualified name of the view to drop
 * @throws PrestoException with {@code NOT_SUPPORTED} when the catalog is not a {@link ViewCatalog}
 */
@Override
public void dropView(ConnectorSession session, SchemaTableName viewName)
{
    Catalog catalog = catalogFactory.getCatalog(session);
    if (!(catalog instanceof ViewCatalog)) {
        throw new PrestoException(NOT_SUPPORTED, "This connector does not support dropping views");
    }

    TableIdentifier identifier = TableIdentifier.of(viewName.getSchemaName(), viewName.getTableName());
    ((ViewCatalog) catalog).dropView(identifier);
}

/**
 * Adds a {@link ConnectorViewDefinition} for the given view to the supplied builder.
 * The owner is taken from the view's {@code VIEW_OWNER} property and is empty when
 * that property is absent.
 *
 * @param view the Iceberg view providing the owner property
 * @param schemaTableName name under which the definition is registered
 * @param viewData the view definition text
 * @param views builder receiving the new entry
 */
private void verifyAndPopulateViews(View view, SchemaTableName schemaTableName, String viewData, ImmutableMap.Builder<SchemaTableName, ConnectorViewDefinition> views)
{
    Optional<String> owner = Optional.ofNullable(view.properties().get(VIEW_OWNER));
    ConnectorViewDefinition definition = new ConnectorViewDefinition(schemaTableName, owner, viewData);
    views.put(schemaTableName, definition);
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorNewTableLayout> layout)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.CloseableIterable;
Expand All @@ -78,6 +80,7 @@
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.view.View;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -214,6 +217,8 @@ public final class IcebergUtil
public static final int REAL_NEGATIVE_ZERO = 0x80000000;
public static final int REAL_NEGATIVE_INFINITE = 0xff800000;

protected static final String VIEW_OWNER = "view_owner";

private IcebergUtil() {}

public static boolean isIcebergTable(com.facebook.presto.hive.metastore.Table table)
Expand Down Expand Up @@ -252,6 +257,15 @@ public static Table getNativeIcebergTable(IcebergNativeCatalogFactory catalogFac
return catalogFactory.getCatalog(session).loadTable(toIcebergTableIdentifier(table));
}

/**
 * Loads an Iceberg view from the native catalog obtained for the session.
 *
 * @param catalogFactory factory providing the catalog for the session
 * @param session the session used to obtain the catalog
 * @param table schema-qualified name of the view
 * @return the view loaded from the catalog
 * @throws PrestoException with {@code NOT_SUPPORTED} when the catalog is not a {@link ViewCatalog}
 */
public static View getNativeIcebergView(IcebergNativeCatalogFactory catalogFactory, ConnectorSession session, SchemaTableName table)
{
    Catalog catalog = catalogFactory.getCatalog(session);
    if (!(catalog instanceof ViewCatalog)) {
        throw new PrestoException(NOT_SUPPORTED, "This connector does not support get views");
    }
    ViewCatalog viewCatalog = (ViewCatalog) catalog;
    return viewCatalog.loadView(toIcebergTableIdentifier(table));
}

public static List<IcebergColumnHandle> getPartitionKeyColumnHandles(IcebergTableHandle tableHandle, Table table, TypeManager typeManager)
{
Set<PartitionSpec> partitionSpecs = tableHandle.getIcebergTableName().getSnapshotId()
Expand Down Expand Up @@ -375,6 +389,11 @@ public static Optional<String> getTableComment(Table table)
return Optional.ofNullable(table.properties().get(TABLE_COMMENT));
}

/**
 * Returns the comment stored on a view, read from the {@code TABLE_COMMENT} property.
 *
 * @param view the Iceberg view to inspect
 * @return the comment, or empty when the property is not set
 */
public static Optional<String> getViewComment(View view)
{
    String comment = view.properties().get(TABLE_COMMENT);
    return Optional.ofNullable(comment);
}

private static String quotedTableName(SchemaTableName name)
{
return quotedName(name.getSchemaName()) + "." + quotedName(name.getTableName());
Expand Down Expand Up @@ -453,6 +472,7 @@ public static Map<String, String> createIcebergViewProperties(ConnectorSession s
.put(PRESTO_VERSION_NAME, prestoVersion)
.put(PRESTO_QUERY_ID_NAME, session.getQueryId())
.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE)
.put(VIEW_OWNER, session.getUser())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ protected QueryRunner createQueryRunner()
@Override
protected boolean supportsViews()
{
return false;
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public static TestingHttpServer getRestServer(String location)
.put(WAREHOUSE_LOCATION, location)
.put("jdbc.username", "user")
.put("jdbc.password", "password")
.put("jdbc.schema-version", "V1")
.build();
backingCatalog.initialize("rest_jdbc_backend", properties);

Expand Down
Loading

0 comments on commit addb7c9

Please sign in to comment.