From 6347499722d60fd3be6dff26e42f13e65292f656 Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Wed, 23 Oct 2024 10:01:16 -0700 Subject: [PATCH 1/6] WIP --- services/tables/build.gradle | 1 + .../request/components/ReplicationConfig.java | 11 ++++ .../mapper/iceberg/PoliciesSpecMapper.java | 14 ++++- .../tables/utils/CronScheduleGenerator.java | 56 +++++++++++++++++++ .../tables/e2e/h2/TablesControllerTest.java | 30 +++++++--- .../mock/mapper/PoliciesSpecMapperTest.java | 3 + 6 files changed, 105 insertions(+), 10 deletions(-) create mode 100644 services/tables/src/main/java/com/linkedin/openhouse/tables/utils/CronScheduleGenerator.java diff --git a/services/tables/build.gradle b/services/tables/build.gradle index d78b8bdb..714fe7bd 100644 --- a/services/tables/build.gradle +++ b/services/tables/build.gradle @@ -31,6 +31,7 @@ dependencies { implementation project(':cluster:metrics') implementation 'org.springframework.security:spring-security-config:5.7.2' implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8' + implementation 'com.cronutils:cron-utils:9.2.0' testImplementation 'org.junit.jupiter:junit-jupiter-engine:' + junit_version testImplementation 'org.springframework.security:spring-security-test:5.7.3' testImplementation(testFixtures(project(':services:common'))) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java index 3bb69284..952f6d82 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java @@ -29,4 +29,15 @@ public class ReplicationConfig { example = "1D") @Valid String interval; + + @Schema( + description = "Cron schedule generated from interval used for replication job scheduling", + example = "Schedule could be '0 3/12 * * *' if interval is 12H") + @Valid + String cronSchedule; + + public enum Granularity { + H, + D + } } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java index 065916df..89969307 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java @@ -10,6 +10,7 @@ import com.linkedin.openhouse.tables.common.DefaultColumnPattern; import com.linkedin.openhouse.tables.common.ReplicationInterval; import com.linkedin.openhouse.tables.model.TableDto; +import com.linkedin.openhouse.tables.utils.CronScheduleGenerator; import java.util.List; import java.util.stream.Collectors; import org.mapstruct.Mapper; @@ -106,8 +107,8 @@ public Policies mapPolicies(Policies policies) { /** * mapRetentionPolicies is a mapStruct function which assigns default interval value in - * replication config if the interval is empty. Default values for pattern are defined at {@link - * ReplicationInterval}. + * replication config if the interval is empty and the generated cron schedule from the interval + * value. Default values for pattern are defined at {@link ReplicationInterval}. * * @param replicationPolicy config for Openhouse table * @return mapped policies object @@ -122,9 +123,16 @@ private Replication mapReplicationPolicies(Replication replicationPolicy) { return replication .toBuilder() .interval(ReplicationInterval.DEFAULT.getInterval()) + .cronSchedule( + CronScheduleGenerator.buildCronExpression( + ReplicationInterval.DEFAULT.getInterval())) .build(); } - return replication; + return replication + .toBuilder() + .cronSchedule( + CronScheduleGenerator.buildCronExpression(replication.getInterval())) + .build(); }) .collect(Collectors.toList()); diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/CronScheduleGenerator.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/CronScheduleGenerator.java new file mode 100644 index 00000000..d05f8316 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/CronScheduleGenerator.java @@ -0,0 +1,56 @@ +package com.linkedin.openhouse.tables.utils; + +import com.cronutils.builder.CronBuilder; +import com.cronutils.model.CronType; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.model.field.expression.FieldExpressionFactory; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; +import java.util.Random; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * Utility class for generating cron schedules based on interval input for {@link + * ReplicationConfig}. + */ +@Slf4j +@Component +public class CronScheduleGenerator { + public static String buildCronExpression(String interval) { + int count = Integer.parseInt(interval.substring(0, interval.length() - 1)); + String granularity = interval.substring(interval.length() - 1); + int hourSchedule = new Random().nextInt(24); + String schedule; + + if (granularity.equals(ReplicationConfig.Granularity.H.toString())) { + schedule = buildHourlyCronExpression(hourSchedule, count); + } else { + schedule = buildDailyCronExpression(hourSchedule, count); + } + return schedule; + } + + private static String buildDailyCronExpression(int hourSchedule, int dailyInterval) { + return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.CRON4J)) + .withDoM(FieldExpressionFactory.every(dailyInterval)) + .withMonth(FieldExpressionFactory.always()) + .withDoW(FieldExpressionFactory.always()) + .withHour(FieldExpressionFactory.on(hourSchedule)) + .withMinute(FieldExpressionFactory.on(0)) + .instance() + .asString(); + } + + private static String buildHourlyCronExpression(int hourSchedule, int hourlyInterval) { + return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.CRON4J)) + .withDoM(FieldExpressionFactory.always()) + .withMonth(FieldExpressionFactory.always()) + .withDoW(FieldExpressionFactory.always()) + .withHour( + FieldExpressionFactory.every(hourlyInterval) + .and(FieldExpressionFactory.on(hourSchedule))) + .withMinute(FieldExpressionFactory.on(0)) + .instance() + .asString(); + } +} diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java index fc474e41..9737b643 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java @@ -1026,7 +1026,7 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception { JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); ReplicationConfig replicationConfig = - ReplicationConfig.builder().destination("clusterA").interval("").build(); + ReplicationConfig.builder().destination("clusterA").interval("12H").build(); Replication replication = Replication.builder().config(Arrays.asList(replicationConfig)).build(); Policies newPolicies = Policies.builder().replication(replication).build(); @@ -1051,9 +1051,14 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception { JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); Assertions.assertNotEquals(currentPolicies, updatedPolicies); - Assertions.assertEquals( - updatedPolicies.get("replication").get("config").toString(), - "[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]"); + + LinkedHashMap updatedReplication = + JsonPath.read( + mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]"); + + Assertions.assertEquals(updatedReplication.get("destination"), "clusterA"); + Assertions.assertEquals(updatedReplication.get("interval"), "12H"); + System.out.println(); RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); } @@ -1158,9 +1163,20 @@ public void testUpdateSucceedsForMultipleReplicationConfig() throws Exception { JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); Assertions.assertNotEquals(currentPolicies, updatedPolicies); - Assertions.assertEquals( - updatedPolicies.get("replication").get("config").toString(), - "[{\"destination\":\"clusterA\",\"interval\":\"1D\"},{\"destination\":\"clusterB\",\"interval\":\"12H\"}]"); + + LinkedHashMap updatedReplication = + JsonPath.read( + mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]"); + + Assertions.assertEquals(updatedReplication.get("destination"), "clusterA"); + Assertions.assertEquals(updatedReplication.get("interval"), "1D"); + + updatedReplication = + JsonPath.read( + mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[1]"); + + Assertions.assertEquals(updatedReplication.get("destination"), "clusterB"); + Assertions.assertEquals(updatedReplication.get("interval"), "12H"); RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java index 3f6861a2..94953334 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java @@ -37,6 +37,9 @@ public void testToPoliciesSpecJson() { Assertions.assertEquals( JsonPath.read(policiesSpec, "$.replication.config[0].destination"), TableModelConstants.TABLE_POLICIES.getReplication().getConfig().get(0).getDestination()); + Assertions.assertEquals( + JsonPath.read(policiesSpec, "$.replication.config[0].interval"), + TableModelConstants.TABLE_POLICIES.getReplication().getConfig().get(0).getInterval()); } @Test From 3a197f14d461c333a3116a358b98bce5cc4f3975 Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Wed, 23 Oct 2024 21:10:10 -0700 Subject: [PATCH 2/6] Add cron schedule generator to generate from interval --- .../request/components/ReplicationConfig.java | 11 +--- .../mapper/iceberg/PoliciesSpecMapper.java | 14 +++-- .../tables/utils/CronScheduleGenerator.java | 58 ++++++++++++------- .../tables/e2e/h2/TablesControllerTest.java | 11 ++-- 4 files changed, 52 insertions(+), 42 deletions(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java index 952f6d82..793bdce6 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java @@ -26,18 +26,11 @@ public class ReplicationConfig { @Schema( description = "Optional parameter interval at which the replication job should run. Default value is 1D", - example = "1D") + example = "12H, 1D, 2D, 3D") @Valid String interval; - @Schema( - description = "Cron schedule generated from interval used for replication job scheduling", - example = "Schedule could be '0 3/12 * * *' if interval is 12H") + @Schema(description = "Cron schedule generated from the interval.", example = "0 0 1/1 * ? *") @Valid String cronSchedule; - - public enum Granularity { - H, - D - } } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java index 89969307..5d8c7f85 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java @@ -128,14 +128,16 @@ private Replication mapReplicationPolicies(Replication replicationPolicy) { ReplicationInterval.DEFAULT.getInterval())) .build(); } - return replication - .toBuilder() - .cronSchedule( - CronScheduleGenerator.buildCronExpression(replication.getInterval())) - .build(); + if (replication.getCronSchedule() == null) { + return replication + .toBuilder() + .cronSchedule( + CronScheduleGenerator.buildCronExpression(replication.getInterval())) + .build(); + } + return replication; }) .collect(Collectors.toList()); - return replicationPolicy.toBuilder().config(replicationConfig).build(); } return replicationPolicy; diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/CronScheduleGenerator.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/CronScheduleGenerator.java index d05f8316..c4efe4e2 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/CronScheduleGenerator.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/CronScheduleGenerator.java @@ -3,54 +3,70 @@ import com.cronutils.builder.CronBuilder; import com.cronutils.model.CronType; import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.model.field.expression.FieldExpression; import com.cronutils.model.field.expression.FieldExpressionFactory; +import com.linkedin.openhouse.common.exception.RequestValidationFailureException; import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; import java.util.Random; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -/** - * Utility class for generating cron schedules based on interval input for {@link - * ReplicationConfig}. - */ +/** Utility class for generating a cron schedule given an interval for which a job should run */ @Slf4j @Component public class CronScheduleGenerator { + + /** + * Public api to generate a cron schedule for a {@link ReplicationConfig} based on a given + * interval string in the form 12H, 1D, 2D, 3D. + * + * @param interval + * @return schedule + */ public static String buildCronExpression(String interval) { + if (interval == null || interval.isEmpty()) { + log.error("Replication interval is null or empty"); + throw new RequestValidationFailureException( + String.format("Replication interval is null or empty")); + } int count = Integer.parseInt(interval.substring(0, interval.length() - 1)); + int hour = new Random().nextInt(24); String granularity = interval.substring(interval.length() - 1); - int hourSchedule = new Random().nextInt(24); String schedule; - if (granularity.equals(ReplicationConfig.Granularity.H.toString())) { - schedule = buildHourlyCronExpression(hourSchedule, count); + if (granularity.equals("H")) { + schedule = buildHourlyCronExpression(hour, count); } else { - schedule = buildDailyCronExpression(hourSchedule, count); + schedule = buildDailyCronExpression(hour, count); } return schedule; } - private static String buildDailyCronExpression(int hourSchedule, int dailyInterval) { - return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.CRON4J)) - .withDoM(FieldExpressionFactory.every(dailyInterval)) + private static String buildDailyCronExpression(int hour, int dailyInterval) { + return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) + .withYear(FieldExpressionFactory.always()) + .withDoM(FieldExpression.questionMark()) + .withDoW(FieldExpressionFactory.every(dailyInterval)) .withMonth(FieldExpressionFactory.always()) - .withDoW(FieldExpressionFactory.always()) - .withHour(FieldExpressionFactory.on(hourSchedule)) + .withHour(FieldExpressionFactory.on(hour)) .withMinute(FieldExpressionFactory.on(0)) + .withSecond(FieldExpressionFactory.on(0)) .instance() .asString(); } - private static String buildHourlyCronExpression(int hourSchedule, int hourlyInterval) { - return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.CRON4J)) - .withDoM(FieldExpressionFactory.always()) - .withMonth(FieldExpressionFactory.always()) + private static String buildHourlyCronExpression(int hour, int hourlyInterval) { + return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) + .withYear(FieldExpressionFactory.always()) + .withDoM(FieldExpressionFactory.questionMark()) .withDoW(FieldExpressionFactory.always()) - .withHour( - FieldExpressionFactory.every(hourlyInterval) - .and(FieldExpressionFactory.on(hourSchedule))) + .withMonth(FieldExpressionFactory.always()) + .withHour(FieldExpressionFactory.every(hourlyInterval)) .withMinute(FieldExpressionFactory.on(0)) + .withSecond(FieldExpressionFactory.on(0)) .instance() - .asString(); + .asString() + .replace( + String.format("*/%d", hourlyInterval), String.format("%d/%d", hour, hourlyInterval)); } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java index 9737b643..58393ba1 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java @@ -773,7 +773,8 @@ public void testCreateTableWithTableType() throws Exception { mvc, storageManager, buildGetTableResponseBody(mvcResult), INITIAL_TABLE_VERSION, false); Assertions.assertEquals( tableType, GET_TABLE_RESPONSE_BODY_WITH_TABLE_TYPE.getTableType().toString()); - RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); + RequestAndValidateHelper.deleteTableAndValidateResponse( + mvc, GET_TABLE_RESPONSE_BODY_WITH_TABLE_TYPE); } @Test @@ -1058,7 +1059,7 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception { Assertions.assertEquals(updatedReplication.get("destination"), "clusterA"); Assertions.assertEquals(updatedReplication.get("interval"), "12H"); - System.out.println(); + Assertions.assertNotNull(updatedReplication.get("cronSchedule")); RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); } @@ -1113,9 +1114,6 @@ public void testUpdateSucceedsForReplicationAndRetention() throws Exception { JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); Assertions.assertNotEquals(currentPolicies, updatedPolicies); - Assertions.assertEquals( - updatedPolicies.get("replication").get("config").toString(), - "[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]"); Assertions.assertEquals(updatedPolicies.get("retention").get("count"), 4); Assertions.assertEquals( ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("columnName"), @@ -1170,13 +1168,14 @@ public void testUpdateSucceedsForMultipleReplicationConfig() throws Exception { Assertions.assertEquals(updatedReplication.get("destination"), "clusterA"); Assertions.assertEquals(updatedReplication.get("interval"), "1D"); - + Assertions.assertNotNull(updatedReplication.get("cronSchedule")); updatedReplication = JsonPath.read( mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[1]"); Assertions.assertEquals(updatedReplication.get("destination"), "clusterB"); Assertions.assertEquals(updatedReplication.get("interval"), "12H"); + Assertions.assertNotNull(updatedReplication.get("cronSchedule")); RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); } } From d5ab8150e36f8dc124748822b9fd4a09d491b9f9 Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Mon, 28 Oct 2024 14:35:11 -0700 Subject: [PATCH 3/6] Update naming and add additional randomization for cron schedule minute --- .../request/components/ReplicationConfig.java | 5 ++++ .../mapper/iceberg/PoliciesSpecMapper.java | 7 +++-- ...ator.java => IntervalToCronConverter.java} | 29 ++++++++++--------- 3 files changed, 24 insertions(+), 17 deletions(-) rename services/tables/src/main/java/com/linkedin/openhouse/tables/utils/{CronScheduleGenerator.java => IntervalToCronConverter.java} (70%) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java index 793bdce6..e8865457 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java @@ -33,4 +33,9 @@ public class ReplicationConfig { @Schema(description = "Cron schedule generated from the interval.", example = "0 0 1/1 * ? *") @Valid String cronSchedule; + + public enum Granularity { + H, + D + } } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java index 5d8c7f85..258365dd 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java @@ -10,7 +10,7 @@ import com.linkedin.openhouse.tables.common.DefaultColumnPattern; import com.linkedin.openhouse.tables.common.ReplicationInterval; import com.linkedin.openhouse.tables.model.TableDto; -import com.linkedin.openhouse.tables.utils.CronScheduleGenerator; +import com.linkedin.openhouse.tables.utils.IntervalToCronConverter; import java.util.List; import java.util.stream.Collectors; import org.mapstruct.Mapper; @@ -124,7 +124,7 @@ private Replication mapReplicationPolicies(Replication replicationPolicy) { .toBuilder() .interval(ReplicationInterval.DEFAULT.getInterval()) .cronSchedule( - CronScheduleGenerator.buildCronExpression( + IntervalToCronConverter.generateCronExpression( ReplicationInterval.DEFAULT.getInterval())) .build(); } @@ -132,7 +132,8 @@ private Replication mapReplicationPolicies(Replication replicationPolicy) { return replication .toBuilder() .cronSchedule( - CronScheduleGenerator.buildCronExpression(replication.getInterval())) + IntervalToCronConverter.generateCronExpression( + replication.getInterval())) .build(); } return replication; diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/CronScheduleGenerator.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java similarity index 70% rename from services/tables/src/main/java/com/linkedin/openhouse/tables/utils/CronScheduleGenerator.java rename to services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java index c4efe4e2..bce0d478 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/CronScheduleGenerator.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java @@ -3,7 +3,6 @@ import com.cronutils.builder.CronBuilder; import com.cronutils.model.CronType; import com.cronutils.model.definition.CronDefinitionBuilder; -import com.cronutils.model.field.expression.FieldExpression; import com.cronutils.model.field.expression.FieldExpressionFactory; import com.linkedin.openhouse.common.exception.RequestValidationFailureException; import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; @@ -14,7 +13,7 @@ /** Utility class for generating a cron schedule given an interval for which a job should run */ @Slf4j @Component -public class CronScheduleGenerator { +public class IntervalToCronConverter { /** * Public api to generate a cron schedule for a {@link ReplicationConfig} based on a given @@ -23,46 +22,48 @@ public class CronScheduleGenerator { * @param interval * @return schedule */ - public static String buildCronExpression(String interval) { + public static String generateCronExpression(String interval) { if (interval == null || interval.isEmpty()) { - log.error("Replication interval is null or empty"); - throw new RequestValidationFailureException( - String.format("Replication interval is null or empty")); + String errorMessage = "Replication interval is null or empty"; + log.error(errorMessage); + throw new RequestValidationFailureException(errorMessage); } int count = Integer.parseInt(interval.substring(0, interval.length() - 1)); int hour = new Random().nextInt(24); + int minute = new int[] {0, 15, 30, 45}[new Random().nextInt(4)]; + String granularity = interval.substring(interval.length() - 1); String schedule; - if (granularity.equals("H")) { - schedule = buildHourlyCronExpression(hour, count); + if (granularity.equals(ReplicationConfig.Granularity.H.toString())) { + schedule = generateHourlyCronExpression(hour, minute, count); } else { - schedule = buildDailyCronExpression(hour, count); + schedule = generateDailyCronExpression(hour, minute, count); } return schedule; } - private static String buildDailyCronExpression(int hour, int dailyInterval) { + private static String generateDailyCronExpression(int hour, int minute, int dailyInterval) { return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) .withYear(FieldExpressionFactory.always()) - .withDoM(FieldExpression.questionMark()) + .withDoM(FieldExpressionFactory.questionMark()) .withDoW(FieldExpressionFactory.every(dailyInterval)) .withMonth(FieldExpressionFactory.always()) .withHour(FieldExpressionFactory.on(hour)) - .withMinute(FieldExpressionFactory.on(0)) + .withMinute(FieldExpressionFactory.on(minute)) .withSecond(FieldExpressionFactory.on(0)) .instance() .asString(); } - private static String buildHourlyCronExpression(int hour, int hourlyInterval) { + private static String generateHourlyCronExpression(int hour, int minute, int hourlyInterval) { return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ)) .withYear(FieldExpressionFactory.always()) .withDoM(FieldExpressionFactory.questionMark()) .withDoW(FieldExpressionFactory.always()) .withMonth(FieldExpressionFactory.always()) .withHour(FieldExpressionFactory.every(hourlyInterval)) - .withMinute(FieldExpressionFactory.on(0)) + .withMinute(FieldExpressionFactory.on(minute)) .withSecond(FieldExpressionFactory.on(0)) .instance() .asString() From 4499e984798511bea546ab1ea49119068c8baecd Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Tue, 29 Oct 2024 14:20:50 -0700 Subject: [PATCH 4/6] Update comments and granularity enum for replication --- .../v0/request/components/ReplicationConfig.java | 12 ++++++++++-- .../tables/utils/IntervalToCronConverter.java | 3 ++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java index e8865457..1d14950c 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java @@ -34,8 +34,16 @@ public class ReplicationConfig { @Valid String cronSchedule; + // ENUM for granularity of the replication interval + @Getter public enum Granularity { - H, - D + HOUR("H"), + DAY("D"); + + private String granularity; + + Granularity(String granularity) { + this.granularity = granularity; + } } } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java index bce0d478..30149887 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java @@ -29,13 +29,14 @@ public static String generateCronExpression(String interval) { throw new RequestValidationFailureException(errorMessage); } int count = Integer.parseInt(interval.substring(0, interval.length() - 1)); + // Generating random hourly and minute intervals for the cron expression int hour = new Random().nextInt(24); int minute = new int[] {0, 15, 30, 45}[new Random().nextInt(4)]; String granularity = interval.substring(interval.length() - 1); String schedule; - if (granularity.equals(ReplicationConfig.Granularity.H.toString())) { + if (granularity.equals(ReplicationConfig.Granularity.HOUR.getGranularity())) { schedule = generateHourlyCronExpression(hour, minute, count); } else { schedule = generateDailyCronExpression(hour, minute, count); From ff3d358dca1ed1bacd6411b997913b88d46bb7af Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Tue, 29 Oct 2024 23:23:18 -0700 Subject: [PATCH 5/6] Use existing granularity --- .../v0/request/components/ReplicationConfig.java | 13 ------------- .../v0/request/components/TimePartitionSpec.java | 15 +++++++++++---- .../tables/utils/IntervalToCronConverter.java | 3 ++- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java index 1d14950c..793bdce6 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java @@ -33,17 +33,4 @@ public class ReplicationConfig { @Schema(description = "Cron schedule generated from the interval.", example = "0 0 1/1 * ? *") @Valid String cronSchedule; - - // ENUM for granularity of the replication interval - @Getter - public enum Granularity { - HOUR("H"), - DAY("D"); - - private String granularity; - - Granularity(String granularity) { - this.granularity = granularity; - } - } } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/TimePartitionSpec.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/TimePartitionSpec.java index 926e2688..dc1ceeb5 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/TimePartitionSpec.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/TimePartitionSpec.java @@ -31,10 +31,17 @@ public class TimePartitionSpec { @NotNull(message = "granularity cannot be null") Granularity granularity; + @Getter public enum Granularity { - HOUR, - DAY, - MONTH, - YEAR + HOUR("H"), + DAY("D"), + MONTH("M"), + YEAR("Y"); + + private final String granularity; + + Granularity(String granularity) { + this.granularity = granularity; + } } } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java index 30149887..08bc6bc5 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/utils/IntervalToCronConverter.java @@ -6,6 +6,7 @@ import com.cronutils.model.field.expression.FieldExpressionFactory; import com.linkedin.openhouse.common.exception.RequestValidationFailureException; import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec; import java.util.Random; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -36,7 +37,7 @@ public static String generateCronExpression(String interval) { String granularity = interval.substring(interval.length() - 1); String schedule; - if (granularity.equals(ReplicationConfig.Granularity.HOUR.getGranularity())) { + if (granularity.equals(TimePartitionSpec.Granularity.HOUR.getGranularity())) { schedule = generateHourlyCronExpression(hour, minute, count); } else { schedule = generateDailyCronExpression(hour, minute, count); From 5263be525404067ce76c5da17b264f1a8a0766c4 Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Wed, 30 Oct 2024 11:13:19 -0700 Subject: [PATCH 6/6] Update tests --- .../e2e/h2/RequestAndValidateHelper.java | 17 +++++++++++++ .../tables/e2e/h2/TablesControllerTest.java | 25 +++++++++++++++---- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RequestAndValidateHelper.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RequestAndValidateHelper.java index 507d0441..abb51cd9 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RequestAndValidateHelper.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RequestAndValidateHelper.java @@ -10,6 +10,11 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import com.cronutils.model.Cron; +import com.cronutils.model.CronType; +import com.cronutils.model.definition.CronDefinition; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.parser.CronParser; import com.google.gson.Gson; import com.google.gson.JsonObject; import com.jayway.jsonpath.JsonPath; @@ -324,4 +329,16 @@ static MvcResult putSnapshotsAndValidateResponse( validateMetadataInPutSnapshotsRequest(result, icebergSnapshotsRequestBody); return result; } + + static boolean validateCronSchedule(String cronSchedule) { + try { + CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ); + CronParser parser = new CronParser(cronDefinition); + Cron cron = parser.parse(cronSchedule); + cron.validate(); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java index 58393ba1..5467155c 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java @@ -1053,13 +1053,15 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception { Assertions.assertNotEquals(currentPolicies, updatedPolicies); - LinkedHashMap updatedReplication = + LinkedHashMap updatedReplication = JsonPath.read( mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]"); Assertions.assertEquals(updatedReplication.get("destination"), "clusterA"); Assertions.assertEquals(updatedReplication.get("interval"), "12H"); - Assertions.assertNotNull(updatedReplication.get("cronSchedule")); + Assertions.assertTrue( + RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule"))); + RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); } @@ -1121,6 +1123,16 @@ public void testUpdateSucceedsForReplicationAndRetention() throws Exception { Assertions.assertEquals( ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("pattern"), "yyyy-MM-dd"); + + LinkedHashMap updatedReplication = + JsonPath.read( + mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]"); + + Assertions.assertEquals(updatedReplication.get("destination"), "clusterA"); + Assertions.assertEquals(updatedReplication.get("interval"), "1D"); + Assertions.assertTrue( + RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule"))); + RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); } @@ -1162,20 +1174,23 @@ public void testUpdateSucceedsForMultipleReplicationConfig() throws Exception { Assertions.assertNotEquals(currentPolicies, updatedPolicies); - LinkedHashMap updatedReplication = + LinkedHashMap updatedReplication = JsonPath.read( mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[0]"); Assertions.assertEquals(updatedReplication.get("destination"), "clusterA"); Assertions.assertEquals(updatedReplication.get("interval"), "1D"); - Assertions.assertNotNull(updatedReplication.get("cronSchedule")); + Assertions.assertTrue( + RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule"))); updatedReplication = JsonPath.read( mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[1]"); Assertions.assertEquals(updatedReplication.get("destination"), "clusterB"); Assertions.assertEquals(updatedReplication.get("interval"), "12H"); - Assertions.assertNotNull(updatedReplication.get("cronSchedule")); + Assertions.assertTrue( + RequestAndValidateHelper.validateCronSchedule(updatedReplication.get("cronSchedule"))); + RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); } }