diff --git a/services/tables/build.gradle b/services/tables/build.gradle index d78b8bdb..714fe7bd 100644 --- a/services/tables/build.gradle +++ b/services/tables/build.gradle @@ -31,6 +31,7 @@ dependencies { implementation project(':cluster:metrics') implementation 'org.springframework.security:spring-security-config:5.7.2' implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8' + implementation 'com.cronutils:cron-utils:9.2.0' testImplementation 'org.junit.jupiter:junit-jupiter-engine:' + junit_version testImplementation 'org.springframework.security:spring-security-test:5.7.3' testImplementation(testFixtures(project(':services:common'))) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java index 3bb69284..793bdce6 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java @@ -26,7 +26,11 @@ public class ReplicationConfig { @Schema( description = "Optional parameter interval at which the replication job should run. Default value is 1D", - example = "1D") + example = "12H, 1D, 2D, 3D") @Valid String interval; + + @Schema(description = "Cron schedule generated from the interval.", example = "0 0 1/1 * ? *") + @Valid + String cronSchedule; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/TimePartitionSpec.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/TimePartitionSpec.java index 926e2688..dc1ceeb5 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/TimePartitionSpec.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/TimePartitionSpec.java @@ -31,10 +31,17 @@ public class TimePartitionSpec { @NotNull(message = "granularity cannot be null") Granularity granularity; + @Getter public enum Granularity { - HOUR, - DAY, - MONTH, - YEAR + HOUR("H"), + DAY("D"), + MONTH("M"), + YEAR("Y"); + + private final String granularity; + + Granularity(String granularity) { + this.granularity = granularity; + } } } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java index 065916df..258365dd 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java @@ -10,6 +10,7 @@ import com.linkedin.openhouse.tables.common.DefaultColumnPattern; import com.linkedin.openhouse.tables.common.ReplicationInterval; import com.linkedin.openhouse.tables.model.TableDto; +import com.linkedin.openhouse.tables.utils.IntervalToCronConverter; import java.util.List; import java.util.stream.Collectors; import org.mapstruct.Mapper; @@ -106,8 +107,8 @@ public Policies mapPolicies(Policies policies) { /** * mapRetentionPolicies is a mapStruct function which assigns default interval value in - * replication config if the interval is empty. Default values for pattern are defined at {@link - * ReplicationInterval}. + * replication config if the interval is empty and the generated cron schedule from the interval + * value. Default values for pattern are defined at {@link ReplicationInterval}. * * @param replicationPolicy config for Openhouse table * @return mapped policies object @@ -122,12 +123,22 @@ private Replication mapReplicationPolicies(Replication replicationPolicy) { return replication .toBuilder() .interval(ReplicationInterval.DEFAULT.getInterval()) + .cronSchedule( + IntervalToCronConverter.generateCronExpression( + ReplicationInterval.DEFAULT.getInterval())) + .build(); + } + if (replication.getCronSchedule() == null) { + return replication + .toBuilder() + .cronSchedule( + IntervalToCronConverter.generateCronExpression( + replication.getInterval())) .build(); } return replication; }) .collect(Collectors.toList()); - return replicationPolicy.toBuilder().config(replicationConfig).build(); } return replicationPolicy; diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java new file mode 100644 index 00000000..08bc6bc5 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java @@ -0,0 +1,75 @@ +package com.linkedin.openhouse.tables.utils; + +import com.cronutils.builder.CronBuilder; +import com.cronutils.model.CronType; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.model.field.expression.FieldExpressionFactory; +import com.linkedin.openhouse.common.exception.RequestValidationFailureException; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec; +import java.util.Random; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** Utility class for generating a cron schedule given an interval for which a job should run */ +@Slf4j +@Component +public class IntervalToCronConverter { + + /** + * Public api to generate a cron schedule for a {@link ReplicationConfig} based on a given + * interval string in the form 12H, 1D, 2D, 3D. + * + * @param interval + * @return schedule + */ + public static String generateCronExpression(String interval) { + if (interval == null || interval.isEmpty()) { + String errorMessage = "Replication interval is null or empty"; + log.error(errorMessage); + throw new RequestValidationFailureException(errorMessage); + } + int count = Integer.parseInt(interval.substring(0, interval.length() - 1)); + // Generating random hourly and minute intervals for the cron expression + int hour = new Random().nextInt(24); + int minute = new int[] {0, 15, 30, 45}[new Random().nextInt(4)]; + + String granularity = interval.substring(interval.length() - 1); + String schedule; + + if (granularity.equals(TimePartitionSpec.Granularity.HOUR.getGranularity())) { + schedule = generateHourlyCronExpression(hour, minute, count); + } else { + schedule = generateDailyCronExpression(hour, minute, count); + } + return schedule; + } + + private static String generateDailyCronExpression(int hour, int minute, int dailyInterval) { + return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) + .withYear(FieldExpressionFactory.always()) + .withDoM(FieldExpressionFactory.questionMark()) + .withDoW(FieldExpressionFactory.every(dailyInterval)) + .withMonth(FieldExpressionFactory.always()) + .withHour(FieldExpressionFactory.on(hour)) + .withMinute(FieldExpressionFactory.on(minute)) + .withSecond(FieldExpressionFactory.on(0)) + .instance() + .asString(); + } + + private static String generateHourlyCronExpression(int hour, int minute, int hourlyInterval) { + return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) + .withYear(FieldExpressionFactory.always()) + .withDoM(FieldExpressionFactory.questionMark()) + .withDoW(FieldExpressionFactory.always()) + .withMonth(FieldExpressionFactory.always()) + .withHour(FieldExpressionFactory.every(hourlyInterval)) + .withMinute(FieldExpressionFactory.on(minute)) + .withSecond(FieldExpressionFactory.on(0)) + .instance() + .asString() + .replace( + String.format("*/%d", hourlyInterval), String.format("%d/%d", hour, hourlyInterval)); + } +} diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RequestAndValidateHelper.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RequestAndValidateHelper.java index 507d0441..abb51cd9 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RequestAndValidateHelper.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RequestAndValidateHelper.java @@ -10,6 +10,11 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import com.cronutils.model.Cron; +import com.cronutils.model.CronType; +import com.cronutils.model.definition.CronDefinition; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.parser.CronParser; import com.google.gson.Gson; import com.google.gson.JsonObject; import com.jayway.jsonpath.JsonPath; @@ -324,4 +329,16 @@ static MvcResult putSnapshotsAndValidateResponse( validateMetadataInPutSnapshotsRequest(result, icebergSnapshotsRequestBody); return result; } + + static boolean validateCronSchedule(String cronSchedule) { + try { + CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ); + CronParser parser = new CronParser(cronDefinition); + Cron cron = parser.parse(cronSchedule); + cron.validate(); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java index fc474e41..5467155c 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java @@ -773,7 +773,8 @@ public void testCreateTableWithTableType() throws Exception { mvc, storageManager, buildGetTableResponseBody(mvcResult), INITIAL_TABLE_VERSION, false); Assertions.assertEquals( tableType, GET_TABLE_RESPONSE_BODY_WITH_TABLE_TYPE.getTableType().toString()); - RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); + RequestAndValidateHelper.deleteTableAndValidateResponse( + mvc, GET_TABLE_RESPONSE_BODY_WITH_TABLE_TYPE); } @Test @@ -1026,7 +1027,7 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception { JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); ReplicationConfig replicationConfig = - ReplicationConfig.builder().destination("clusterA").interval("").build(); + ReplicationConfig.builder().destination("clusterA").interval("12H").build(); Replication replication = Replication.builder().config(Arrays.asList(replicationConfig)).build(); Policies newPolicies = Policies.builder().replication(replication).build(); @@ -1051,9 +1052,16 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception { JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); Assertions.assertNotEquals(currentPolicies, updatedPolicies); - Assertions.assertEquals( - updatedPolicies.get("replication").get("config").toString(), - "[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]"); + + LinkedHashMap updatedReplication = + JsonPath.read( + mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]"); + + Assertions.assertEquals(updatedReplication.get("destination"), "clusterA"); + Assertions.assertEquals(updatedReplication.get("interval"), "12H"); + Assertions.assertTrue( + RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule"))); + RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); } @@ -1108,9 +1116,6 @@ public void testUpdateSucceedsForReplicationAndRetention() throws Exception { JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); Assertions.assertNotEquals(currentPolicies, updatedPolicies); - Assertions.assertEquals( - updatedPolicies.get("replication").get("config").toString(), - "[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]"); Assertions.assertEquals(updatedPolicies.get("retention").get("count"), 4); Assertions.assertEquals( ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("columnName"), @@ -1118,6 +1123,16 @@ public void testUpdateSucceedsForReplicationAndRetention() throws Exception { Assertions.assertEquals( ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("pattern"), "yyyy-MM-dd"); + + LinkedHashMap updatedReplication = + JsonPath.read( + mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]"); + + Assertions.assertEquals(updatedReplication.get("destination"), "clusterA"); + Assertions.assertEquals(updatedReplication.get("interval"), "1D"); + Assertions.assertTrue( + RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule"))); + RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); } @@ -1158,9 +1173,24 @@ public void testUpdateSucceedsForMultipleReplicationConfig() throws Exception { JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); Assertions.assertNotEquals(currentPolicies, updatedPolicies); - Assertions.assertEquals( - updatedPolicies.get("replication").get("config").toString(), - "[{\"destination\":\"clusterA\",\"interval\":\"1D\"},{\"destination\":\"clusterB\",\"interval\":\"12H\"}]"); + + LinkedHashMap updatedReplication = + JsonPath.read( + mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]"); + + Assertions.assertEquals(updatedReplication.get("destination"), "clusterA"); + Assertions.assertEquals(updatedReplication.get("interval"), "1D"); + Assertions.assertTrue( + RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule"))); + updatedReplication = + JsonPath.read( + mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[1]"); + + Assertions.assertEquals(updatedReplication.get("destination"), "clusterB"); + Assertions.assertEquals(updatedReplication.get("interval"), "12H"); + Assertions.assertTrue( + RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule"))); + RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java index 3f6861a2..94953334 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java @@ -37,6 +37,9 @@ public void testToPoliciesSpecJson() { Assertions.assertEquals( JsonPath.read(policiesSpec, "$.replication.config[0].destination"), TableModelConstants.TABLE_POLICIES.getReplication().getConfig().get(0).getDestination()); + Assertions.assertEquals( + JsonPath.read(policiesSpec, "$.replication.config[0].interval"), + TableModelConstants.TABLE_POLICIES.getReplication().getConfig().get(0).getInterval()); } @Test