From a5c00d9481e1b3ff2d20381274babd8031043704 Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Wed, 9 Oct 2024 11:49:40 -0700 Subject: [PATCH 1/6] Add changes from previous branch for SQL API side changes --- ...etTableReplicationPolicyStatementTest.java | 158 ++++++++++++++++++ .../extensions/OpenhouseSqlExtensions.g4 | 18 +- .../OpenhouseSqlExtensionsAstBuilder.scala | 22 ++- .../plans/logical/SetReplicationPolicy.scala | 9 + .../v2/OpenhouseDataSourceV2Strategy.scala | 4 +- .../v2/SetReplicationPolicyExec.scala | 26 +++ 6 files changed, 234 insertions(+), 3 deletions(-) create mode 100644 integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java create mode 100644 integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala create mode 100644 integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala diff --git a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java new file mode 100644 index 00000000..60643ae5 --- /dev/null +++ b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java @@ -0,0 +1,158 @@ +package com.linkedin.openhouse.spark.statementtest; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseException; +import java.nio.file.Files; +import lombok.SneakyThrows; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.ExplainMode; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class SetTableReplicationPolicyStatementTest { + private static SparkSession spark = null; + + @SneakyThrows + @BeforeAll + public void setupSpark() { + Path unittest = new Path(Files.createTempDirectory("unittest_settablepolicy").toString()); + spark = + SparkSession.builder() + .master("local[2]") + .config( + "spark.sql.extensions", + ("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions," + + "com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions")) + .config("spark.sql.catalog.openhouse", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.openhouse.type", "hadoop") + .config("spark.sql.catalog.openhouse.warehouse", unittest.toString()) + .getOrCreate(); + } + + @Test + public void testSimpleSetReplicationPolicy() { + String replicationConfigJson = + "{\"cluster\":\"a\", \"schedule\":\"b\"}, {\"cluster\": \"aa\", \"schedule\": \"bb\"}"; + Dataset ds = + spark.sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + + "({cluster:'a', schedule:'b'}, {cluster: 'aa', schedule: 'bb'}))"); + 
assert isPlanValid(ds, replicationConfigJson); + + replicationConfigJson = "{\"cluster\":\"a\", \"schedule\":\"b\"}"; + ds = + spark.sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:'a', schedule:'b'}))"); + assert isPlanValid(ds, replicationConfigJson); + } + + @Test + public void testReplicationPolicyWithoutProperSyntax() { + // missing schedule keyword + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa'}))") + .show()); + + // Missing cluster keyword + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({schedule: 'ss'}))") + .show()); + + // Typo in keyword schedule + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', schedul: 'ss'}))") + .show()); + + // Typo in keyword cluster + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({clustr: 'aa', schedule: 'ss'}))") + .show()); + + // Missing quote in cluster value + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: aa', schedule: 'ss}))") + .show()); + + // Type in REPLICATION keyword + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({cluster: 'aa', schedule: 'ss}))") + .show()); + + // Missing cluster and schedule value + Assertions.assertThrows( + OpenhouseParseException.class, + () -> spark.sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({}))").show()); + } + + @BeforeEach + public void setup() { + spark.sql("CREATE TABLE openhouse.db.table (id bigint, data string) USING iceberg").show(); + spark.sql("CREATE TABLE openhouse.0_.0_ (id bigint, data string) USING iceberg").show(); + spark + .sql("ALTER TABLE openhouse.db.table SET TBLPROPERTIES ('openhouse.tableId' = 'tableid')") + .show(); + spark + .sql("ALTER TABLE openhouse.0_.0_ SET TBLPROPERTIES ('openhouse.tableId' = 'tableid')") + .show(); + } + + @AfterEach + public void tearDown() { + spark.sql("DROP TABLE openhouse.db.table").show(); + spark.sql("DROP TABLE openhouse.0_.0_").show(); + } + + @AfterAll + public void tearDownSpark() { + spark.close(); + } + + @SneakyThrows + private boolean isPlanValid(Dataset dataframe, String replicationConfigJson) { + replicationConfigJson = "[" + replicationConfigJson + "]"; + String queryStr = dataframe.queryExecution().explainString(ExplainMode.fromString("simple")); + JsonArray jsonArray = new Gson().fromJson(replicationConfigJson, JsonArray.class); + boolean isValid = false; + for (JsonElement element : jsonArray) { + JsonObject entry = element.getAsJsonObject(); + String cluster = entry.get("cluster").getAsString(); + String schedule = entry.get("schedule").getAsString(); + isValid = queryStr.contains(cluster) && queryStr.contains(schedule); + } + return isValid; + } +} diff --git a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 index bf762af6..e2e59e1c 100644 --- 
a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 +++ b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 @@ -24,6 +24,7 @@ singleStatement statement : ALTER TABLE multipartIdentifier SET POLICY '(' retentionPolicy (columnRetentionPolicy)? ')' #setRetentionPolicy + | ALTER TABLE multipartIdentifier SET POLICY '(' replicationPolicy ')' #setReplicationPolicy | ALTER TABLE multipartIdentifier SET POLICY '(' sharingPolicy ')' #setSharingPolicy | ALTER TABLE multipartIdentifier MODIFY columnNameClause SET columnPolicy #setColumnPolicyTag | GRANT privilege ON grantableResource TO principal #grantStatement @@ -64,7 +65,7 @@ quotedIdentifier ; nonReserved - : ALTER | TABLE | SET | POLICY | RETENTION | SHARING + : ALTER | TABLE | SET | POLICY | RETENTION | SHARING | REPLICATION | GRANT | REVOKE | ON | TO | SHOW | GRANTS | PATTERN | WHERE | COLUMN ; @@ -83,6 +84,18 @@ columnRetentionPolicy : ON columnNameClause (columnRetentionPolicyPatternClause)? ; +replicationPolicy + : REPLICATION '=' tableReplicationPolicy + ; + +tableReplicationPolicy + : '(' replicationPolicyClause (',' replicationPolicyClause)* ')' + ; + +replicationPolicyClause + : '{' CLUSTER ':' STRING ',' SCHEDULE ':' STRING '}' + ; + columnRetentionPolicyPatternClause : WHERE retentionColumnPatternClause ; @@ -136,6 +149,7 @@ TABLE: 'TABLE'; SET: 'SET'; POLICY: 'POLICY'; RETENTION: 'RETENTION'; +REPLICATION: 'REPLICATION'; SHARING: 'SHARING'; GRANT: 'GRANT'; REVOKE: 'REVOKE'; @@ -150,6 +164,8 @@ DATABASE: 'DATABASE'; SHOW: 'SHOW'; GRANTS: 'GRANTS'; PATTERN: 'PATTERN'; +CLUSTER: 'CLUSTER'; +SCHEDULE: 'SCHEDULE'; WHERE: 'WHERE'; COLUMN: 'COLUMN'; PII: 'PII'; diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala index 4b8fc405..63468e3d 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala @@ -2,13 +2,14 @@ package com.linkedin.openhouse.spark.sql.catalyst.parser.extensions import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseSqlExtensionsParser._ -import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetRetentionPolicy, SetSharingPolicy, SetColumnPolicyTag, ShowGrantsStatement} +import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement} import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType import com.linkedin.openhouse.gen.tables.client.model.TimePartitionSpec import org.antlr.v4.runtime.tree.ParseTree import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import scala.collection.JavaConversions.iterableAsScalaIterable import 
scala.collection.JavaConverters._ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends OpenhouseSqlExtensionsBaseVisitor[AnyRef] { @@ -26,6 +27,12 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh SetRetentionPolicy(tableName, granularity, count, Option(colName), Option(colPattern)) } + override def visitSetReplicationPolicy(ctx: SetReplicationPolicyContext): SetReplicationPolicy = { + val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier) + val replicationPolicies = typedVisit[Seq[String]](ctx.replicationPolicy()) + SetReplicationPolicy(tableName, replicationPolicies) + } + override def visitSetSharingPolicy(ctx: SetSharingPolicyContext): SetSharingPolicy = { val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier) val sharing = typedVisit[String](ctx.sharingPolicy()) @@ -86,6 +93,19 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh typedVisit[(String, Int)](ctx.duration()) } + override def visitReplicationPolicy(ctx: ReplicationPolicyContext): (Seq[String]) = { + typedVisit[(Seq[String])](ctx.tableReplicationPolicy()) + } + + override def visitTableReplicationPolicy(ctx: TableReplicationPolicyContext): (Seq[String]) = { + ctx.replicationPolicyClause().map(ele => typedVisit[String](ele)).toSeq + } + + override def visitReplicationPolicyClause(ctx: ReplicationPolicyClauseContext): (String) = { + val replicationPolicy = ctx.STRING().map(_.getText) + replicationPolicy.mkString(":") + } + override def visitColumnRetentionPolicy(ctx: ColumnRetentionPolicyContext): (String, String) = { if (ctx.columnRetentionPolicyPatternClause() != null) { (ctx.columnNameClause().identifier().getText(), ctx.columnRetentionPolicyPatternClause().retentionColumnPatternClause().STRING().getText) diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala new file mode 100644 index 00000000..e83b1bb0 --- /dev/null +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala @@ -0,0 +1,9 @@ +package com.linkedin.openhouse.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.plans.logical.Command + +case class SetReplicationPolicy(tableName: Seq[String], replicationPolicies: Seq[String]) extends Command { + override def simpleString(maxFields: Int): String = { + s"SetReplicationPolicy: ${tableName} ${replicationPolicies}}" + } +} diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala index 595c6e06..8545a2bc 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala @@ -1,6 +1,6 @@ package com.linkedin.openhouse.spark.sql.execution.datasources.v2 -import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetRetentionPolicy, 
SetSharingPolicy, SetColumnPolicyTag, ShowGrantsStatement} +import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement} import org.apache.iceberg.spark.{Spark3Util, SparkCatalog, SparkSessionCatalog} import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.expressions.PredicateHelper @@ -15,6 +15,8 @@ case class OpenhouseDataSourceV2Strategy(spark: SparkSession) extends Strategy w override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case SetRetentionPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, count, colName, colPattern) => SetRetentionPolicyExec(catalog, ident, granularity, count, colName, colPattern) :: Nil + case SetReplicationPolicy(CatalogAndIdentifierExtractor(catalog, ident), replicationPolicies) => + SetReplicationPolicyExec(catalog, ident, replicationPolicies) :: Nil case SetSharingPolicy(CatalogAndIdentifierExtractor(catalog, ident), sharing) => SetSharingPolicyExec(catalog, ident, sharing) :: Nil case SetColumnPolicyTag(CatalogAndIdentifierExtractor(catalog, ident), policyTag, cols) => diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala new file mode 100644 index 00000000..cfffd5cf --- /dev/null +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala @@ -0,0 +1,26 @@ +package com.linkedin.openhouse.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.execution.datasources.v2.V2CommandExec + +case class SetReplicationPolicyExec(catalog: TableCatalog, ident: Identifier, replicationPolicies: Seq[String]) extends V2CommandExec{ + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") => + val key = "updated.openhouse.policy" + val value = s"""{"replication":{"schedules":[${replicationPolicies.map(replication => s"""{config:{${replication}}}""").mkString(",")}]}}""" + iceberg.table().updateProperties() + .set(key, value) + .commit() + + case table => + throw new UnsupportedOperationException(s"Cannot set replication policy for non-Openhouse table: $table") + } + Nil + } + + override def output: Seq[Attribute] = Nil +} From 1f7d13da94caafe993b7501b75501bbfe5cd4c9e Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Wed, 9 Oct 2024 15:37:17 -0700 Subject: [PATCH 2/6] Initial changes --- ...etTableReplicationPolicyStatementTest.java | 42 +++++++++---------- .../extensions/OpenhouseSqlExtensions.g4 | 12 ++++-- .../OpenhouseSqlExtensionsAstBuilder.scala | 25 +++++++---- .../plans/logical/SetReplicationPolicy.scala | 4 +- .../v2/OpenhouseDataSourceV2Strategy.scala | 4 +- .../v2/SetReplicationPolicyExec.scala | 4 +- 6 files changed, 49 insertions(+), 42 deletions(-) diff --git 
a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java index 60643ae5..5754f4b7 100644 --- a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java +++ b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java @@ -43,46 +43,39 @@ public void setupSpark() { @Test public void testSimpleSetReplicationPolicy() { - String replicationConfigJson = - "{\"cluster\":\"a\", \"schedule\":\"b\"}, {\"cluster\": \"aa\", \"schedule\": \"bb\"}"; + String replicationConfigJson = "{\"cluster\":\"a\", \"interval\":\"b\"}"; Dataset ds = spark.sql( "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " - + "({cluster:'a', schedule:'b'}, {cluster: 'aa', schedule: 'bb'}))"); + + "({cluster:'a', interval:'b'}))"); assert isPlanValid(ds, replicationConfigJson); + } - replicationConfigJson = "{\"cluster\":\"a\", \"schedule\":\"b\"}"; - ds = - spark.sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:'a', schedule:'b'}))"); + @Test + public void testSimpleSetReplicationPolicyOptionalInterval() { + String replicationConfigJson = "{\"cluster\":\"a\"}"; + Dataset ds = + spark.sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + "({cluster:'a'}))"); assert isPlanValid(ds, replicationConfigJson); } @Test public void testReplicationPolicyWithoutProperSyntax() { - // missing schedule keyword - Assertions.assertThrows( - OpenhouseParseException.class, - () -> - spark - .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa'}))") - .show()); - // Missing cluster keyword Assertions.assertThrows( OpenhouseParseException.class, () -> spark - .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({schedule: 'ss'}))") + .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({interval: 'ss'}))") .show()); - // Typo in keyword schedule + // Typo in keyword interval Assertions.assertThrows( OpenhouseParseException.class, () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', schedul: 'ss'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interv: 'ss'}))") .show()); // Typo in keyword cluster @@ -91,7 +84,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({clustr: 'aa', schedule: 'ss'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({clustr: 'aa', interval: 'ss'}))") .show()); // Missing quote in cluster value @@ -100,7 +93,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: aa', schedule: 'ss}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: aa', interval: 'ss}))") .show()); // Type in REPLICATION keyword @@ -109,7 +102,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({cluster: 'aa', schedule: 'ss}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({cluster: 'aa', interval: 'ss}))") .show()); // Missing cluster and schedule value @@ -150,8 +143,11 @@ private 
boolean isPlanValid(Dataset dataframe, String replicationConfigJson for (JsonElement element : jsonArray) { JsonObject entry = element.getAsJsonObject(); String cluster = entry.get("cluster").getAsString(); - String schedule = entry.get("schedule").getAsString(); - isValid = queryStr.contains(cluster) && queryStr.contains(schedule); + isValid = queryStr.contains(cluster); + if (entry.has("interval")) { + String interval = entry.get("interval").getAsString(); + isValid = queryStr.contains(cluster) && queryStr.contains(interval); + } } return isValid; } diff --git a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 index e2e59e1c..26d13654 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 +++ b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 @@ -89,11 +89,15 @@ replicationPolicy ; tableReplicationPolicy - : '(' replicationPolicyClause (',' replicationPolicyClause)* ')' + : '(' '{' replicationPolicyClusterClause (',' replicationPolicyIntervalClause)? '}' ')' ; -replicationPolicyClause - : '{' CLUSTER ':' STRING ',' SCHEDULE ':' STRING '}' +replicationPolicyClusterClause + : CLUSTER ':' STRING + ; + +replicationPolicyIntervalClause + : INTERVAL ':' STRING ; columnRetentionPolicyPatternClause @@ -165,7 +169,7 @@ SHOW: 'SHOW'; GRANTS: 'GRANTS'; PATTERN: 'PATTERN'; CLUSTER: 'CLUSTER'; -SCHEDULE: 'SCHEDULE'; +INTERVAL: 'INTERVAL'; WHERE: 'WHERE'; COLUMN: 'COLUMN'; PII: 'PII'; diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala index 63468e3d..bf49d75a 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala @@ -29,8 +29,8 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh override def visitSetReplicationPolicy(ctx: SetReplicationPolicyContext): SetReplicationPolicy = { val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier) - val replicationPolicies = typedVisit[Seq[String]](ctx.replicationPolicy()) - SetReplicationPolicy(tableName, replicationPolicies) + val (clusterName, interval) = typedVisit[(String, String)](ctx.replicationPolicy()) + SetReplicationPolicy(tableName, clusterName, interval) } override def visitSetSharingPolicy(ctx: SetSharingPolicyContext): SetSharingPolicy = { @@ -93,17 +93,24 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh typedVisit[(String, Int)](ctx.duration()) } - override def visitReplicationPolicy(ctx: ReplicationPolicyContext): (Seq[String]) = { - typedVisit[(Seq[String])](ctx.tableReplicationPolicy()) + override def visitReplicationPolicy(ctx: ReplicationPolicyContext): (String, String) = { + typedVisit[(String, 
String)](ctx.tableReplicationPolicy()) } - override def visitTableReplicationPolicy(ctx: TableReplicationPolicyContext): (Seq[String]) = { - ctx.replicationPolicyClause().map(ele => typedVisit[String](ele)).toSeq + override def visitTableReplicationPolicy(ctx: TableReplicationPolicyContext): (String, String) = { + val clusterName = typedVisit[String](ctx.replicationPolicyClusterClause()) + val interval = if (ctx.replicationPolicyIntervalClause() != null) + typedVisit[String](ctx.replicationPolicyIntervalClause()) + else null + (clusterName, interval) } - override def visitReplicationPolicyClause(ctx: ReplicationPolicyClauseContext): (String) = { - val replicationPolicy = ctx.STRING().map(_.getText) - replicationPolicy.mkString(":") + override def visitReplicationPolicyClusterClause(ctx: ReplicationPolicyClusterClauseContext): (String) = { + ctx.STRING().getText + } + + override def visitReplicationPolicyIntervalClause(ctx: ReplicationPolicyIntervalClauseContext): (String) = { + ctx.STRING().getText } override def visitColumnRetentionPolicy(ctx: ColumnRetentionPolicyContext): (String, String) = { diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala index e83b1bb0..382bfd45 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala @@ -2,8 +2,8 @@ package com.linkedin.openhouse.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.Command -case class SetReplicationPolicy(tableName: Seq[String], replicationPolicies: Seq[String]) extends Command { +case class SetReplicationPolicy(tableName: Seq[String], clusterName: String, interval: String) extends Command { override def simpleString(maxFields: Int): String = { - s"SetReplicationPolicy: ${tableName} ${replicationPolicies}}" + s"SetReplicationPolicy: ${tableName} ${clusterName} ${interval}" } } diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala index 8545a2bc..ef1e4726 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala @@ -15,8 +15,8 @@ case class OpenhouseDataSourceV2Strategy(spark: SparkSession) extends Strategy w override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case SetRetentionPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, count, colName, colPattern) => SetRetentionPolicyExec(catalog, ident, granularity, count, colName, colPattern) :: Nil - case SetReplicationPolicy(CatalogAndIdentifierExtractor(catalog, ident), replicationPolicies) => - SetReplicationPolicyExec(catalog, ident, replicationPolicies) :: Nil + case 
SetReplicationPolicy(CatalogAndIdentifierExtractor(catalog, ident), clusterName, interval) => + SetReplicationPolicyExec(catalog, ident, clusterName, interval) :: Nil case SetSharingPolicy(CatalogAndIdentifierExtractor(catalog, ident), sharing) => SetSharingPolicyExec(catalog, ident, sharing) :: Nil case SetColumnPolicyTag(CatalogAndIdentifierExtractor(catalog, ident), policyTag, cols) => diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala index cfffd5cf..ab7d7c19 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala @@ -6,12 +6,12 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.execution.datasources.v2.V2CommandExec -case class SetReplicationPolicyExec(catalog: TableCatalog, ident: Identifier, replicationPolicies: Seq[String]) extends V2CommandExec{ +case class SetReplicationPolicyExec(catalog: TableCatalog, ident: Identifier, clusterName: String, interval: String) extends V2CommandExec{ override protected def run(): Seq[InternalRow] = { catalog.loadTable(ident) match { case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") => val key = "updated.openhouse.policy" - val value = s"""{"replication":{"schedules":[${replicationPolicies.map(replication => s"""{config:{${replication}}}""").mkString(",")}]}}""" + val value = s"""{"replication":{"config":[{"destination":"$clusterName","interval":"$interval"}]}}""" iceberg.table().updateProperties() .set(key, value) .commit() From 0a1b0c6004b517aa412d6f066589515eebbfe8af Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Thu, 10 Oct 2024 15:27:21 -0700 Subject: [PATCH 3/6] Support for multiple cluster values in SQL and update tests --- ...etTableReplicationPolicyStatementTest.java | 89 +++++++++++++++++-- .../extensions/OpenhouseSqlExtensions.g4 | 6 +- .../OpenhouseSqlExtensionsAstBuilder.scala | 18 ++-- .../plans/logical/SetReplicationPolicy.scala | 4 +- .../v2/OpenhouseDataSourceV2Strategy.scala | 4 +- .../v2/SetReplicationPolicyExec.scala | 4 +- 6 files changed, 105 insertions(+), 20 deletions(-) diff --git a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java index 5754f4b7..931a6b08 100644 --- a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java +++ b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java @@ -49,18 +49,95 @@ public void testSimpleSetReplicationPolicy() { "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + "({cluster:'a', interval:'b'}))"); assert isPlanValid(ds, replicationConfigJson); - } - @Test - public void testSimpleSetReplicationPolicyOptionalInterval() { 
- String replicationConfigJson = "{\"cluster\":\"a\"}"; - Dataset ds = + // Test support with multiple clusters + replicationConfigJson = + "{\"cluster\":\"a\", \"interval\":\"b\"}, {\"cluster\":\"aa\", \"interval\":\"bb\"}"; + ds = + spark.sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + + "({cluster:'a', interval:'b'}, {cluster:'aa', interval:'bb'}))"); + assert isPlanValid(ds, replicationConfigJson); + + // Test with optional interval + replicationConfigJson = "{\"cluster\":\"a\"}"; + ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + "({cluster:'a'}))"); assert isPlanValid(ds, replicationConfigJson); + + // Test with optional interval for multiple clusters + replicationConfigJson = "{\"cluster\":\"a\"}, {\"cluster\":\"b\"}"; + ds = + spark.sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + + "({cluster:'a'}, {cluster:'b'}))"); + assert isPlanValid(ds, replicationConfigJson); } @Test public void testReplicationPolicyWithoutProperSyntax() { + // Empty cluster value + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:}))") + .show()); + + // Empty interval value + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval:}))") + .show()); + + // Empty interval value + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval:}))") + .show()); + + // Missing cluster value but interval present + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval: 'bb'}))") + .show()); + + // Missing interval value but keyword present + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'a', interval:}))") + .show()); + + // Missing cluster value for multiple clusters + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval:'a'}, {cluster:, interval: 'b'}))") + .show()); + + // Missing cluster keyword for multiple clusters + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({interval:'a'}, {interval: 'b'}))") + .show()); + // Missing cluster keyword Assertions.assertThrows( OpenhouseParseException.class, @@ -105,7 +182,7 @@ public void testReplicationPolicyWithoutProperSyntax() { "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({cluster: 'aa', interval: 'ss}))") .show()); - // Missing cluster and schedule value + // Missing cluster and interval values Assertions.assertThrows( OpenhouseParseException.class, () -> spark.sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({}))").show()); diff --git a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 index 26d13654..e7a0e051 100644 --- 
a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 +++ b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 @@ -89,7 +89,11 @@ replicationPolicy ; tableReplicationPolicy - : '(' '{' replicationPolicyClusterClause (',' replicationPolicyIntervalClause)? '}' ')' + : '(' replicationPolicyClause (',' replicationPolicyClause)* ')' + ; + +replicationPolicyClause + : '{' replicationPolicyClusterClause (',' replicationPolicyIntervalClause)? '}' ; replicationPolicyClusterClause diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala index bf49d75a..accd2d18 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala @@ -29,8 +29,8 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh override def visitSetReplicationPolicy(ctx: SetReplicationPolicyContext): SetReplicationPolicy = { val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier) - val (clusterName, interval) = typedVisit[(String, String)](ctx.replicationPolicy()) - SetReplicationPolicy(tableName, clusterName, interval) + val replicationPolicies = typedVisit[Seq[(String, Option[String])]](ctx.replicationPolicy()) + SetReplicationPolicy(tableName, replicationPolicies) } override def visitSetSharingPolicy(ctx: SetSharingPolicyContext): SetSharingPolicy = { @@ -93,16 +93,20 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh typedVisit[(String, Int)](ctx.duration()) } - override def visitReplicationPolicy(ctx: ReplicationPolicyContext): (String, String) = { - typedVisit[(String, String)](ctx.tableReplicationPolicy()) + override def visitReplicationPolicy(ctx: ReplicationPolicyContext): Seq[(String, Option[String])] = { + typedVisit[Seq[(String, Option[String])]](ctx.tableReplicationPolicy()) } - override def visitTableReplicationPolicy(ctx: TableReplicationPolicyContext): (String, String) = { - val clusterName = typedVisit[String](ctx.replicationPolicyClusterClause()) + override def visitTableReplicationPolicy(ctx: TableReplicationPolicyContext): Seq[(String, Option[String])] = { + toSeq(ctx.replicationPolicyClause()).map(typedVisit[(String, Option[String])]) + } + + override def visitReplicationPolicyClause(ctx: ReplicationPolicyClauseContext): (String, Option[String]) = { + val cluster = typedVisit[String](ctx.replicationPolicyClusterClause()) val interval = if (ctx.replicationPolicyIntervalClause() != null) typedVisit[String](ctx.replicationPolicyIntervalClause()) else null - (clusterName, interval) + (cluster, Option(interval)) } override def visitReplicationPolicyClusterClause(ctx: ReplicationPolicyClusterClauseContext): (String) = { diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala 
b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala index 382bfd45..9c7b0791 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetReplicationPolicy.scala @@ -2,8 +2,8 @@ package com.linkedin.openhouse.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.Command -case class SetReplicationPolicy(tableName: Seq[String], clusterName: String, interval: String) extends Command { +case class SetReplicationPolicy(tableName: Seq[String], replicationPolicies: Seq[(String, Option[String])]) extends Command { override def simpleString(maxFields: Int): String = { - s"SetReplicationPolicy: ${tableName} ${clusterName} ${interval}" + s"SetReplicationPolicy: ${tableName} ${replicationPolicies}" } } diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala index ef1e4726..8545a2bc 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala @@ -15,8 +15,8 @@ case class OpenhouseDataSourceV2Strategy(spark: SparkSession) extends Strategy w override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case SetRetentionPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, count, colName, colPattern) => SetRetentionPolicyExec(catalog, ident, granularity, count, colName, colPattern) :: Nil - case SetReplicationPolicy(CatalogAndIdentifierExtractor(catalog, ident), clusterName, interval) => - SetReplicationPolicyExec(catalog, ident, clusterName, interval) :: Nil + case SetReplicationPolicy(CatalogAndIdentifierExtractor(catalog, ident), replicationPolicies) => + SetReplicationPolicyExec(catalog, ident, replicationPolicies) :: Nil case SetSharingPolicy(CatalogAndIdentifierExtractor(catalog, ident), sharing) => SetSharingPolicyExec(catalog, ident, sharing) :: Nil case SetColumnPolicyTag(CatalogAndIdentifierExtractor(catalog, ident), policyTag, cols) => diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala index ab7d7c19..5fd33236 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetReplicationPolicyExec.scala @@ -6,12 +6,12 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.execution.datasources.v2.V2CommandExec -case class SetReplicationPolicyExec(catalog: 
TableCatalog, ident: Identifier, clusterName: String, interval: String) extends V2CommandExec{ +case class SetReplicationPolicyExec(catalog: TableCatalog, ident: Identifier, replicationPolicies: Seq[(String, Option[String])]) extends V2CommandExec{ override protected def run(): Seq[InternalRow] = { catalog.loadTable(ident) match { case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") => val key = "updated.openhouse.policy" - val value = s"""{"replication":{"config":[{"destination":"$clusterName","interval":"$interval"}]}}""" + val value = s"""{"replication":{"config":[${replicationPolicies.map(replication => s"""{"destination":"${replication._1}","interval":"${replication._2.getOrElse("")}"}""").mkString(",")}]}}""" iceberg.table().updateProperties() .set(key, value) .commit() From acd42eb59a68a78aefcb6ced8acddb0832dc05cb Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Tue, 15 Oct 2024 16:43:01 -0700 Subject: [PATCH 4/6] Use RETENTION_HOUR for further SQL restriction --- ...etTableReplicationPolicyStatementTest.java | 46 ++++++++++++------- .../extensions/OpenhouseSqlExtensions.g4 | 2 +- .../OpenhouseSqlExtensionsAstBuilder.scala | 5 +- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java index 931a6b08..80ef8dac 100644 --- a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java +++ b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java @@ -43,30 +43,33 @@ public void setupSpark() { @Test public void testSimpleSetReplicationPolicy() { - String replicationConfigJson = "{\"cluster\":\"a\", \"interval\":\"b\"}"; + String replicationConfigJson = "[{\"cluster\":\"a\", \"interval\":\"24H\"}]"; Dataset ds = spark.sql( "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " - + "({cluster:'a', interval:'b'}))"); + + "({cluster:'a', interval:24H}))"); assert isPlanValid(ds, replicationConfigJson); // Test support with multiple clusters replicationConfigJson = - "{\"cluster\":\"a\", \"interval\":\"b\"}, {\"cluster\":\"aa\", \"interval\":\"bb\"}"; + "[{\"cluster\":\"a\", \"interval\":\"12H\"}, {\"cluster\":\"aa\", \"interval\":\"12H\"}]"; ds = spark.sql( "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " - + "({cluster:'a', interval:'b'}, {cluster:'aa', interval:'bb'}))"); + + "({cluster:'a', interval:12h}, {cluster:'aa', interval:12H}))"); assert isPlanValid(ds, replicationConfigJson); + } + @Test + public void testSimpleSetReplicationPolicyOptionalInterval() { // Test with optional interval - replicationConfigJson = "{\"cluster\":\"a\"}"; - ds = + String replicationConfigJson = "[{\"cluster\":\"a\"}]"; + Dataset ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + "({cluster:'a'}))"); assert isPlanValid(ds, replicationConfigJson); // Test with optional interval for multiple clusters - replicationConfigJson = "{\"cluster\":\"a\"}, {\"cluster\":\"b\"}"; + replicationConfigJson = "[{\"cluster\":\"a\"}, {\"cluster\":\"b\"}]"; ds = spark.sql( "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " @@ -108,7 +111,7 @@ public void 
testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval: 'bb'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval: '12h'}))") .show()); // Missing interval value but keyword present @@ -126,7 +129,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval:'a'}, {cluster:, interval: 'b'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval:'12H'}, {cluster:, interval: '12H'}))") .show()); // Missing cluster keyword for multiple clusters @@ -135,7 +138,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({interval:'a'}, {interval: 'b'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({interval:'a'}, {interval: '12h'}))") .show()); // Missing cluster keyword @@ -143,7 +146,8 @@ public void testReplicationPolicyWithoutProperSyntax() { OpenhouseParseException.class, () -> spark - .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({interval: 'ss'}))") + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({interval: '12h'}))") .show()); // Typo in keyword interval @@ -152,7 +156,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interv: 'ss'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interv: '12h'}))") .show()); // Typo in keyword cluster @@ -161,7 +165,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({clustr: 'aa', interval: 'ss'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({clustr: 'aa', interval: '12h'}))") .show()); // Missing quote in cluster value @@ -170,16 +174,25 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: aa', interval: 'ss}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: aa', interval: '12h}))") + .show()); + + // Typo in REPLICATION keyword + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({cluster: 'aa', interval: '12h'}))") .show()); - // Type in REPLICATION keyword + // Interval input does not follow 'h/H' format Assertions.assertThrows( OpenhouseParseException.class, () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({cluster: 'aa', interval: 'ss}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '12'}))") .show()); // Missing cluster and interval values @@ -213,7 +226,6 @@ public void tearDownSpark() { @SneakyThrows private boolean isPlanValid(Dataset dataframe, String replicationConfigJson) { - replicationConfigJson = "[" + replicationConfigJson + "]"; String queryStr = dataframe.queryExecution().explainString(ExplainMode.fromString("simple")); JsonArray jsonArray = new Gson().fromJson(replicationConfigJson, JsonArray.class); boolean isValid = false; diff --git a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 
b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 index e7a0e051..b6d0343e 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 +++ b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 @@ -101,7 +101,7 @@ replicationPolicyClusterClause ; replicationPolicyIntervalClause - : INTERVAL ':' STRING + : INTERVAL ':' RETENTION_HOUR ; columnRetentionPolicyPatternClause diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala index accd2d18..408c9cf3 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala +++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala @@ -105,7 +105,8 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh val cluster = typedVisit[String](ctx.replicationPolicyClusterClause()) val interval = if (ctx.replicationPolicyIntervalClause() != null) typedVisit[String](ctx.replicationPolicyIntervalClause()) - else null + else + null (cluster, Option(interval)) } @@ -114,7 +115,7 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh } override def visitReplicationPolicyIntervalClause(ctx: ReplicationPolicyIntervalClauseContext): (String) = { - ctx.STRING().getText + ctx.RETENTION_HOUR().getText.toUpperCase } override def visitColumnRetentionPolicy(ctx: ColumnRetentionPolicyContext): (String, String) = { From 0bfc80f4f47d17ad793fbbc60c50dd2fea7db100 Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Wed, 16 Oct 2024 14:46:27 -0700 Subject: [PATCH 5/6] Add test for day level granularity --- .../SetTableReplicationPolicyStatementTest.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java index 80ef8dac..99c6cbf9 100644 --- a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java +++ b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java @@ -195,6 +195,22 @@ public void testReplicationPolicyWithoutProperSyntax() { "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '12'}))") .show()); + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '1D'}))") + .show()); + + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '12d'}))") + 
.show()); + // Missing cluster and interval values Assertions.assertThrows( OpenhouseParseException.class, From 755c522db3057fc1af0984b1c9725d10df1ba802 Mon Sep 17 00:00:00 2001 From: Selena Chen Date: Wed, 16 Oct 2024 20:24:02 -0700 Subject: [PATCH 6/6] Change CLUSTER keyword to DESTINATION --- ...etTableReplicationPolicyStatementTest.java | 51 ++++++++++--------- .../extensions/OpenhouseSqlExtensions.g4 | 4 +- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java index 99c6cbf9..02138f06 100644 --- a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java +++ b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetTableReplicationPolicyStatementTest.java @@ -43,37 +43,38 @@ public void setupSpark() { @Test public void testSimpleSetReplicationPolicy() { - String replicationConfigJson = "[{\"cluster\":\"a\", \"interval\":\"24H\"}]"; + String replicationConfigJson = "[{\"destination\":\"a\", \"interval\":\"24H\"}]"; Dataset ds = spark.sql( "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " - + "({cluster:'a', interval:24H}))"); + + "({destination:'a', interval:24H}))"); assert isPlanValid(ds, replicationConfigJson); // Test support with multiple clusters replicationConfigJson = - "[{\"cluster\":\"a\", \"interval\":\"12H\"}, {\"cluster\":\"aa\", \"interval\":\"12H\"}]"; + "[{\"destination\":\"a\", \"interval\":\"12H\"}, {\"destination\":\"aa\", \"interval\":\"12H\"}]"; ds = spark.sql( "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " - + "({cluster:'a', interval:12h}, {cluster:'aa', interval:12H}))"); + + "({destination:'a', interval:12h}, {destination:'aa', interval:12H}))"); assert isPlanValid(ds, replicationConfigJson); } @Test public void testSimpleSetReplicationPolicyOptionalInterval() { // Test with optional interval - String replicationConfigJson = "[{\"cluster\":\"a\"}]"; + String replicationConfigJson = "[{\"destination\":\"a\"}]"; Dataset ds = - spark.sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + "({cluster:'a'}))"); + spark.sql( + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " + "({destination:'a'}))"); assert isPlanValid(ds, replicationConfigJson); // Test with optional interval for multiple clusters - replicationConfigJson = "[{\"cluster\":\"a\"}, {\"cluster\":\"b\"}]"; + replicationConfigJson = "[{\"destination\":\"a\"}, {\"destination\":\"b\"}]"; ds = spark.sql( "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = " - + "({cluster:'a'}, {cluster:'b'}))"); + + "({destination:'a'}, {destination:'b'}))"); assert isPlanValid(ds, replicationConfigJson); } @@ -84,7 +85,7 @@ public void testReplicationPolicyWithoutProperSyntax() { OpenhouseParseException.class, () -> spark - .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:}))") + .sql("ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:}))") .show()); // Empty interval value @@ -93,7 +94,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval:}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION 
= ({destination: 'aa', interval:}))") .show()); // Empty interval value @@ -102,7 +103,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval:}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval:}))") .show()); // Missing cluster value but interval present @@ -111,7 +112,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval: '12h'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:, interval: '12h'}))") .show()); // Missing interval value but keyword present @@ -120,7 +121,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'a', interval:}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'a', interval:}))") .show()); // Missing cluster value for multiple clusters @@ -129,7 +130,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster:, interval:'12H'}, {cluster:, interval: '12H'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:, interval:'12H'}, {cluster:, interval: '12H'}))") .show()); // Missing cluster keyword for multiple clusters @@ -138,7 +139,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({interval:'a'}, {interval: '12h'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:'a'}, {interval: '12h'}))") .show()); // Missing cluster keyword @@ -156,7 +157,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interv: '12h'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interv: '12h'}))") .show()); // Typo in keyword cluster @@ -165,7 +166,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({clustr: 'aa', interval: '12h'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destina: 'aa', interval: '12h'}))") .show()); // Missing quote in cluster value @@ -174,7 +175,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: aa', interval: '12h}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: aa', interval: '12h}))") .show()); // Typo in REPLICATION keyword @@ -183,7 +184,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({cluster: 'aa', interval: '12h'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICAT = ({destination: 'aa', interval: '12h'}))") .show()); // Interval input does not follow 'h/H' format @@ -192,7 +193,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '12'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '12'}))") .show()); Assertions.assertThrows( @@ -200,7 +201,7 @@ public void 
testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '1D'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '1D'}))") .show()); Assertions.assertThrows( @@ -208,7 +209,7 @@ public void testReplicationPolicyWithoutProperSyntax() { () -> spark .sql( - "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({cluster: 'aa', interval: '12d'}))") + "ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination: 'aa', interval: '12d'}))") .show()); // Missing cluster and interval values @@ -247,11 +248,11 @@ private boolean isPlanValid(Dataset dataframe, String replicationConfigJson boolean isValid = false; for (JsonElement element : jsonArray) { JsonObject entry = element.getAsJsonObject(); - String cluster = entry.get("cluster").getAsString(); - isValid = queryStr.contains(cluster); + String destination = entry.get("destination").getAsString(); + isValid = queryStr.contains(destination); if (entry.has("interval")) { String interval = entry.get("interval").getAsString(); - isValid = queryStr.contains(cluster) && queryStr.contains(interval); + isValid = queryStr.contains(destination) && queryStr.contains(interval); } } return isValid; diff --git a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 index b6d0343e..67fa6585 100644 --- a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 +++ b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 @@ -97,7 +97,7 @@ replicationPolicyClause ; replicationPolicyClusterClause - : CLUSTER ':' STRING + : DESTINATION ':' STRING ; replicationPolicyIntervalClause @@ -172,7 +172,7 @@ DATABASE: 'DATABASE'; SHOW: 'SHOW'; GRANTS: 'GRANTS'; PATTERN: 'PATTERN'; -CLUSTER: 'CLUSTER'; +DESTINATION: 'DESTINATION'; INTERVAL: 'INTERVAL'; WHERE: 'WHERE'; COLUMN: 'COLUMN';
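
Example usage of the syntax as it stands at the end of this series (a sketch based on the tests above; clusterA/clusterB are placeholder destination names):

    ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:'clusterA', interval:12H}))
    ALTER TABLE openhouse.db.table SET POLICY (REPLICATION = ({destination:'clusterA'}, {destination:'clusterB', interval:24H}))

The interval clause is optional, and the RETENTION_HOUR token restricts it to hour granularity (e.g. 12h or 24H); day-granularity values such as 1D or 12d are rejected by the parser. Per SetReplicationPolicyExec, a successful statement commits the table property updated.openhouse.policy with a value of the form:

    {"replication":{"config":[{"destination":"clusterA","interval":"12H"},{"destination":"clusterB","interval":"24H"}]}}

where the interval serializes as an empty string when the clause is omitted.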