diff --git a/CHANGES.md b/CHANGES.md index bbe9d539531b..a990a5fd7304 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -75,6 +75,7 @@ * Removed fastjson library dependency for Beam SQL. Table property is changed to be based on jackson ObjectNode (Java) ([#24154](https://github.com/apache/beam/issues/24154)). * Removed TensorFlow from Beam Python container images [PR](https://github.com/apache/beam/pull/28424). If you have been negatively affected by this change, please comment on [#20605](https://github.com/apache/beam/issues/20605). * Removed the parameter `t reflect.Type` from `parquetio.Write`. The element type is derived from the input PCollection (Go) ([#28490](https://github.com/apache/beam/issues/28490)) +* Refactor BeamSqlSeekableTable.setUp adding a parameter joinSubsetType. [#28283](https://github.com/apache/beam/issues/28283) ## Deprecations diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java index 7b924cf6b6da..4dc9bd5777ff 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.Row; @@ -28,8 +29,12 @@ * FROM FACT_TABLE JOIN LOOKUP_TABLE ON ...}. */ public interface BeamSqlSeekableTable extends Serializable { - /** prepare the instance. */ - default void setUp() {} + /** + * prepare the instance. + * + * @param joinSubsetType joining subset schema + */ + default void setUp(Schema joinSubsetType) {} default void startBundle( DoFn.StartBundleContext context, PipelineOptions pipelineOptions) {} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java index e4d62c2b5de7..d25f98729bd4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java @@ -153,7 +153,7 @@ public PCollection expand(PCollection input) { new DoFn() { @Setup public void setup() { - seekableTable.setUp(); + seekableTable.setUp(joinSubsetType); } @StartBundle diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java index 2e2971ebd6e9..b5fd03045cbc 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; import org.hamcrest.core.StringContains; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -47,11 +48,18 @@ public class BeamSideInputLookupJoinRelTest extends BaseRelTest { /** Test table for JOIN-AS-LOOKUP. */ public static class SiteLookupTable extends SchemaBaseBeamTable implements BeamSqlSeekableTable { + private Schema joinSubsetType; public SiteLookupTable(Schema schema) { super(schema); } + @Override + public void setUp(Schema joinSubsetType) { + this.joinSubsetType = joinSubsetType; + Assert.assertNotNull(joinSubsetType); + } + @Override public PCollection.IsBounded isBounded() { return PCollection.IsBounded.BOUNDED; @@ -69,6 +77,7 @@ public POutput buildIOWriter(PCollection input) { @Override public List seekRow(Row lookupSubRow) { + Assert.assertEquals(joinSubsetType, lookupSubRow.getSchema()); if (lookupSubRow.getInt32("site_id") == 2) { return Arrays.asList(Row.withSchema(getSchema()).addValues(2, "SITE1").build()); }