Skip to content

Commit

Permalink
Add cron schedule generator to generate from interval
Browse files Browse the repository at this point in the history
  • Loading branch information
chenselena committed Oct 25, 2024
1 parent 6347499 commit 3a197f1
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,11 @@ public class ReplicationConfig {
@Schema(
description =
"Optional parameter interval at which the replication job should run. Default value is 1D",
example = "1D")
example = "12H, 1D, 2D, 3D")
@Valid
String interval;

@Schema(
description = "Cron schedule generated from interval used for replication job scheduling",
example = "Schedule could be '0 3/12 * * *' if interval is 12H")
@Schema(description = "Cron schedule generated from the interval.", example = "0 0 1/1 * ? *")
@Valid
String cronSchedule;

public enum Granularity {
H,
D
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,16 @@ private Replication mapReplicationPolicies(Replication replicationPolicy) {
ReplicationInterval.DEFAULT.getInterval()))
.build();
}
return replication
.toBuilder()
.cronSchedule(
CronScheduleGenerator.buildCronExpression(replication.getInterval()))
.build();
if (replication.getCronSchedule() == null) {
return replication
.toBuilder()
.cronSchedule(
CronScheduleGenerator.buildCronExpression(replication.getInterval()))
.build();
}
return replication;
})
.collect(Collectors.toList());

return replicationPolicy.toBuilder().config(replicationConfig).build();
}
return replicationPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,70 @@
import com.cronutils.builder.CronBuilder;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.field.expression.FieldExpression;
import com.cronutils.model.field.expression.FieldExpressionFactory;
import com.linkedin.openhouse.common.exception.RequestValidationFailureException;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig;
import java.util.Random;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* Utility class for generating cron schedules based on interval input for {@link
* ReplicationConfig}.
*/
/** Utility class for generating a cron schedule given an interval for which a job should run */
@Slf4j
@Component
public class CronScheduleGenerator {

/**
* Public api to generate a cron schedule for a {@link ReplicationConfig} based on a given
* interval string in the form 12H, 1D, 2D, 3D.
*
* @param interval
* @return schedule
*/
public static String buildCronExpression(String interval) {
if (interval == null || interval.isEmpty()) {
log.error("Replication interval is null or empty");
throw new RequestValidationFailureException(
String.format("Replication interval is null or empty"));
}
int count = Integer.parseInt(interval.substring(0, interval.length() - 1));
int hour = new Random().nextInt(24);
String granularity = interval.substring(interval.length() - 1);
int hourSchedule = new Random().nextInt(24);
String schedule;

if (granularity.equals(ReplicationConfig.Granularity.H.toString())) {
schedule = buildHourlyCronExpression(hourSchedule, count);
if (granularity.equals("H")) {
schedule = buildHourlyCronExpression(hour, count);
} else {
schedule = buildDailyCronExpression(hourSchedule, count);
schedule = buildDailyCronExpression(hour, count);
}
return schedule;
}

private static String buildDailyCronExpression(int hourSchedule, int dailyInterval) {
return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.CRON4J))
.withDoM(FieldExpressionFactory.every(dailyInterval))
private static String buildDailyCronExpression(int hour, int dailyInterval) {
return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
.withYear(FieldExpressionFactory.always())
.withDoM(FieldExpression.questionMark())
.withDoW(FieldExpressionFactory.every(dailyInterval))
.withMonth(FieldExpressionFactory.always())
.withDoW(FieldExpressionFactory.always())
.withHour(FieldExpressionFactory.on(hourSchedule))
.withHour(FieldExpressionFactory.on(hour))
.withMinute(FieldExpressionFactory.on(0))
.withSecond(FieldExpressionFactory.on(0))
.instance()
.asString();
}

private static String buildHourlyCronExpression(int hourSchedule, int hourlyInterval) {
return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.CRON4J))
.withDoM(FieldExpressionFactory.always())
.withMonth(FieldExpressionFactory.always())
private static String buildHourlyCronExpression(int hour, int hourlyInterval) {
return CronBuilder.cron(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
.withYear(FieldExpressionFactory.always())
.withDoM(FieldExpressionFactory.questionMark())
.withDoW(FieldExpressionFactory.always())
.withHour(
FieldExpressionFactory.every(hourlyInterval)
.and(FieldExpressionFactory.on(hourSchedule)))
.withMonth(FieldExpressionFactory.always())
.withHour(FieldExpressionFactory.every(hourlyInterval))
.withMinute(FieldExpressionFactory.on(0))
.withSecond(FieldExpressionFactory.on(0))
.instance()
.asString();
.asString()
.replace(
String.format("*/%d", hourlyInterval), String.format("%d/%d", hour, hourlyInterval));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,8 @@ public void testCreateTableWithTableType() throws Exception {
mvc, storageManager, buildGetTableResponseBody(mvcResult), INITIAL_TABLE_VERSION, false);
Assertions.assertEquals(
tableType, GET_TABLE_RESPONSE_BODY_WITH_TABLE_TYPE.getTableType().toString());
RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
RequestAndValidateHelper.deleteTableAndValidateResponse(
mvc, GET_TABLE_RESPONSE_BODY_WITH_TABLE_TYPE);
}

@Test
Expand Down Expand Up @@ -1058,7 +1059,7 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception {

Assertions.assertEquals(updatedReplication.get("destination"), "clusterA");
Assertions.assertEquals(updatedReplication.get("interval"), "12H");
System.out.println();
Assertions.assertNotNull(updatedReplication.get("cronSchedule"));
RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
}

Expand Down Expand Up @@ -1113,9 +1114,6 @@ public void testUpdateSucceedsForReplicationAndRetention() throws Exception {
JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies");

Assertions.assertNotEquals(currentPolicies, updatedPolicies);
Assertions.assertEquals(
updatedPolicies.get("replication").get("config").toString(),
"[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]");
Assertions.assertEquals(updatedPolicies.get("retention").get("count"), 4);
Assertions.assertEquals(
((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("columnName"),
Expand Down Expand Up @@ -1170,13 +1168,14 @@ public void testUpdateSucceedsForMultipleReplicationConfig() throws Exception {

Assertions.assertEquals(updatedReplication.get("destination"), "clusterA");
Assertions.assertEquals(updatedReplication.get("interval"), "1D");

Assertions.assertNotNull(updatedReplication.get("cronSchedule"));
updatedReplication =
JsonPath.read(
mvcResult.getResponse().getContentAsString(), "$.policies.replication.config[1]");

Assertions.assertEquals(updatedReplication.get("destination"), "clusterB");
Assertions.assertEquals(updatedReplication.get("interval"), "12H");
Assertions.assertNotNull(updatedReplication.get("cronSchedule"));
RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
}
}

0 comments on commit 3a197f1

Please sign in to comment.