Change CLUSTER keyword to DESTINATION
chenselena committed Oct 17, 2024
1 parent 0bfc80f commit 755c522
Showing 2 changed files with 28 additions and 27 deletions.
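In short, each replication entry in the SET POLICY statement is now keyed by destination instead of cluster; the optional interval part is unchanged. A minimal usage sketch, mirroring the statements exercised in the tests below (the openhouse.db.table name and the destination value 'a' come from those tests; a SparkSession configured with the OpenHouse SQL extensions is assumed):

// Sketch only — assumes a SparkSession ("spark") wired with the OpenHouse SQL
// extensions and catalog, as in the test setup referenced below.
spark.sql(
    "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = "
        + "({destination:'a', interval:24H}))");

// The same policy was previously written with the old keyword:
//   ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:'a', interval:24H}))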
@@ -43,37 +43,38 @@ public void setupSpark() {

@Test
public void testSimpleSetReplicationPolicy() {
- String replicationConfigJson = "[{\"cluster\":\"a\", \"interval\":\"24H\"}]";
+ String replicationConfigJson = "[{\"destination\":\"a\", \"interval\":\"24H\"}]";
Dataset<Row> ds =
spark.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = "
+ "({cluster:'a', interval:24H}))");
+ "({destination:'a', interval:24H}))");
assert isPlanValid(ds, replicationConfigJson);

// Test support with multiple clusters
replicationConfigJson =
"[{\"cluster\":\"a\", \"interval\":\"12H\"}, {\"cluster\":\"aa\", \"interval\":\"12H\"}]";
"[{\"destination\":\"a\", \"interval\":\"12H\"}, {\"destination\":\"aa\", \"interval\":\"12H\"}]";
ds =
spark.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = "
+ "({cluster:'a', interval:12h}, {cluster:'aa', interval:12H}))");
+ "({destination:'a', interval:12h}, {destination:'aa', interval:12H}))");
assert isPlanValid(ds, replicationConfigJson);
}

@Test
public void testSimpleSetReplicationPolicyOptionalInterval() {
// Test with optional interval
- String replicationConfigJson = "[{\"cluster\":\"a\"}]";
+ String replicationConfigJson = "[{\"destination\":\"a\"}]";
Dataset<Row> ds =
spark.sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + "({cluster:'a'}))");
spark.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + "({destination:'a'}))");
assert isPlanValid(ds, replicationConfigJson);

// Test with optional interval for multiple clusters
replicationConfigJson = "[{\"cluster\":\"a\"}, {\"cluster\":\"b\"}]";
replicationConfigJson = "[{\"destination\":\"a\"}, {\"destination\":\"b\"}]";
ds =
spark.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = "
+ "({cluster:'a'}, {cluster:'b'}))");
+ "({destination:'a'}, {destination:'b'}))");
assert isPlanValid(ds, replicationConfigJson);
}

@@ -84,7 +85,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
OpenhouseParseException.class,
() ->
spark
.sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:}))")
.sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:}))")
.show());

// Empty interval value
@@ -93,7 +94,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval:}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval:}))")
.show());

// Empty interval value
@@ -102,7 +103,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval:}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval:}))")
.show());

// Missing cluster value but interval present
@@ -111,7 +112,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval: '12h'}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:, interval: '12h'}))")
.show());

// Missing interval value but keyword present
Expand All @@ -120,7 +121,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'a', interval:}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'a', interval:}))")
.show());

// Missing cluster value for multiple clusters
@@ -129,7 +130,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval:'12H'}, {cluster:, interval: '12H'}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:, interval:'12H'}, {cluster:, interval: '12H'}))")
.show());

// Missing cluster keyword for multiple clusters
@@ -138,7 +139,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({interval:'a'}, {interval: '12h'}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:'a'}, {interval: '12h'}))")
.show());

// Missing cluster keyword
@@ -156,7 +157,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interv: '12h'}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interv: '12h'}))")
.show());

// Typo in keyword cluster
@@ -165,7 +166,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({clustr: 'aa', interval: '12h'}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destina: 'aa', interval: '12h'}))")
.show());

// Missing quote in cluster value
@@ -174,7 +175,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: aa', interval: '12h}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: aa', interval: '12h}))")
.show());

// Typo in REPLICATION keyword
@@ -183,7 +184,7 @@ public void testReplicationPolicyWithoutProperSyntax() {
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({cluster: 'aa', interval: '12h'}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({destination: 'aa', interval: '12h'}))")
.show());

// Interval input does not follow 'h/H' format
@@ -192,23 +193,23 @@ public void testReplicationPolicyWithoutProperSyntax() {
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '12'}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '12'}))")
.show());

Assertions.assertThrows(
OpenhouseParseException.class,
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '1D'}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '1D'}))")
.show());

Assertions.assertThrows(
OpenhouseParseException.class,
() ->
spark
.sql(
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '12d'}))")
"ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '12d'}))")
.show());

// Missing cluster and interval values
@@ -247,11 +248,11 @@ private boolean isPlanValid(Dataset<Row> dataframe, String replicationConfigJson
boolean isValid = false;
for (JsonElement element : jsonArray) {
JsonObject entry = element.getAsJsonObject();
- String cluster = entry.get("cluster").getAsString();
- isValid = queryStr.contains(cluster);
+ String destination = entry.get("destination").getAsString();
+ isValid = queryStr.contains(destination);
if (entry.has("interval")) {
String interval = entry.get("interval").getAsString();
- isValid = queryStr.contains(cluster) && queryStr.contains(interval);
+ isValid = queryStr.contains(destination) && queryStr.contains(interval);
}
}
return isValid;
@@ -97,7 +97,7 @@ replicationPolicyClause
;

replicationPolicyClusterClause
- : CLUSTER ':' STRING
+ : DESTINATION ':' STRING
;

replicationPolicyIntervalClause
@@ -172,7 +172,7 @@ DATABASE: 'DATABASE';
SHOW: 'SHOW';
GRANTS: 'GRANTS';
PATTERN: 'PATTERN';
- CLUSTER: 'CLUSTER';
+ DESTINATION: 'DESTINATION';
INTERVAL: 'INTERVAL';
WHERE: 'WHERE';
COLUMN: 'COLUMN';
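
With both the parser rule and the lexer token renamed, each replication entry is keyed by DESTINATION, and the interval stays optional per the unchanged replicationPolicyIntervalClause. A hedged sketch of statements the updated parser should accept and reject, reusing the catalog, table, and exception type from the tests above:

// Expected to parse after this change: multiple destinations, no interval given.
spark.sql(
    "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = "
        + "({destination:'a'}, {destination:'b'}))");

// Expected to be rejected, assuming the old CLUSTER token is gone from the grammar:
Assertions.assertThrows(
    OpenhouseParseException.class,
    () ->
        spark
            .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:'a'}))")
            .show());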
