diff --git a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/BaseStorage.java b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/BaseStorage.java index 2c992bfe..20443d18 100644 --- a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/BaseStorage.java +++ b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/BaseStorage.java @@ -1,6 +1,8 @@ package com.linkedin.openhouse.cluster.storage; +import com.google.common.base.Preconditions; import com.linkedin.openhouse.cluster.storage.configs.StorageProperties; +import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -41,4 +43,37 @@ public Map getProperties() { .map(HashMap::new) .orElseGet(HashMap::new); } + + /** + * Allocates Table Storage and return location. + * + *

Default tableLocation looks like: {endpoint}/{rootPrefix}/{databaseId}/{tableId}-{tableUUID} + * + * @param databaseId the database id of the table + * @param tableId the table id of the table + * @param tableUUID the UUID of the table + * @param tableCreator the creator of the table + * @return the table location where the table data should be stored + */ + @Override + public String allocateTableLocation( + String databaseId, String tableId, String tableUUID, String tableCreator) { + Preconditions.checkArgument(databaseId != null, "Database ID cannot be null"); + Preconditions.checkArgument(tableId != null, "Table ID cannot be null"); + Preconditions.checkArgument(tableUUID != null, "Table UUID cannot be null"); + Preconditions.checkState( + storageProperties.getTypes().containsKey(getType().getValue()), + "Storage properties doesn't contain type: " + getType().getValue()); + return URI.create( + getClient().getEndpoint() + + getClient().getRootPrefix() + + "/" + + databaseId + + "/" + + tableId + + "-" + + tableUUID) + .normalize() + .toString(); + } } diff --git a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/Storage.java b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/Storage.java index b0a8f079..7f78b6d9 100644 --- a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/Storage.java +++ b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/Storage.java @@ -48,4 +48,26 @@ public interface Storage { * @return a client to interact with the storage */ StorageClient getClient(); + + /** + * Allocates Table Storage and return location. + * + *

Allocating involves creating directory structure/ creating bucket, and setting appropriate + * permissions etc. Please note that this method should avoid creating TableFormat specific + * directories (ex: /data, /metadata for Iceberg or _delta_log for DeltaLake). Such provisioning + * should be done by the TableFormat implementation. + * + *

After allocation is done, this method should return the table location where the table data + * should be stored. Example: hdfs:///rootPrefix/databaseId/tableId-UUID for HDFS storage; + * file:/tmp/databaseId/tableId-UUID for Local storage; s3://bucket/databaseId/tableId-UUID for S3 + * storage + * + * @param databaseId the database id of the table + * @param tableId the table id of the table + * @param tableUUID the UUID of the table + * @param tableCreator the creator of the table + * @return the table location after provisioning is done + */ + String allocateTableLocation( + String databaseId, String tableId, String tableUUID, String tableCreator); } diff --git a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/hdfs/HdfsStorage.java b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/hdfs/HdfsStorage.java index a6ad210c..2a9ae378 100644 --- a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/hdfs/HdfsStorage.java +++ b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/hdfs/HdfsStorage.java @@ -3,6 +3,7 @@ import com.linkedin.openhouse.cluster.storage.BaseStorage; import com.linkedin.openhouse.cluster.storage.StorageClient; import com.linkedin.openhouse.cluster.storage.StorageType; +import java.nio.file.Paths; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; @@ -36,4 +37,19 @@ public StorageType.Type getType() { public StorageClient getClient() { return hdfsStorageClient; } + + /** + * Allocates Table Space for the HDFS storage. + * + *

tableLocation looks like: /{rootPrefix}/{databaseId}/{tableId}-{tableUUID} We strip the + * endpoint to ensure backward-compatibility. This override should be removed after resolving + * https://github.com/linkedin/openhouse/issues/121 + * + * @return the table location + */ + @Override + public String allocateTableLocation( + String databaseId, String tableId, String tableUUID, String tableCreator) { + return Paths.get(getClient().getRootPrefix(), databaseId, tableId + "-" + tableUUID).toString(); + } } diff --git a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/local/LocalStorage.java b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/local/LocalStorage.java index f3ee5e8c..76a390b2 100644 --- a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/local/LocalStorage.java +++ b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/local/LocalStorage.java @@ -4,6 +4,7 @@ import com.linkedin.openhouse.cluster.storage.StorageClient; import com.linkedin.openhouse.cluster.storage.StorageType; import com.linkedin.openhouse.cluster.storage.configs.StorageProperties; +import java.nio.file.Paths; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; @@ -51,4 +52,17 @@ public StorageType.Type getType() { public StorageClient getClient() { return localStorageClient; } + + /** + * Allocates Table Space for the Local Storage. + * + *

tableLocation looks like: /{rootPrefix}/{databaseId}/{tableId}-{tableUUID} We strip the + * endpoint to ensure backward-compatibility. This override should be removed after resolving + * https://github.com/linkedin/openhouse/issues/121 + */ + @Override + public String allocateTableLocation( + String databaseId, String tableId, String tableUUID, String tableCreator) { + return Paths.get(getClient().getRootPrefix(), databaseId, tableId + "-" + tableUUID).toString(); + } } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java index cd6d7323..ec86deb1 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java @@ -19,6 +19,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.commons.lang3.StringUtils; import org.apache.iceberg.Table; import org.apache.iceberg.UpdateProperties; @@ -142,8 +143,10 @@ static TableDto convertToTableDto( .tableUUID(megaProps.get(getCanonicalFieldName("tableUUID"))) .tableLocation( URI.create( - storage.getClient().getEndpoint() - + megaProps.get(getCanonicalFieldName("tableLocation"))) + StringUtils.prependIfMissing( // remove after resolving + // https://github.com/linkedin/openhouse/issues/121 + megaProps.get(getCanonicalFieldName("tableLocation")), + storage.getClient().getEndpoint())) .normalize() .toString()) .tableVersion(megaProps.get(getCanonicalFieldName("tableVersion"))) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java index 6f5fccaf..0992bf05 100644 --- 
a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java @@ -111,12 +111,13 @@ public TableDto save(TableDto tableDto) { tableIdentifier, writeSchema, partitionSpec, - constructTablePath( - storageManager, + storageManager + .getDefaultStorage() + .allocateTableLocation( tableDto.getDatabaseId(), tableDto.getTableId(), - tableDto.getTableUUID()) - .toString(), + tableDto.getTableUUID(), + tableDto.getTableCreator()), computePropsForTableCreation(tableDto)); meterRegistry.counter(MetricsConstant.REPO_TABLE_CREATED_CTR).increment(); log.info( diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/base/BaseStorageTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/base/BaseStorageTest.java new file mode 100644 index 00000000..82c66d1b --- /dev/null +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/base/BaseStorageTest.java @@ -0,0 +1,107 @@ +package com.linkedin.openhouse.tables.mock.storage.base; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import com.linkedin.openhouse.cluster.storage.BaseStorage; +import com.linkedin.openhouse.cluster.storage.StorageClient; +import com.linkedin.openhouse.cluster.storage.StorageType; +import com.linkedin.openhouse.cluster.storage.configs.StorageProperties; +import com.linkedin.openhouse.cluster.storage.hdfs.HdfsStorageClient; +import javax.annotation.PostConstruct; +import lombok.Setter; +import org.apache.hadoop.fs.FileSystem; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestComponent; +import 
org.springframework.boot.test.mock.mockito.MockBean; + +@SpringBootTest +public class BaseStorageTest { + + @MockBean private StorageProperties storageProperties; + + @MockBean private HdfsStorageClient hdfsStorageClient; + + @TestComponent + @Setter + public static class DummyBaseStorage extends BaseStorage { + + HdfsStorageClient hdfsStorageClient; + + @Override + public StorageType.Type getType() { + return new StorageType.Type("TEST"); // return a dummy type + } + + @Override + public StorageClient getClient() { + return hdfsStorageClient; // return a dummy client + } + } + + @Autowired private DummyBaseStorage baseStorage; + + @PostConstruct + public void setupTest() { + baseStorage.setHdfsStorageClient(hdfsStorageClient); + } + + private static final String databaseId = "db1"; + private static final String tableId = "table1"; + private static final String tableUUID = "uuid1"; + private static final String tableCreator = "creator1"; + + @Test + public void testAllocateTableLocationPattern1() { + mockStorageProperties("hdfs://localhost:9000", "/data/openhouse"); + assertEquals( + "hdfs://localhost:9000/data/openhouse/db1/table1-uuid1", + baseStorage.allocateTableLocation(databaseId, tableId, tableUUID, tableCreator)); + } + + @Test + public void testAllocateTableLocationPattern2() { + mockStorageProperties("hdfs://localhost:9000/", "/data/openhouse"); + assertEquals( + "hdfs://localhost:9000/data/openhouse/db1/table1-uuid1", + baseStorage.allocateTableLocation(databaseId, tableId, tableUUID, tableCreator)); + } + + @Test + public void testAllocateTableLocationPattern3() { + mockStorageProperties("hdfs://localhost:9000/", "data/openhouse"); + assertEquals( + "hdfs://localhost:9000/data/openhouse/db1/table1-uuid1", + baseStorage.allocateTableLocation(databaseId, tableId, tableUUID, tableCreator)); + } + + @Test + public void testAllocateTableLocationPattern4() { + mockStorageProperties("hdfs://localhost:9000/", "data"); + assertEquals( + 
"hdfs://localhost:9000/data/db1/table1-uuid1", + baseStorage.allocateTableLocation(databaseId, tableId, tableUUID, tableCreator)); + } + + @Test + public void testAllocateTableLocationPattern5() { + mockStorageProperties("hdfs:///", "data/openhouse"); + assertEquals( + "hdfs:///data/openhouse/db1/table1-uuid1", + baseStorage.allocateTableLocation(databaseId, tableId, tableUUID, tableCreator)); + } + + void mockStorageProperties(String endpoint, String rootPrefix) { + when(hdfsStorageClient.getEndpoint()).thenReturn(endpoint); + when(hdfsStorageClient.getRootPrefix()).thenReturn(rootPrefix); + when(storageProperties.getTypes()) + .thenReturn( + ImmutableMap.of( + "TEST", + new StorageProperties.StorageTypeProperties( + rootPrefix, endpoint, ImmutableMap.of()))); + } +} diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/hdfs/HdfsStorageTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/hdfs/HdfsStorageTest.java index f4c508ac..f3bfffa6 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/hdfs/HdfsStorageTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/hdfs/HdfsStorageTest.java @@ -56,4 +56,17 @@ public void testHdfsStoragePropertiesReturned() { "/data/openhouse", "hdfs://localhost:9000", testMap))); assertEquals(testMap, hdfsStorage.getProperties()); } + + @Test + public void testAllocateTableSpace() { + String databaseId = "db1"; + String tableId = "table1"; + String tableUUID = "uuid1"; + String tableCreator = "creator1"; + boolean skipProvisioning = false; + when(hdfsStorageClient.getRootPrefix()).thenReturn("/data/openhouse"); + String expected = "/data/openhouse/db1/table1-uuid1"; + assertEquals( + expected, hdfsStorage.allocateTableLocation(databaseId, tableId, tableUUID, tableCreator)); + } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/local/LocalStorageTest.java 
b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/local/LocalStorageTest.java index f7a0d84c..d6f00890 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/local/LocalStorageTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/local/LocalStorageTest.java @@ -1,5 +1,6 @@ package com.linkedin.openhouse.tables.mock.storage.local; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.when; @@ -83,4 +84,17 @@ public void testLocalStorageGetClient() { when(localStorageClient.getNativeClient()).thenReturn(localFileSystem); assertTrue(localStorage.getClient().getNativeClient().equals(localFileSystem)); } + + @Test + public void testAllocateTableSpace() { + String databaseId = "db1"; + String tableId = "table1"; + String tableUUID = "uuid1"; + String tableCreator = "creator1"; + boolean skipProvisioning = false; + when(localStorageClient.getRootPrefix()).thenReturn("/tmp"); + String expected = "/tmp/db1/table1-uuid1"; + assertEquals( + expected, localStorage.allocateTableLocation(databaseId, tableId, tableUUID, tableCreator)); + } }