Self serve replication API server side implementation (#227)
## Summary


Branched off from #220, this
PR adds the server-side implementation for the self-serve replication
API. A separate PR for the SQL-level changes can be found here:
#226.

This PR adds validations for the interval and destination cluster
parameters and stores the replication config as part of table policies
in table properties.
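For illustration, the stored table property might look like the following (a sketch inferred from this PR's unit-test fixtures; the exact property key and serialization are assumptions):

```
{
    "policies": "{\"replication\":{\"config\":[{\"destination\":\"clusterA\",\"interval\":\"12H\"}]}}"
}
```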

Validations on parameters:
- The destination cluster cannot be the same as the table's source
cluster.
- If the user provides an interval, it must be in the format `<X>H` or
`<X>D`: the only accepted hourly value is `12H`, and daily values range
from `1D` to `3D`.
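As a rough sketch, the interval rule above can be expressed as a single regular expression (a hypothetical helper for illustration, not the code in this PR):

```java
import java.util.regex.Pattern;

public class IntervalCheck {
    // "12H" is the only accepted hourly value; daily values may be 1D, 2D, or 3D.
    private static final Pattern VALID_INTERVAL = Pattern.compile("12H|[1-3]D");

    public static boolean isValidInterval(String interval) {
        // Pattern.matcher(...).matches() requires the whole string to match.
        return interval != null && VALID_INTERVAL.matcher(interval).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidInterval("12H")); // true
        System.out.println(isValidInterval("24H")); // false
        System.out.println(isValidInterval("3D"));  // true
        System.out.println(isValidInterval("4D"));  // false
    }
}
```

The actual validator in this PR parses the number and suffix separately; the regex form is just a compact way to state the same rule.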

This PR doesn't include the changes to generate the cron schedule from
the interval input; those will be made in a separate PR.

## Changes

- [x] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [ ] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done

- [x] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

Added unit tests.

Tested with local docker server:
Successful POST to
`http://localhost:8000/v1/databases/u_tableowner/tables` with
parameters:
```
{
    "tableId": "test_table",
    "databaseId": "u_tableowner",
    "baseTableVersion": "INITIAL_VERSION",
    "clusterId": "LocalHadoopCluster",
    "schema": "{\"type\": \"struct\", \"fields\": [{\"id\": 1,\"required\": true,\"name\": \"id\",\"type\": \"string\"},{\"id\": 2,\"required\": true,\"name\": \"name\",\"type\": \"string\"},{\"id\": 3,\"required\": true,\"name\": \"ts\",\"type\": \"timestamp\"}]}",
    "tableProperties": {
        "key": "value"
    },
    "policies": {
        "sharingEnabled": "true",
        "replication": {
            "config": [
                {
                    "destination": "LocalHadoopClusterA",
                    "interval": "12H"
                }
            ]
        }
    }
}
```

Successful POST to
`http://localhost:8000/v1/databases/u_tableowner/tables` with
parameters:
```
{
    "tableId": "test_table",
    "databaseId": "u_tableowner",
    "baseTableVersion": "INITIAL_VERSION",
    "clusterId": "LocalHadoopCluster",
    "schema": "{\"type\": \"struct\", \"fields\": [{\"id\": 1,\"required\": true,\"name\": \"id\",\"type\": \"string\"},{\"id\": 2,\"required\": true,\"name\": \"name\",\"type\": \"string\"},{\"id\": 3,\"required\": true,\"name\": \"ts\",\"type\": \"timestamp\"}]}",
    "tableProperties": {
        "key": "value"
    },
    "policies": {
        "sharingEnabled": "true",
        "replication": {
            "config": [
                {
                    "destination": "LocalHadoopClusterA",
                    "interval": "1D"
                }
            ]
        }
    }
}
```

Using `interval: 24H` gives the following error:
```
{
    "status": "BAD_REQUEST",
    "error": "Bad Request",
    "message": " : Replication interval for the table LocalHadoopCluster.u_tableowner.test_table1 can either be 12 hours or daily for up to 3 days",
    "stacktrace": null,
    "cause": "Not Available"
}
```

Trying to set the destination cluster as the source cluster gives the
following error:
```
{
    "status": "BAD_REQUEST",
    "error": "Bad Request",
    "message": " : Replication destination cluster for the table LocalHadoopCluster.u_tableowner.test_table1 must be different from the source cluster",
    "stacktrace": null,
    "cause": "Not Available"
}
```

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
chenselena authored Oct 24, 2024
1 parent 2960f85 commit 74d85fc
Showing 15 changed files with 668 additions and 11 deletions.
@@ -247,4 +247,87 @@ public void testColumnPolicyTagsExistUpdateExistingPolicyTags() {
Assertions.assertTrue(updatedPolicies.getColumnTags().containsKey("col1"));
Assertions.assertEquals(tagHC, updatedPolicies.getColumnTags().get("col1").getTags());
}

@Test
public void testPoliciesReplicationExistsButNoUpdateEmptyInterval() {
Map<String, String> props = new HashMap<>();
props.put(
"policies", "{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"\"}]}}");
TableMetadata metadata = mock(TableMetadata.class);
when(metadata.properties()).thenReturn(props);
OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class);
when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod();
Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata);
Assertions.assertNotNull(updatedPolicies);
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getDestination(), "a");
Assertions.assertTrue(
updatedPolicies.getReplication().getConfig().get(0).getInterval().isEmpty());
Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 1);
}

@Test
public void testNoPoliciesReplicationButUpdateExists() {
Map<String, String> props = new HashMap<>();
props.put(
"updated.openhouse.policy",
"{\"replication\":{\"config\":[{\"destination\":\"aa\", \"interval\":\"1D\"}]}}");
TableMetadata metadata = mock(TableMetadata.class);
when(metadata.properties()).thenReturn(props);
OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class);
when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod();
Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata);
Assertions.assertNotNull(updatedPolicies);
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getDestination(), "aa");
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getInterval(), "1D");
Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 1);
}

@Test
public void testPoliciesReplicationExistsUpdateExists() {
Map<String, String> props = new HashMap<>();
props.put(
"policies",
"{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"1D\"}, {\"destination\":\"b\", \"interval\":\"1D\"}]}}");
props.put(
"updated.openhouse.policy",
"{\"replication\":{\"config\":[{\"destination\":\"aa\", \"interval\":\"2D\"}]}}");
TableMetadata metadata = mock(TableMetadata.class);
when(metadata.properties()).thenReturn(props);
OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class);
when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod();
Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata);
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getDestination(), "aa");
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getInterval(), "2D");
Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 1);
}

@Test
public void testPoliciesReplicationExistsUpdateExistsForMultiple() {
Map<String, String> props = new HashMap<>();
props.put(
"policies",
"{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"1D\"}]}}");
props.put(
"updated.openhouse.policy",
"{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"1D\"}, {\"destination\":\"aa\", \"interval\":\"2D\"}]}}");
TableMetadata metadata = mock(TableMetadata.class);
when(metadata.properties()).thenReturn(props);
OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class);
when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod();
Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata);
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getDestination(), "a");
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getInterval(), "1D");
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(1).getDestination(), "aa");
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(1).getInterval(), "2D");
Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 2);
}
}
@@ -170,7 +170,6 @@ protected CreateUpdateTableRequestBody constructMetadataRequestBody(
CreateUpdateTableRequestBody.TableTypeEnum.valueOf(
metadata.properties().get(OPENHOUSE_TABLE_TYPE_KEY)));
}

return createUpdateTableRequestBody;
}

@@ -212,6 +211,10 @@ Policies buildUpdatedPolicies(TableMetadata metadata) {
}
policies.setColumnTags(patchUpdatedPolicy.getColumnTags());
}
// Update replication config
if (patchUpdatedPolicy.getReplication() != null) {
policies.replication(patchUpdatedPolicy.getReplication());
}
return policies;
}

@@ -41,4 +41,11 @@ public class Policies {
example = "{'colName': [PII, HC]}")
@Valid
Map<String, PolicyTag> columnTags;

@Schema(
description =
"Replication as required in /tables API request. This field holds the replication spec config.",
example = "{replication:{config:[{destination: clusterA, interval: 12H}]}}")
@Valid
Replication replication;
}
@@ -0,0 +1,27 @@
package com.linkedin.openhouse.tables.api.spec.v0.request.components;

import io.swagger.v3.oas.annotations.media.Schema;
import java.util.List;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Builder(toBuilder = true)
@EqualsAndHashCode
@Getter
@AllArgsConstructor(access = AccessLevel.PROTECTED)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Replication {
@Schema(
description =
"Replication config for the destination cluster name and replication job interval",
example = "[{destination: clusterA, interval: 12H}, {destination: clusterB, interval: 12H}]")
@NotNull(message = "Incorrect replication policy specified. Replication config cannot be null.")
@Valid
List<ReplicationConfig> config;
}
@@ -0,0 +1,32 @@
package com.linkedin.openhouse.tables.api.spec.v0.request.components;

import io.swagger.v3.oas.annotations.media.Schema;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Builder(toBuilder = true)
@EqualsAndHashCode
@Getter
@AllArgsConstructor(access = AccessLevel.PROTECTED)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class ReplicationConfig {
@Schema(description = "Replication destination cluster name", example = "clusterA")
@NotNull(
message =
"Incorrect destination specified. Destination field for replication config cannot be null")
@Valid
String destination;

@Schema(
description =
"Optional parameter interval at which the replication job should run. Default value is 1D",
example = "1D")
@Valid
String interval;
}
@@ -124,14 +124,16 @@ private List<String> validateUUIDForReplicaTable(
}

private void validatePolicies(CreateUpdateTableRequestBody createUpdateTableRequestBody) {
if (!policiesSpecValidator.validate(
createUpdateTableRequestBody.getPolicies(),
createUpdateTableRequestBody.getTimePartitioning(),
TableUri tableUri =
TableUri.builder()
.tableId(createUpdateTableRequestBody.getTableId())
.clusterId(createUpdateTableRequestBody.getClusterId())
.databaseId(createUpdateTableRequestBody.getDatabaseId())
.build(),
.build();
if (!policiesSpecValidator.validate(
createUpdateTableRequestBody.getPolicies(),
createUpdateTableRequestBody.getTimePartitioning(),
tableUri,
createUpdateTableRequestBody.getSchema())) {
throw new RequestValidationFailureException(
Arrays.asList(
@@ -4,6 +4,7 @@

import com.linkedin.openhouse.common.api.spec.TableUri;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec;
import com.linkedin.openhouse.tables.common.DefaultColumnPattern;
@@ -86,9 +87,71 @@ public boolean validate(
return false;
}
}

return validateReplication(policies, tableUri);
}

  /**
   * Valid cases for the replication object:
   * 1. The interval input is accepted as either 12H or a daily value from 1D to 3D.
   * 2. The destination cluster cannot be equal to the source cluster.
   */
protected boolean validateReplication(Policies policies, TableUri tableUri) {
if (policies != null
&& policies.getReplication() != null
&& policies.getReplication().getConfig() != null) {
return policies.getReplication().getConfig().stream()
.allMatch(
replicationConfig -> {
if (replicationConfig.getInterval() != null
&& !replicationConfig.getInterval().isEmpty()) {
if (!validateReplicationInterval(replicationConfig)) {
failureMessage =
String.format(
"Replication interval for the table [%s] can either be 12 hours or daily for up to 3 days",
tableUri);
return false;
}
}
if (replicationConfig.getDestination() != null) {
if (!validateReplicationDestination(replicationConfig, tableUri)) {
failureMessage =
String.format(
"Replication destination cluster for the table [%s] must be different from the source cluster",
tableUri);
return false;
}
}
return true;
});
}
return true;
}

  /**
   * Validate that the optional interval parameter provided by the user is either a 12-hour
   * interval or a daily interval of up to 3 days.
   */
  protected boolean validateReplicationInterval(ReplicationConfig replicationConfig) {
    String input = replicationConfig.getInterval();
    String granularity = input.substring(input.length() - 1);
    int interval;
    try {
      interval = Integer.parseInt(input.substring(0, input.length() - 1));
    } catch (NumberFormatException e) {
      // Treat non-numeric input such as "xH" as an invalid interval rather than a server error
      return false;
    }

    return (interval >= 1 && interval <= 3 && granularity.equals("D"))
        || (interval == 12 && granularity.equals("H"));
  }

/**
* Validate that the destination cluster provided by users is not the same as the source cluster
*/
protected boolean validateReplicationDestination(
ReplicationConfig replicationConfig, TableUri tableUri) {
return !replicationConfig.getDestination().toString().equals(tableUri.getClusterId());
}

/**
* Validate the pattern provided by users are legit pattern that complies with {@link
* DateTimeFormatter} symbols. Also, the provided column name needs to be part of schema.
@@ -0,0 +1,19 @@
package com.linkedin.openhouse.tables.common;

import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig;

/** Enum for the default replication interval associated with the interval field in {@link ReplicationConfig} */
public enum ReplicationInterval {
// default interval to run replication jobs if no interval provided by user
DEFAULT("1D");

private final String interval;

ReplicationInterval(String interval) {
this.interval = interval;
}

public String getInterval() {
return interval;
}
}
@@ -4,9 +4,14 @@
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Replication;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention;
import com.linkedin.openhouse.tables.common.DefaultColumnPattern;
import com.linkedin.openhouse.tables.common.ReplicationInterval;
import com.linkedin.openhouse.tables.model.TableDto;
import java.util.List;
import java.util.stream.Collectors;
import org.mapstruct.Mapper;
import org.mapstruct.Named;

@@ -61,6 +66,7 @@ public Policies toPoliciesObject(String policiesString) throws JsonParseException
@Named("mapPolicies")
public Policies mapPolicies(Policies policies) {
String defaultPattern;
Policies updatedPolicies = policies;
if (policies != null
&& policies.getRetention() != null
&& policies.getRetention().getColumnPattern() != null
@@ -86,9 +92,44 @@ public Policies mapPolicies(Policies policies) {
.pattern(defaultPattern)
.build())
.build();
return policies.toBuilder().retention(retentionPolicy).build();
} else {
return policies;
updatedPolicies = policies.toBuilder().retention(retentionPolicy).build();
}
if (policies != null && policies.getReplication() != null) {
updatedPolicies =
updatedPolicies
.toBuilder()
.replication(mapReplicationPolicies(policies.getReplication()))
.build();
}
return updatedPolicies;
}

  /**
   * mapReplicationPolicies is a helper which assigns the default interval value in the
   * replication config if the interval is empty. The default interval value is defined at
   * {@link ReplicationInterval}.
   *
   * @param replicationPolicy replication config for an Openhouse table
   * @return mapped replication object
   */
private Replication mapReplicationPolicies(Replication replicationPolicy) {
if (replicationPolicy != null && replicationPolicy.getConfig() != null) {
List<ReplicationConfig> replicationConfig =
replicationPolicy.getConfig().stream()
.map(
replication -> {
if (replication.getInterval() == null || replication.getInterval().isEmpty()) {
return replication
.toBuilder()
.interval(ReplicationInterval.DEFAULT.getInterval())
.build();
}
return replication;
})
.collect(Collectors.toList());

return replicationPolicy.toBuilder().config(replicationConfig).build();
}
return replicationPolicy;
}
}
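The defaulting rule in `mapReplicationPolicies` can be illustrated standalone (a hypothetical demo class; `"1D"` mirrors `ReplicationInterval.DEFAULT` from this PR):

```java
public class DefaultIntervalDemo {
    // Mirrors ReplicationInterval.DEFAULT: applied when the user omits the interval.
    static final String DEFAULT_INTERVAL = "1D";

    static String withDefault(String interval) {
        // An absent or empty interval falls back to the default; user values pass through.
        return (interval == null || interval.isEmpty()) ? DEFAULT_INTERVAL : interval;
    }

    public static void main(String[] args) {
        System.out.println(withDefault(""));    // falls back to the default
        System.out.println(withDefault("12H")); // user-provided value is kept
    }
}
```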