Convert interval parameter to cron schedule for self serve replication (#238)

## Summary

<!--- HINT: Replace #nnn with corresponding Issue number, if you are
fixing an existing issue -->

This PR adds a cron schedule field to the replication config for the
self-serve replication API and converts the interval parameter into a
cron schedule. The replication job that performs the cross-cluster data
copy takes the cron schedule as input. The `cron-utils` library is added
to generate the cron expressions.

The interval parameter is validated as one of `12H`, `1D`, `2D`, `3D`,
and the cron schedule is generated as follows:
- A `12H` interval means replication should trigger every 12 hours.
- The schedule starts at hour X, counted from midnight, where X ranges
from 0 to 23, and at minute Y, where Y is one of [0, 15, 30, 45]. For
example, a 12-hour interval can produce the schedule
`0 30 3/12 ? * * *`, which triggers at 3:30 am and 3:30 pm every day.
- If no interval is provided, a daily replication schedule is set up
that starts at hour X.
- X is randomized within 0-23 and Y is randomized over [0, 15, 30, 45]
to spread out the schedules and keep jobs from clustering around one time.

Example schedules:
- 12H: `0 15 23/12 ? * * *`
- 1D: `0 45 10 ? * * *`
- 2D: `0 30 8 ? * */2 *`
- 3D: `0 0 8 ? * */3 *`
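
The rules and examples above can be sketched in plain Java without the cron library. This is a minimal illustration, not the PR's `IntervalToCronConverter`: the class `IntervalCronSketch` and its method names are hypothetical, and the strings are assembled by hand only to mirror the example schedules listed in this description.

```java
import java.util.Random;

/** Hypothetical sketch of the interval-to-cron rules; not the code added by this PR. */
final class IntervalCronSketch {
  // Minutes are limited to quarter-hour marks, as described above.
  private static final int[] MINUTES = {0, 15, 30, 45};

  /**
   * Builds a 7-field Quartz-style expression:
   * second minute hour day-of-month month day-of-week year.
   */
  static String buildCron(String interval, int hour, int minute) {
    int count = Integer.parseInt(interval.substring(0, interval.length() - 1));
    char unit = interval.charAt(interval.length() - 1);
    String prefix = "0 " + minute + " ";
    if (unit == 'H') {
      // Hourly: start at the chosen hour and repeat every `count` hours.
      return prefix + hour + "/" + count + " ? * * *";
    }
    if (count == 1) {
      // Daily: a fixed hour with every other field left open.
      return prefix + hour + " ? * * *";
    }
    // Multi-day: mirrors the */N day-of-week step shown in the examples.
    return prefix + hour + " ? * */" + count + " *";
  }

  /** Picks a random hour (0-23) and quarter-hour minute, then builds the expression. */
  static String buildCron(String interval, Random random) {
    int hour = random.nextInt(24);
    int minute = MINUTES[random.nextInt(MINUTES.length)];
    return buildCron(interval, hour, minute);
  }
}
```

Passing the hour and minute explicitly (or a seeded `Random`) keeps the output deterministic, which makes the randomized spreading easier to test.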

## Changes

- [x] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [x] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

Unit tests added to check that the cron schedule was created
successfully.

`POST http://localhost:8000/v1/databases/u_tableowner/tables` with no
interval in the request, which results in the default 1D schedule:
```
{
    "policies": {
        "sharingEnabled": "true",
        "replication": {
            "config": [
                {
                    "destination": "clusterA"
                }
            ]
        }
    }
}
```
response:
```
        "replication": {
            "config": [
                {
                    "destination": "clusterA",
                    "interval": "1D",
                    "cronSchedule": "0 0 10 ? * * *"
                }
            ]
        }
```
12H schedule:
```
        "replication": {
            "config": [
                {
                    "destination": "clusterA",
                    "interval": "12H",
                    "cronSchedule": "0 0 23/12 ? * * *"
                }
            ]
        }
```
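
To read a schedule such as `0 0 23/12 ? * * *`, it helps to expand the hour field. The sketch below assumes the usual Quartz reading of `start/step` (begin at `start`, then add `step` while staying within 0-23, with no wrap past midnight); `HourFieldSketch` is a hypothetical helper, not part of this change. Under that assumption `3/12` matches hours 3 and 15, while a start hour of 12 or later, such as `23/12`, matches a single hour per day.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for reading a "start/step" hour field; not part of this PR. */
final class HourFieldSketch {

  /** Expands an hour field of the form "start/step" into the hours (0-23) it matches. */
  static List<Integer> firingHours(String hourField) {
    String[] parts = hourField.split("/");
    int start = Integer.parseInt(parts[0]);
    int step = Integer.parseInt(parts[1]);
    List<Integer> hours = new ArrayList<>();
    // Assumption: the step never wraps past hour 23 into the next day.
    for (int h = start; h <= 23; h += step) {
      hours.add(h);
    }
    return hours;
  }
}
```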
## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
chenselena authored Oct 31, 2024
1 parent b2bc32b commit 9be2378
Showing 8 changed files with 167 additions and 19 deletions.
1 change: 1 addition & 0 deletions services/tables/build.gradle
@@ -31,6 +31,7 @@ dependencies {
   implementation project(':cluster:metrics')
   implementation 'org.springframework.security:spring-security-config:5.7.2'
   implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
+  implementation 'com.cronutils:cron-utils:9.2.0'
   testImplementation 'org.junit.jupiter:junit-jupiter-engine:' + junit_version
   testImplementation 'org.springframework.security:spring-security-test:5.7.3'
   testImplementation(testFixtures(project(':services:common')))
@@ -26,7 +26,11 @@ public class ReplicationConfig {
   @Schema(
       description =
           "Optional parameter interval at which the replication job should run. Default value is 1D",
-      example = "1D")
+      example = "12H, 1D, 2D, 3D")
   @Valid
   String interval;
+
+  @Schema(description = "Cron schedule generated from the interval.", example = "0 0 1/1 * ? *")
+  @Valid
+  String cronSchedule;
 }
@@ -31,10 +31,17 @@ public class TimePartitionSpec {
   @NotNull(message = "granularity cannot be null")
   Granularity granularity;

+  @Getter
   public enum Granularity {
-    HOUR,
-    DAY,
-    MONTH,
-    YEAR
+    HOUR("H"),
+    DAY("D"),
+    MONTH("M"),
+    YEAR("Y");
+
+    private final String granularity;
+
+    Granularity(String granularity) {
+      this.granularity = granularity;
+    }
   }
 }
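
With the suffix stored on each constant, a caller could map an interval's trailing character back to a granularity rather than comparing strings by hand. A minimal sketch, assuming a hypothetical `fromSuffix` lookup that the diff above does not add; `GranularitySketch` stands in for `TimePartitionSpec.Granularity`:

```java
import java.util.Arrays;
import java.util.Optional;

/** Hypothetical stand-in for TimePartitionSpec.Granularity with a reverse lookup. */
enum GranularitySketch {
  HOUR("H"),
  DAY("D"),
  MONTH("M"),
  YEAR("Y");

  private final String suffix;

  GranularitySketch(String suffix) {
    this.suffix = suffix;
  }

  String getSuffix() {
    return suffix;
  }

  /** Returns the constant whose suffix equals the argument, or empty if none matches. */
  static Optional<GranularitySketch> fromSuffix(String suffix) {
    return Arrays.stream(values()).filter(g -> g.suffix.equals(suffix)).findFirst();
  }
}
```

Returning an `Optional` leaves the decision about unknown suffixes (reject, or fall back to a default) to the caller.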
@@ -10,6 +10,7 @@
 import com.linkedin.openhouse.tables.common.DefaultColumnPattern;
 import com.linkedin.openhouse.tables.common.ReplicationInterval;
 import com.linkedin.openhouse.tables.model.TableDto;
+import com.linkedin.openhouse.tables.utils.IntervalToCronConverter;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.mapstruct.Mapper;
@@ -106,8 +107,8 @@ public Policies mapPolicies(Policies policies) {

   /**
    * mapRetentionPolicies is a mapStruct function which assigns default interval value in
-   * replication config if the interval is empty. Default values for pattern are defined at {@link
-   * ReplicationInterval}.
+   * replication config if the interval is empty and the generated cron schedule from the interval
+   * value. Default values for pattern are defined at {@link ReplicationInterval}.
    *
    * @param replicationPolicy config for Openhouse table
    * @return mapped policies object
@@ -122,12 +123,22 @@ private Replication mapReplicationPolicies(Replication replicationPolicy) {
                   return replication
                       .toBuilder()
                       .interval(ReplicationInterval.DEFAULT.getInterval())
+                      .cronSchedule(
+                          IntervalToCronConverter.generateCronExpression(
+                              ReplicationInterval.DEFAULT.getInterval()))
                       .build();
                 }
+                if (replication.getCronSchedule() == null) {
+                  return replication
+                      .toBuilder()
+                      .cronSchedule(
+                          IntervalToCronConverter.generateCronExpression(
+                              replication.getInterval()))
+                      .build();
+                }
                 return replication;
               })
           .collect(Collectors.toList());
-
       return replicationPolicy.toBuilder().config(replicationConfig).build();
     }
     return replicationPolicy;
@@ -0,0 +1,75 @@
+package com.linkedin.openhouse.tables.utils;
+
+import com.cronutils.builder.CronBuilder;
+import com.cronutils.model.CronType;
+import com.cronutils.model.definition.CronDefinitionBuilder;
+import com.cronutils.model.field.expression.FieldExpressionFactory;
+import com.linkedin.openhouse.common.exception.RequestValidationFailureException;
+import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig;
+import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec;
+import java.util.Random;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/** Utility class for generating a cron schedule given an interval for which a job should run */
+@Slf4j
+@Component
+public class IntervalToCronConverter {
+
+  /**
+   * Public api to generate a cron schedule for a {@link ReplicationConfig} based on a given
+   * interval string in the form 12H, 1D, 2D, 3D.
+   *
+   * @param interval
+   * @return schedule
+   */
+  public static String generateCronExpression(String interval) {
+    if (interval == null || interval.isEmpty()) {
+      String errorMessage = "Replication interval is null or empty";
+      log.error(errorMessage);
+      throw new RequestValidationFailureException(errorMessage);
+    }
+    int count = Integer.parseInt(interval.substring(0, interval.length() - 1));
+    // Generating random hourly and minute intervals for the cron expression
+    int hour = new Random().nextInt(24);
+    int minute = new int[] {0, 15, 30, 45}[new Random().nextInt(4)];
+
+    String granularity = interval.substring(interval.length() - 1);
+    String schedule;
+
+    if (granularity.equals(TimePartitionSpec.Granularity.HOUR.getGranularity())) {
+      schedule = generateHourlyCronExpression(hour, minute, count);
+    } else {
+      schedule = generateDailyCronExpression(hour, minute, count);
+    }
+    return schedule;
+  }
+
+  private static String generateDailyCronExpression(int hour, int minute, int dailyInterval) {
+    return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
+        .withYear(FieldExpressionFactory.always())
+        .withDoM(FieldExpressionFactory.questionMark())
+        .withDoW(FieldExpressionFactory.every(dailyInterval))
+        .withMonth(FieldExpressionFactory.always())
+        .withHour(FieldExpressionFactory.on(hour))
+        .withMinute(FieldExpressionFactory.on(minute))
+        .withSecond(FieldExpressionFactory.on(0))
+        .instance()
+        .asString();
+  }
+
+  private static String generateHourlyCronExpression(int hour, int minute, int hourlyInterval) {
+    return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
+        .withYear(FieldExpressionFactory.always())
+        .withDoM(FieldExpressionFactory.questionMark())
+        .withDoW(FieldExpressionFactory.always())
+        .withMonth(FieldExpressionFactory.always())
+        .withHour(FieldExpressionFactory.every(hourlyInterval))
+        .withMinute(FieldExpressionFactory.on(minute))
+        .withSecond(FieldExpressionFactory.on(0))
+        .instance()
+        .asString()
+        .replace(
+            String.format("*/%d", hourlyInterval), String.format("%d/%d", hour, hourlyInterval));
+  }
+}
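
`generateCronExpression` splits the interval into a numeric count and a one-letter unit and relies on the input already having that shape. A defensive variant might check the shape first. The sketch below is an illustration under stated assumptions rather than code from this commit: `IntervalParserSketch`, `ParsedInterval`, and the accepted pattern (one to three digits followed by `H` or `D`) are all hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that validates an interval string before it is converted. */
final class IntervalParserSketch {
  // Assumed shape: 1-3 digits followed by H (hours) or D (days), e.g. 12H or 3D.
  private static final Pattern INTERVAL = Pattern.compile("(\\d{1,3})([HD])");

  /** Simple value holder for the parsed count and unit. */
  static final class ParsedInterval {
    final int count;
    final char unit;

    ParsedInterval(int count, char unit) {
      this.count = count;
      this.unit = unit;
    }
  }

  /** Parses an interval, or throws IllegalArgumentException when the shape is unexpected. */
  static ParsedInterval parse(String interval) {
    if (interval == null) {
      throw new IllegalArgumentException("interval is null");
    }
    Matcher matcher = INTERVAL.matcher(interval);
    if (!matcher.matches()) {
      throw new IllegalArgumentException("unexpected interval: " + interval);
    }
    int count = Integer.parseInt(matcher.group(1));
    if (count == 0) {
      // A zero count would not describe a repeating schedule.
      throw new IllegalArgumentException("interval count must be positive: " + interval);
    }
    return new ParsedInterval(count, matcher.group(2).charAt(0));
  }
}
```

Failing early with a clear message avoids a bare `NumberFormatException` from `Integer.parseInt` on inputs such as `H` or `1.5D`.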
@@ -10,6 +10,11 @@
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

+import com.cronutils.model.Cron;
+import com.cronutils.model.CronType;
+import com.cronutils.model.definition.CronDefinition;
+import com.cronutils.model.definition.CronDefinitionBuilder;
+import com.cronutils.parser.CronParser;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.jayway.jsonpath.JsonPath;
@@ -324,4 +329,16 @@ static MvcResult putSnapshotsAndValidateResponse(
     validateMetadataInPutSnapshotsRequest(result, icebergSnapshotsRequestBody);
     return result;
   }
+
+  static boolean validateCronSchedule(String cronSchedule) {
+    try {
+      CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ);
+      CronParser parser = new CronParser(cronDefinition);
+      Cron cron = parser.parse(cronSchedule);
+      cron.validate();
+      return true;
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
 }
@@ -773,7 +773,8 @@ public void testCreateTableWithTableType() throws Exception {
         mvc, storageManager, buildGetTableResponseBody(mvcResult), INITIAL_TABLE_VERSION, false);
     Assertions.assertEquals(
         tableType, GET_TABLE_RESPONSE_BODY_WITH_TABLE_TYPE.getTableType().toString());
-    RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
+    RequestAndValidateHelper.deleteTableAndValidateResponse(
+        mvc, GET_TABLE_RESPONSE_BODY_WITH_TABLE_TYPE);
   }

   @Test
@@ -1026,7 +1027,7 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception {
         JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies");

     ReplicationConfig replicationConfig =
-        ReplicationConfig.builder().destination("clusterA").interval("").build();
+        ReplicationConfig.builder().destination("clusterA").interval("12H").build();
     Replication replication =
         Replication.builder().config(Arrays.asList(replicationConfig)).build();
     Policies newPolicies = Policies.builder().replication(replication).build();
@@ -1051,9 +1052,16 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception {
         JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies");

     Assertions.assertNotEquals(currentPolicies, updatedPolicies);
-    Assertions.assertEquals(
-        updatedPolicies.get("replication").get("config").toString(),
-        "[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]");
+
+    LinkedHashMap<String, String> updatedReplication =
+        JsonPath.read(
+            mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]");
+
+    Assertions.assertEquals(updatedReplication.get("destination"), "clusterA");
+    Assertions.assertEquals(updatedReplication.get("interval"), "12H");
+    Assertions.assertTrue(
+        RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule")));
+
     RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
   }

@@ -1108,16 +1116,23 @@ public void testUpdateSucceedsForReplicationAndRetention() throws Exception {
         JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies");

     Assertions.assertNotEquals(currentPolicies, updatedPolicies);
-    Assertions.assertEquals(
-        updatedPolicies.get("replication").get("config").toString(),
-        "[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]");
     Assertions.assertEquals(updatedPolicies.get("retention").get("count"), 4);
     Assertions.assertEquals(
         ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("columnName"),
         "timestampCol");
     Assertions.assertEquals(
         ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("pattern"),
         "yyyy-MM-dd");
+
+    LinkedHashMap<String, String> updatedReplication =
+        JsonPath.read(
+            mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]");
+
+    Assertions.assertEquals(updatedReplication.get("destination"), "clusterA");
+    Assertions.assertEquals(updatedReplication.get("interval"), "1D");
+    Assertions.assertTrue(
+        RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule")));
+
     RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
   }

@@ -1158,9 +1173,24 @@ public void testUpdateSucceedsForMultipleReplicationConfig() throws Exception {
         JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies");

     Assertions.assertNotEquals(currentPolicies, updatedPolicies);
-    Assertions.assertEquals(
-        updatedPolicies.get("replication").get("config").toString(),
-        "[{\"destination\":\"clusterA\",\"interval\":\"1D\"},{\"destination\":\"clusterB\",\"interval\":\"12H\"}]");
+
+    LinkedHashMap<String, String> updatedReplication =
+        JsonPath.read(
+            mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]");
+
+    Assertions.assertEquals(updatedReplication.get("destination"), "clusterA");
+    Assertions.assertEquals(updatedReplication.get("interval"), "1D");
+    Assertions.assertTrue(
+        RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule")));
+    updatedReplication =
+        JsonPath.read(
+            mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[1]");
+
+    Assertions.assertEquals(updatedReplication.get("destination"), "clusterB");
+    Assertions.assertEquals(updatedReplication.get("interval"), "12H");
+    Assertions.assertTrue(
+        RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule")));
+
     RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
   }
 }
@@ -37,6 +37,9 @@ public void testToPoliciesSpecJson() {
     Assertions.assertEquals(
         JsonPath.read(policiesSpec, "$.replication.config[0].destination"),
         TableModelConstants.TABLE_POLICIES.getReplication().getConfig().get(0).getDestination());
+    Assertions.assertEquals(
+        JsonPath.read(policiesSpec, "$.replication.config[0].interval"),
+        TableModelConstants.TABLE_POLICIES.getReplication().getConfig().get(0).getInterval());
   }

   @Test
