diff --git a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java index 99c6cbf9..02138f06 100644 --- a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java +++ b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java @@ -43,37 +43,38 @@ public void setupSpark() { @Test public void testSimpleSetReplicationPolicy() { - String replicationConfigJson = "[{\"cluster\":\"a\", \"interval\":\"24H\"}]"; + String replicationConfigJson = "[{\"destination\":\"a\", \"interval\":\"24H\"}]"; Dataset ds = spark.sql( "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " - + "({cluster:'a', interval:24H}))"); + + "({destination:'a', interval:24H}))"); assert isPlanValid(ds, replicationConfigJson); // Test support with multiple clusters replicationConfigJson = - "[{\"cluster\":\"a\", \"interval\":\"12H\"}, {\"cluster\":\"aa\", \"interval\":\"12H\"}]"; + "[{\"destination\":\"a\", \"interval\":\"12H\"}, {\"destination\":\"aa\", \"interval\":\"12H\"}]"; ds = spark.sql( "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " - + "({cluster:'a', interval:12h}, {cluster:'aa', interval:12H}))"); + + "({destination:'a', interval:12h}, {destination:'aa', interval:12H}))"); assert isPlanValid(ds, replicationConfigJson); } @Test public void testSimpleSetReplicationPolicyOptionalInterval() { // Test with optional interval - String replicationConfigJson = "[{\"cluster\":\"a\"}]"; + String replicationConfigJson = "[{\"destination\":\"a\"}]"; Dataset ds = - spark.sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + "({cluster:'a'}))"); + spark.sql( + "ALTER TABLE 
openhouse.db.table SET POLICY (REPLICATION = " + "({destination:'a'}))"); assert isPlanValid(ds, replicationConfigJson); // Test with optional interval for multiple clusters - replicationConfigJson = "[{\"cluster\":\"a\"}, {\"cluster\":\"b\"}]"; + replicationConfigJson = "[{\"destination\":\"a\"}, {\"destination\":\"b\"}]"; ds = spark.sql( "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " - + "({cluster:'a'}, {cluster:'b'}))"); + + "({destination:'a'}, {destination:'b'}))"); assert isPlanValid(ds, replicationConfigJson); } @@ -84,7 +85,7 @@ public void testReplicationPolicyWithoutProperSyntax() { OpenhouseParseException.class, () -> spark - .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:}))") + .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:}))") .show()); // Empty interval value @@ -93,7 +94,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval:}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval:}))") .show()); // Empty interval value @@ -102,7 +103,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval:}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval:}))") .show()); // Missing cluster value but interval present @@ -111,7 +112,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval: '12h'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:, interval: '12h'}))") .show()); // Missing interval value but keyword present @@ -120,7 +121,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY 
(REPLICATION = ({cluster: 'a', interval:}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'a', interval:}))") .show()); // Missing cluster value for multiple clusters @@ -129,7 +130,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval:'12H'}, {cluster:, interval: '12H'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:, interval:'12H'}, {destination:, interval: '12H'}))") .show()); // Missing cluster keyword for multiple clusters @@ -138,7 +139,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({interval:'a'}, {interval: '12h'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:'a'}, {interval: '12h'}))") .show()); // Missing cluster keyword @@ -156,7 +157,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interv: '12h'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interv: '12h'}))") .show()); // Typo in keyword cluster @@ -165,7 +166,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({clustr: 'aa', interval: '12h'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destina: 'aa', interval: '12h'}))") .show()); // Missing quote in cluster value @@ -174,7 +175,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: aa', interval: '12h}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: aa', interval: '12h}))") .show()); // Typo in REPLICATION keyword @@ -183,7 +184,7 @@ public void testReplicationPolicyWithoutProperSyntax()
{ () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({cluster: 'aa', interval: '12h'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({destination: 'aa', interval: '12h'}))") .show()); // Interval input does not follow 'h/H' format @@ -192,7 +193,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '12'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '12'}))") .show()); Assertions.assertThrows( @@ -200,7 +201,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '1D'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '1D'}))") .show()); Assertions.assertThrows( @@ -208,7 +209,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '12d'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '12d'}))") .show()); // Missing cluster and interval values @@ -247,11 +248,11 @@ private boolean isPlanValid(Dataset dataframe, String replicationConfigJson boolean isValid = false; for (JsonElement element : jsonArray) { JsonObject entry = element.getAsJsonObject(); - String cluster = entry.get("cluster").getAsString(); - isValid = queryStr.contains(cluster); + String destination = entry.get("destination").getAsString(); + isValid = queryStr.contains(destination); if (entry.has("interval")) { String interval = entry.get("interval").getAsString(); - isValid = queryStr.contains(cluster) && queryStr.contains(interval); + isValid = queryStr.contains(destination) && queryStr.contains(interval); } } return isValid; diff --git 
a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 index b6d0343e..67fa6585 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 +++ b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 @@ -97,7 +97,7 @@ replicationPolicyClause ; replicationPolicyClusterClause - : CLUSTER ':' STRING + : DESTINATION ':' STRING ; replicationPolicyIntervalClause @@ -172,7 +172,7 @@ DATABASE: 'DATABASE'; SHOW: 'SHOW'; GRANTS: 'GRANTS'; PATTERN: 'PATTERN'; -CLUSTER: 'CLUSTER'; +DESTINATION: 'DESTINATION'; INTERVAL: 'INTERVAL'; WHERE: 'WHERE'; COLUMN: 'COLUMN';