Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce AllocateTableStorage in Storage interface #119

Merged
merged 8 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.openhouse.cluster.storage;

import com.google.common.base.Preconditions;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -41,4 +43,37 @@ public Map<String, String> getProperties() {
.map(HashMap::new)
.orElseGet(HashMap::new);
}

/**
* Allocates Table Storage and return location.
*
* <p>Default tableLocation looks like: {endpoint}/{rootPrefix}/{databaseId}/{tableId}-{tableUUID}
*
* @param databaseId the database id of the table
* @param tableId the table id of the table
* @param tableUUID the UUID of the table
* @param tableCreator the creator of the table
* @return the table location where the table data should be stored
*/
@Override
public String allocateTableLocation(
String databaseId, String tableId, String tableUUID, String tableCreator) {
Preconditions.checkArgument(databaseId != null, "Database ID cannot be null");
Preconditions.checkArgument(tableId != null, "Table ID cannot be null");
Preconditions.checkArgument(tableUUID != null, "Table UUID cannot be null");
Preconditions.checkState(
storageProperties.getTypes().containsKey(getType().getValue()),
"Storage properties doesn't contain type: " + getType().getValue());
return URI.create(
HotSushi marked this conversation as resolved.
Show resolved Hide resolved
getClient().getEndpoint()
HotSushi marked this conversation as resolved.
Show resolved Hide resolved
+ getClient().getRootPrefix()
+ "/"
+ databaseId
+ "/"
+ tableId
+ "-"
+ tableUUID)
.normalize()
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,26 @@ public interface Storage {
* @return a client to interact with the storage
*/
StorageClient<?> getClient();

  /**
   * Allocates Table Storage and returns the location.
   *
   * <p>Allocating involves creating the directory structure or bucket and setting appropriate
   * permissions, etc. Please note that this method should avoid creating TableFormat-specific
   * directories (ex: /data, /metadata for Iceberg or _delta_log for DeltaLake). Such provisioning
   * should be done by the TableFormat implementation.
   *
   * <p>After allocation is done, this method should return the table location where the table data
   * should be stored. Example: hdfs:///rootPrefix/databaseId/tableId-UUID for HDFS storage;
   * file:/tmp/databaseId/tableId-UUID for Local storage; s3://bucket/databaseId/tableId-UUID for S3
   * storage
   *
   * @param databaseId the database id of the table
   * @param tableId the table id of the table
   * @param tableUUID the UUID of the table
   * @param tableCreator the creator of the table (implementations may use it for ownership or
   *     permissions; some implementations ignore it)
   * @return the table location after provisioning is done
   */
  String allocateTableLocation(
      String databaseId, String tableId, String tableUUID, String tableCreator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.openhouse.cluster.storage.BaseStorage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import java.nio.file.Paths;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -36,4 +37,19 @@ public StorageType.Type getType() {
public StorageClient<?> getClient() {
return hdfsStorageClient;
}

/**
* Allocates Table Space for the HDFS storage.
*
* <p>tableLocation looks like: /{rootPrefix}/{databaseId}/{tableId}-{tableUUID} We strip the
* endpoint to ensure backward-compatibility. This override should be removed after resolving <a
* href="https://github.com/linkedin/openhouse/issues/121">
*
* @return the table location
*/
@Override
public String allocateTableLocation(
String databaseId, String tableId, String tableUUID, String tableCreator) {
return Paths.get(getClient().getRootPrefix(), databaseId, tableId + "-" + tableUUID).toString();
HotSushi marked this conversation as resolved.
Show resolved Hide resolved
HotSushi marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.nio.file.Paths;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -51,4 +52,17 @@ public StorageType.Type getType() {
public StorageClient<?> getClient() {
return localStorageClient;
}

  /**
   * Allocates Table Space for the Local Storage.
   *
   * <p>tableLocation looks like: /{rootPrefix}/{databaseId}/{tableId}-{tableUUID}. We strip the
   * endpoint to ensure backward-compatibility. This override should be removed after resolving <a
   * href="https://github.com/linkedin/openhouse/issues/121">issue #121</a>.
   *
   * @param databaseId the database id of the table
   * @param tableId the table id of the table
   * @param tableUUID the UUID of the table
   * @param tableCreator the creator of the table; unused by this implementation
   * @return the table location (root prefix + databaseId + tableId-tableUUID, no endpoint)
   */
  @Override
  public String allocateTableLocation(
      String databaseId, String tableId, String tableUUID, String tableCreator) {
    return Paths.get(getClient().getRootPrefix(), databaseId, tableId + "-" + tableUUID).toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateProperties;

Expand Down Expand Up @@ -142,8 +143,10 @@ static TableDto convertToTableDto(
.tableUUID(megaProps.get(getCanonicalFieldName("tableUUID")))
.tableLocation(
URI.create(
HotSushi marked this conversation as resolved.
Show resolved Hide resolved
storage.getClient().getEndpoint()
+ megaProps.get(getCanonicalFieldName("tableLocation")))
StringUtils.prependIfMissing( // remove after resolving
// https://github.com/linkedin/openhouse/issues/121
megaProps.get(getCanonicalFieldName("tableLocation")),
storage.getClient().getEndpoint()))
.normalize()
.toString())
.tableVersion(megaProps.get(getCanonicalFieldName("tableVersion")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,13 @@ public TableDto save(TableDto tableDto) {
tableIdentifier,
writeSchema,
partitionSpec,
constructTablePath(
storageManager,
storageManager
.getDefaultStorage()
.allocateTableLocation(
tableDto.getDatabaseId(),
tableDto.getTableId(),
tableDto.getTableUUID())
.toString(),
tableDto.getTableUUID(),
tableDto.getTableCreator()),
computePropsForTableCreation(tableDto));
meterRegistry.counter(MetricsConstant.REPO_TABLE_CREATED_CTR).increment();
log.info(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.linkedin.openhouse.tables.mock.storage.base;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.linkedin.openhouse.cluster.storage.BaseStorage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import com.linkedin.openhouse.cluster.storage.hdfs.HdfsStorageClient;
import javax.annotation.PostConstruct;
import lombok.Setter;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestComponent;
import org.springframework.boot.test.mock.mockito.MockBean;

/** Verifies {@link BaseStorage#allocateTableLocation} against endpoint/prefix slash variations. */
@SpringBootTest
public class BaseStorageTest {

  @MockBean private StorageProperties storageProperties;

  @MockBean private HdfsStorageClient hdfsStorageClient;

  /** Minimal concrete {@link BaseStorage} used only to exercise the default location logic. */
  @TestComponent
  @Setter
  public static class DummyBaseStorage extends BaseStorage {

    HdfsStorageClient hdfsStorageClient;

    @Override
    public StorageType.Type getType() {
      // Dummy type; must match the key mocked into StorageProperties#getTypes below.
      return new StorageType.Type("TEST");
    }

    @Override
    public StorageClient<FileSystem> getClient() {
      // Dummy client backed by the mocked HdfsStorageClient.
      return hdfsStorageClient;
    }
  }

  @Autowired private DummyBaseStorage baseStorage;

  @PostConstruct
  public void setupTest() {
    // Wire the mock client into the dummy storage once the context is built.
    baseStorage.setHdfsStorageClient(hdfsStorageClient);
  }

  private static final String DB_ID = "db1";
  private static final String TABLE_ID = "table1";
  private static final String TABLE_UUID = "uuid1";
  private static final String TABLE_CREATOR = "creator1";

  @Test
  public void testAllocateTableLocationPattern1() {
    // endpoint without trailing slash, prefix with leading slash
    mockStorageProperties("hdfs://localhost:9000", "/data/openhouse");
    assertEquals(
        "hdfs://localhost:9000/data/openhouse/db1/table1-uuid1",
        baseStorage.allocateTableLocation(DB_ID, TABLE_ID, TABLE_UUID, TABLE_CREATOR));
  }

  @Test
  public void testAllocateTableLocationPattern2() {
    // endpoint with trailing slash, prefix with leading slash
    mockStorageProperties("hdfs://localhost:9000/", "/data/openhouse");
    assertEquals(
        "hdfs://localhost:9000/data/openhouse/db1/table1-uuid1",
        baseStorage.allocateTableLocation(DB_ID, TABLE_ID, TABLE_UUID, TABLE_CREATOR));
  }

  @Test
  public void testAllocateTableLocationPattern3() {
    // endpoint with trailing slash, prefix without leading slash
    mockStorageProperties("hdfs://localhost:9000/", "data/openhouse");
    assertEquals(
        "hdfs://localhost:9000/data/openhouse/db1/table1-uuid1",
        baseStorage.allocateTableLocation(DB_ID, TABLE_ID, TABLE_UUID, TABLE_CREATOR));
  }

  @Test
  public void testAllocateTableLocationPattern4() {
    // single-segment prefix
    mockStorageProperties("hdfs://localhost:9000/", "data");
    assertEquals(
        "hdfs://localhost:9000/data/db1/table1-uuid1",
        baseStorage.allocateTableLocation(DB_ID, TABLE_ID, TABLE_UUID, TABLE_CREATOR));
  }

  @Test
  public void testAllocateTableLocationPattern5() {
    // endpoint with no authority
    mockStorageProperties("hdfs:///", "data/openhouse");
    assertEquals(
        "hdfs:///data/openhouse/db1/table1-uuid1",
        baseStorage.allocateTableLocation(DB_ID, TABLE_ID, TABLE_UUID, TABLE_CREATOR));
  }

  /** Stubs the client and storage properties for the given endpoint/root-prefix pair. */
  void mockStorageProperties(String endpoint, String rootPrefix) {
    when(hdfsStorageClient.getEndpoint()).thenReturn(endpoint);
    when(hdfsStorageClient.getRootPrefix()).thenReturn(rootPrefix);
    StorageProperties.StorageTypeProperties typeProps =
        new StorageProperties.StorageTypeProperties(rootPrefix, endpoint, ImmutableMap.of());
    when(storageProperties.getTypes()).thenReturn(ImmutableMap.of("TEST", typeProps));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,17 @@ public void testHdfsStoragePropertiesReturned() {
"/data/openhouse", "hdfs://localhost:9000", testMap)));
assertEquals(testMap, hdfsStorage.getProperties());
}

@Test
public void testAllocateTableSpace() {
String databaseId = "db1";
String tableId = "table1";
String tableUUID = "uuid1";
String tableCreator = "creator1";
boolean skipProvisioning = false;
when(hdfsStorageClient.getRootPrefix()).thenReturn("/data/openhouse");
String expected = "/data/openhouse/db1/table1-uuid1";
assertEquals(
expected, hdfsStorage.allocateTableLocation(databaseId, tableId, tableUUID, tableCreator));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.openhouse.tables.mock.storage.local;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -83,4 +84,17 @@ public void testLocalStorageGetClient() {
when(localStorageClient.getNativeClient()).thenReturn(localFileSystem);
assertTrue(localStorage.getClient().getNativeClient().equals(localFileSystem));
}

@Test
public void testAllocateTableSpace() {
String databaseId = "db1";
String tableId = "table1";
String tableUUID = "uuid1";
String tableCreator = "creator1";
boolean skipProvisioning = false;
when(localStorageClient.getRootPrefix()).thenReturn("/tmp");
String expected = "/tmp/db1/table1-uuid1";
assertEquals(
expected, localStorage.allocateTableLocation(databaseId, tableId, tableUUID, tableCreator));
}
}
Loading