Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert interval parameter to cron schedule for self serve replication #238

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/tables/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
implementation project(':cluster:metrics')
implementation 'org.springframework.security:spring-security-config:5.7.2'
implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
implementation 'com.cronutils:cron-utils:9.2.0'
testImplementation 'org.junit.jupiter:junit-jupiter-engine:' + junit_version
testImplementation 'org.springframework.security:spring-security-test:5.7.3'
testImplementation(testFixtures(project(':services:common')))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ public class ReplicationConfig {
@Schema(
description =
"Optional parameter interval at which the replication job should run. Default value is 1D",
example = "1D")
example = "12H, 1D, 2D, 3D")
@Valid
String interval;

@Schema(description = "Cron schedule generated from the interval.", example = "0 0 1/1 * ? *")
@Valid
String cronSchedule;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,17 @@ public class TimePartitionSpec {
@NotNull(message = "granularity cannot be null")
Granularity granularity;

/**
 * Time granularity of a partition spec. Each parameterised constant carries a one-letter code
 * ("H", "D", "M", "Y"), readable through the accessor generated by {@code @Getter}.
 */
@Getter
public enum Granularity {
HOUR,
DAY,
MONTH,
YEAR
HOUR("H"),
DAY("D"),
MONTH("M"),
YEAR("Y");

/** One-letter code associated with this constant. */
private final String granularity;

/**
 * Binds a constant to its one-letter code.
 *
 * @param granularity one-letter code stored on the constant
 */
Granularity(String granularity) {
this.granularity = granularity;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.linkedin.openhouse.tables.common.DefaultColumnPattern;
import com.linkedin.openhouse.tables.common.ReplicationInterval;
import com.linkedin.openhouse.tables.model.TableDto;
import com.linkedin.openhouse.tables.utils.IntervalToCronConverter;
import java.util.List;
import java.util.stream.Collectors;
import org.mapstruct.Mapper;
Expand Down Expand Up @@ -106,8 +107,8 @@ public Policies mapPolicies(Policies policies) {

/**
* mapReplicationPolicies is a MapStruct function which assigns the default interval value in
* replication config if the interval is empty. Default values for pattern are defined at {@link
* ReplicationInterval}.
* replication config if the interval is empty, and sets the cron schedule generated from the
* interval value. Default interval values are defined at {@link ReplicationInterval}.
*
* @param replicationPolicy config for Openhouse table
* @return mapped policies object
Expand All @@ -122,12 +123,22 @@ private Replication mapReplicationPolicies(Replication replicationPolicy) {
return replication
.toBuilder()
.interval(ReplicationInterval.DEFAULT.getInterval())
.cronSchedule(
IntervalToCronConverter.generateCronExpression(
ReplicationInterval.DEFAULT.getInterval()))
.build();
}
if (replication.getCronSchedule() == null) {
return replication
.toBuilder()
.cronSchedule(
IntervalToCronConverter.generateCronExpression(
replication.getInterval()))
.build();
}
return replication;
})
.collect(Collectors.toList());

return replicationPolicy.toBuilder().config(replicationConfig).build();
}
return replicationPolicy;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.linkedin.openhouse.tables.utils;

import com.cronutils.builder.CronBuilder;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.field.expression.FieldExpressionFactory;
import com.linkedin.openhouse.common.exception.RequestValidationFailureException;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec;
import java.util.Random;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/** Utility class for generating a cron schedule given an interval for which a job should run */
@Slf4j
@Component
public class IntervalToCronConverter {

/**
* Public api to generate a cron schedule for a {@link ReplicationConfig} based on a given
* interval string in the form 12H, 1D, 2D, 3D.
*
* @param interval
* @return schedule
*/
public static String generateCronExpression(String interval) {
chenselena marked this conversation as resolved.
Show resolved Hide resolved
if (interval == null || interval.isEmpty()) {
String errorMessage = "Replication interval is null or empty";
log.error(errorMessage);
throw new RequestValidationFailureException(errorMessage);
}
int count = Integer.parseInt(interval.substring(0, interval.length() - 1));
// Generating random hourly and minute intervals for the cron expression
int hour = new Random().nextInt(24);
chenselena marked this conversation as resolved.
Show resolved Hide resolved
chenselena marked this conversation as resolved.
Show resolved Hide resolved
int minute = new int[] {0, 15, 30, 45}[new Random().nextInt(4)];
chenselena marked this conversation as resolved.
Show resolved Hide resolved

String granularity = interval.substring(interval.length() - 1);
String schedule;

if (granularity.equals(TimePartitionSpec.Granularity.HOUR.getGranularity())) {
schedule = generateHourlyCronExpression(hour, minute, count);
} else {
schedule = generateDailyCronExpression(hour, minute, count);
}
return schedule;
}

private static String generateDailyCronExpression(int hour, int minute, int dailyInterval) {
return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
.withYear(FieldExpressionFactory.always())
.withDoM(FieldExpressionFactory.questionMark())
.withDoW(FieldExpressionFactory.every(dailyInterval))
.withMonth(FieldExpressionFactory.always())
.withHour(FieldExpressionFactory.on(hour))
.withMinute(FieldExpressionFactory.on(minute))
.withSecond(FieldExpressionFactory.on(0))
.instance()
.asString();
}

private static String generateHourlyCronExpression(int hour, int minute, int hourlyInterval) {
return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
.withYear(FieldExpressionFactory.always())
.withDoM(FieldExpressionFactory.questionMark())
.withDoW(FieldExpressionFactory.always())
.withMonth(FieldExpressionFactory.always())
.withHour(FieldExpressionFactory.every(hourlyInterval))
.withMinute(FieldExpressionFactory.on(minute))
.withSecond(FieldExpressionFactory.on(0))
.instance()
.asString()
.replace(
String.format("*/%d", hourlyInterval), String.format("%d/%d", hour, hourlyInterval));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import com.cronutils.model.Cron;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinition;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.jayway.jsonpath.JsonPath;
Expand Down Expand Up @@ -324,4 +329,16 @@ static MvcResult putSnapshotsAndValidateResponse(
validateMetadataInPutSnapshotsRequest(result, icebergSnapshotsRequestBody);
return result;
}

/**
 * Checks whether the given string is a valid Quartz cron expression.
 *
 * @param cronSchedule cron expression to check; may be null
 * @return {@code true} if the expression parses and validates against the Quartz cron
 *     definition, {@code false} if it is null or rejected with an IllegalArgumentException
 */
static boolean validateCronSchedule(String cronSchedule) {
  if (cronSchedule == null) {
    // A missing schedule (e.g. an absent key in the response map) is simply invalid; it should
    // fail the caller's assertion instead of reaching the parser.
    return false;
  }
  try {
    CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ);
    CronParser parser = new CronParser(cronDefinition);
    Cron cron = parser.parse(cronSchedule);
    cron.validate();
    return true;
  } catch (IllegalArgumentException ignored) {
    // The exception is the "invalid expression" signal; the boolean result conveys it.
    return false;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,8 @@ public void testCreateTableWithTableType() throws Exception {
mvc, storageManager, buildGetTableResponseBody(mvcResult), INITIAL_TABLE_VERSION, false);
Assertions.assertEquals(
tableType, GET_TABLE_RESPONSE_BODY_WITH_TABLE_TYPE.getTableType().toString());
RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
RequestAndValidateHelper.deleteTableAndValidateResponse(
mvc, GET_TABLE_RESPONSE_BODY_WITH_TABLE_TYPE);
}

@Test
Expand Down Expand Up @@ -1026,7 +1027,7 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception {
JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies");

ReplicationConfig replicationConfig =
ReplicationConfig.builder().destination("clusterA").interval("").build();
ReplicationConfig.builder().destination("clusterA").interval("12H").build();
Replication replication =
Replication.builder().config(Arrays.asList(replicationConfig)).build();
Policies newPolicies = Policies.builder().replication(replication).build();
Expand All @@ -1051,9 +1052,16 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception {
JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies");

Assertions.assertNotEquals(currentPolicies, updatedPolicies);
Assertions.assertEquals(
updatedPolicies.get("replication").get("config").toString(),
"[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]");

LinkedHashMap<String, String> updatedReplication =
JsonPath.read(
mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]");

Assertions.assertEquals(updatedReplication.get("destination"), "clusterA");
Assertions.assertEquals(updatedReplication.get("interval"), "12H");
Assertions.assertTrue(
RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule")));

RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
}

Expand Down Expand Up @@ -1108,16 +1116,23 @@ public void testUpdateSucceedsForReplicationAndRetention() throws Exception {
JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies");

Assertions.assertNotEquals(currentPolicies, updatedPolicies);
Assertions.assertEquals(
updatedPolicies.get("replication").get("config").toString(),
"[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]");
Assertions.assertEquals(updatedPolicies.get("retention").get("count"), 4);
Assertions.assertEquals(
((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("columnName"),
"timestampCol");
Assertions.assertEquals(
((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("pattern"),
"yyyy-MM-dd");

LinkedHashMap<String, String> updatedReplication =
JsonPath.read(
mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]");

Assertions.assertEquals(updatedReplication.get("destination"), "clusterA");
Assertions.assertEquals(updatedReplication.get("interval"), "1D");
Assertions.assertTrue(
RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule")));

RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
}

Expand Down Expand Up @@ -1158,9 +1173,24 @@ public void testUpdateSucceedsForMultipleReplicationConfig() throws Exception {
JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies");

Assertions.assertNotEquals(currentPolicies, updatedPolicies);
Assertions.assertEquals(
updatedPolicies.get("replication").get("config").toString(),
"[{\"destination\":\"clusterA\",\"interval\":\"1D\"},{\"destination\":\"clusterB\",\"interval\":\"12H\"}]");

LinkedHashMap<String, String> updatedReplication =
JsonPath.read(
mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]");

Assertions.assertEquals(updatedReplication.get("destination"), "clusterA");
Assertions.assertEquals(updatedReplication.get("interval"), "1D");
Assertions.assertTrue(
RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule")));
updatedReplication =
JsonPath.read(
mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[1]");

Assertions.assertEquals(updatedReplication.get("destination"), "clusterB");
Assertions.assertEquals(updatedReplication.get("interval"), "12H");
Assertions.assertTrue(
RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule")));

RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public void testToPoliciesSpecJson() {
Assertions.assertEquals(
JsonPath.read(policiesSpec, "$.replication.config[0].destination"),
TableModelConstants.TABLE_POLICIES.getReplication().getConfig().get(0).getDestination());
Assertions.assertEquals(
JsonPath.read(policiesSpec, "$.replication.config[0].interval"),
TableModelConstants.TABLE_POLICIES.getReplication().getConfig().get(0).getInterval());
}

@Test
Expand Down
Loading