From 0c8464d7857397def6f49c28dc671aa1f3fadc5c Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 22 Sep 2023 15:45:51 -0400 Subject: [PATCH 1/9] add test for unset timestamp --- .../apache_beam/io/gcp/bigtableio_it_test.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 341f2983c8bc..0f0613a1f2d8 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -223,6 +223,9 @@ def test_set_mutation(self): row1_col2_cell = Cell(b'val1-2', 200_000_000) row2_col1_cell = Cell(b'val2-1', 100_000_000) row2_col2_cell = Cell(b'val2-2', 200_000_000) + # When setting this cell, we won't set a timestamp. We expect the timestamp + # to default to -1, and Bigtable will set it to system time at insertion. + row2_col1_no_timestamp = Cell(b'val2-2-notimestamp', time.time()) # rows sent to write transform row1.set_cell( 'col_fam', b'col-1', row1_col1_cell.value, row1_col1_cell.timestamp) @@ -232,6 +235,8 @@ def test_set_mutation(self): 'col_fam', b'col-1', row2_col1_cell.value, row2_col1_cell.timestamp) row2.set_cell( 'col_fam', b'col-2', row2_col2_cell.value, row2_col2_cell.timestamp) + # don't set a timestamp here. it should default to -1 + row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value) self.run_pipeline([row1, row2]) @@ -249,6 +254,16 @@ def test_set_mutation(self): self.assertEqual( row2_col2_cell, actual_row2.find_cells('col_fam', b'col-2')[0]) + # check cells that don't have a timestamp set are handled properly: + self.assertEqual( + row2_col1_no_timestamp.value, + actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].value) + # Bigtable sets timestamp as insertion time, which is later than the + # time.time() we set when creating this test case + self.assertTrue( + row2_col1_no_timestamp.timestamp < actual_row2.find_cells( + 'col_fam', b'col-no-timestamp')[0].timestamp) + def test_delete_cells_mutation(self): col_fam = self.table.column_family('col_fam') col_fam.create() From 491c1ac5c092d086250f15216dfdb23682534f8f Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 22 Sep 2023 16:07:37 -0400 Subject: [PATCH 2/9] add fix and test error message --- sdks/python/apache_beam/io/gcp/bigtableio.py | 2 +- .../python/apache_beam/io/gcp/bigtableio_it_test.py | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py index b2b52bd675c5..f402e6d7b393 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio.py @@ -255,7 +255,7 @@ def process(self, direct_row): "value": mutation.set_cell.value } micros = mutation.set_cell.timestamp_micros - if micros > -1: + if micros >= -1: mutation_dict['timestamp_micros'] = struct.pack('>q', micros) elif mutation.__contains__("delete_from_column"): mutation_dict = { diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 0f0613a1f2d8..c9506e2a8a66 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -144,9 +144,9 @@ def test_read_xlang(self): @pytest.mark.uses_gcp_java_expansion_service @pytest.mark.uses_transform_service -@unittest.skipUnless( - os.environ.get('EXPANSION_PORT'), - "EXPANSION_PORT environment var is not provided.") +# @unittest.skipUnless( +# os.environ.get('EXPANSION_PORT'), +# "EXPANSION_PORT environment var is not provided.") @unittest.skipIf(client is None, 'Bigtable dependencies are not installed') class TestWriteToBigtableXlangIT(unittest.TestCase): # These are integration tests for the cross-language write transform. @@ -158,7 +158,7 @@ def setUpClass(cls): cls.test_pipeline = TestPipeline(is_integration_test=True) cls.project = cls.test_pipeline.get_option('project') cls.args = cls.test_pipeline.get_full_options_as_args() - cls.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT')) + cls.expansion_service = None #('localhost:%s' % os.environ.get('EXPANSION_PORT')) instance_id = '%s-%s-%s' % ( cls.INSTANCE, str(int(time.time())), secrets.token_hex(3)) @@ -262,7 +262,10 @@ def test_set_mutation(self): # time.time() we set when creating this test case self.assertTrue( row2_col1_no_timestamp.timestamp < actual_row2.find_cells( - 'col_fam', b'col-no-timestamp')[0].timestamp) + 'col_fam', b'col-no-timestamp')[0].timestamp, + msg="Expected cell with unset timestamp to have ingestion time attached, " + f"but was {actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].timestamp}" + ) def test_delete_cells_mutation(self): col_fam = self.table.column_family('col_fam') From babff9484f4304342a2f15db69ec57e138f5be5f Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 22 Sep 2023 16:09:22 -0400 Subject: [PATCH 3/9] re-enable expansion service --- sdks/python/apache_beam/io/gcp/bigtableio_it_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index c9506e2a8a66..9a54c50fcf19 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -144,9 +144,9 @@ def test_read_xlang(self): @pytest.mark.uses_gcp_java_expansion_service @pytest.mark.uses_transform_service -# @unittest.skipUnless( -# os.environ.get('EXPANSION_PORT'), -# "EXPANSION_PORT environment var is not provided.") +@unittest.skipUnless( + os.environ.get('EXPANSION_PORT'), + "EXPANSION_PORT environment var is not provided.") @unittest.skipIf(client is None, 'Bigtable dependencies are not installed') class TestWriteToBigtableXlangIT(unittest.TestCase): # These are integration tests for the cross-language write transform. @@ -158,7 +158,7 @@ def setUpClass(cls): cls.test_pipeline = TestPipeline(is_integration_test=True) cls.project = cls.test_pipeline.get_option('project') cls.args = cls.test_pipeline.get_full_options_as_args() - cls.expansion_service = None #('localhost:%s' % os.environ.get('EXPANSION_PORT')) + cls.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT')) instance_id = '%s-%s-%s' % ( cls.INSTANCE, str(int(time.time())), secrets.token_hex(3)) From deb26a7fe1e36b23c42bddf92ba77ce5f800a2db Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 22 Sep 2023 16:30:59 -0400 Subject: [PATCH 4/9] pass thru all timestamps; add explicit -1 timestamp test --- sdks/python/apache_beam/io/gcp/bigtableio.py | 7 +++--- .../apache_beam/io/gcp/bigtableio_it_test.py | 25 ++++++++++++++++--- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py index f402e6d7b393..f8534f38ddfc 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio.py @@ -252,11 +252,10 @@ def process(self, direct_row): "type": b'SetCell', "family_name": mutation.set_cell.family_name.encode('utf-8'), "column_qualifier": mutation.set_cell.column_qualifier, - "value": mutation.set_cell.value + "value": mutation.set_cell.value, + "timestamp_micros": struct.pack( + '>q', mutation.set_cell.timestamp_micros) } - micros = mutation.set_cell.timestamp_micros - if micros >= -1: - mutation_dict['timestamp_micros'] = struct.pack('>q', micros) elif mutation.__contains__("delete_from_column"): mutation_dict = { "type": b'DeleteFromColumn', diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 9a54c50fcf19..794bbf1ce5ca 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -226,6 +226,7 @@ def test_set_mutation(self): # When setting this cell, we won't set a timestamp. We expect the timestamp # to default to -1, and Bigtable will set it to system time at insertion. row2_col1_no_timestamp = Cell(b'val2-2-notimestamp', time.time()) + row2_col1_neg1_timestamp = Cell(b'val2-2-neg1-timestamp', time.time()) # rows sent to write transform row1.set_cell( 'col_fam', b'col-1', row1_col1_cell.value, row1_col1_cell.timestamp) @@ -237,6 +238,8 @@ def test_set_mutation(self): 'col_fam', b'col-2', row2_col2_cell.value, row2_col2_cell.timestamp) # don't set a timestamp here. it should default to -1 row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value) + row2.set_cell( + 'col_fam', b'col-neg1-timestamp', row2_col1_no_timestamp.value) self.run_pipeline([row1, row2]) @@ -254,18 +257,32 @@ def test_set_mutation(self): self.assertEqual( row2_col2_cell, actual_row2.find_cells('col_fam', b'col-2')[0]) - # check cells that don't have a timestamp set are handled properly: + # check cell that doesn't have a timestamp set is handled properly: self.assertEqual( row2_col1_no_timestamp.value, actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].value) # Bigtable sets timestamp as insertion time, which is later than the # time.time() we set when creating this test case + cell_timestamp = actual_row2.find_cells('col_fam', + b'col-no-timestamp')[0].timestamp self.assertTrue( row2_col1_no_timestamp.timestamp < actual_row2.find_cells( 'col_fam', b'col-no-timestamp')[0].timestamp, - msg="Expected cell with unset timestamp to have ingestion time attached, " - f"but was {actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].timestamp}" - ) + msg="Expected cell with unset timestamp to have ingestion time " + f"attached, but was {cell_timestamp}") + # check cell that has timestamp of `-1` is handled properly: + self.assertEqual( + row2_col1_neg1_timestamp.value, + actual_row2.find_cells('col_fam', b'col-neg1-timestamp')[0].value) + # Bigtable sets -1 timestamp as insertion time, which is later than the + # time.time() we set when creating this test case + cell_timestamp = actual_row2.find_cells('col_fam', + b'col-neg1-timestamp')[0].timestamp + self.assertTrue( + row2_col1_neg1_timestamp.timestamp < actual_row2.find_cells( + 'col_fam', b'col-neg1-timestamp')[0].timestamp, + msg="Expected cell with `-1` timestamp to have ingestion time " + f"attached, but was {cell_timestamp}") def test_delete_cells_mutation(self): col_fam = self.table.column_family('col_fam') From 1539537cad020590de8abdf92d4c5fa747333132 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 22 Sep 2023 16:49:07 -0400 Subject: [PATCH 5/9] actually set test timestamp to -1 --- sdks/python/apache_beam/io/gcp/bigtableio_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 794bbf1ce5ca..1ed9016e9687 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -239,7 +239,7 @@ def test_set_mutation(self): # don't set a timestamp here. it should default to -1 row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value) row2.set_cell( - 'col_fam', b'col-neg1-timestamp', row2_col1_no_timestamp.value) + 'col_fam', b'col-neg1-timestamp', row2_col1_no_timestamp.value, -1) self.run_pipeline([row1, row2]) From 951ba0363c8e1e294e7e5eae51caffdb6629362d Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 22 Sep 2023 20:02:49 -0400 Subject: [PATCH 6/9] remove explicit -1 test; default to -1 in schematransform --- .../BigtableWriteSchemaTransformProvider.java | 13 ++++++------ .../apache_beam/io/gcp/bigtableio_it_test.py | 21 ++----------------- 2 files changed, 9 insertions(+), 25 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java index d38bdae2f092..b99b69621a84 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java @@ -179,12 +179,13 @@ public KV> apply(Row row) { .setColumnQualifier( ByteString.copyFrom(ofNullable(mutation.get("column_qualifier")).get())) .setFamilyNameBytes( - ByteString.copyFrom(ofNullable(mutation.get("family_name")).get())); - if (mutation.containsKey("timestamp_micros")) { - setMutation = - setMutation.setTimestampMicros( - Longs.fromByteArray(ofNullable(mutation.get("timestamp_micros")).get())); - } + ByteString.copyFrom(ofNullable(mutation.get("family_name")).get())) + // Use timestamp if provided, else default to -1 (current Bigtable server time) + .setTimestampMicros( + mutation.containsKey("timestamp_micros") + ? Longs.fromByteArray( + ofNullable(mutation.get("timestamp_micros")).get()) + : -1); bigtableMutation = Mutation.newBuilder().setSetCell(setMutation.build()).build(); break; case "DeleteFromColumn": diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py index 1ed9016e9687..f61e346cff9f 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py @@ -226,7 +226,6 @@ def test_set_mutation(self): # When setting this cell, we won't set a timestamp. We expect the timestamp # to default to -1, and Bigtable will set it to system time at insertion. row2_col1_no_timestamp = Cell(b'val2-2-notimestamp', time.time()) - row2_col1_neg1_timestamp = Cell(b'val2-2-neg1-timestamp', time.time()) # rows sent to write transform row1.set_cell( 'col_fam', b'col-1', row1_col1_cell.value, row1_col1_cell.timestamp) @@ -238,8 +237,6 @@ def test_set_mutation(self): 'col_fam', b'col-2', row2_col2_cell.value, row2_col2_cell.timestamp) # don't set a timestamp here. it should default to -1 row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value) - row2.set_cell( - 'col_fam', b'col-neg1-timestamp', row2_col1_no_timestamp.value, -1) self.run_pipeline([row1, row2]) @@ -257,7 +254,7 @@ def test_set_mutation(self): self.assertEqual( row2_col2_cell, actual_row2.find_cells('col_fam', b'col-2')[0]) - # check cell that doesn't have a timestamp set is handled properly: + # check mutation that doesn't have a timestamp set is handled properly: self.assertEqual( row2_col1_no_timestamp.value, actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].value) @@ -266,23 +263,9 @@ def test_set_mutation(self): cell_timestamp = actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].timestamp self.assertTrue( - row2_col1_no_timestamp.timestamp < actual_row2.find_cells( - 'col_fam', b'col-no-timestamp')[0].timestamp, + row2_col1_no_timestamp.timestamp < cell_timestamp, msg="Expected cell with unset timestamp to have ingestion time " f"attached, but was {cell_timestamp}") - # check cell that has timestamp of `-1` is handled properly: - self.assertEqual( - row2_col1_neg1_timestamp.value, - actual_row2.find_cells('col_fam', b'col-neg1-timestamp')[0].value) - # Bigtable sets -1 timestamp as insertion time, which is later than the - # time.time() we set when creating this test case - cell_timestamp = actual_row2.find_cells('col_fam', - b'col-neg1-timestamp')[0].timestamp - self.assertTrue( - row2_col1_neg1_timestamp.timestamp < actual_row2.find_cells( - 'col_fam', b'col-neg1-timestamp')[0].timestamp, - msg="Expected cell with `-1` timestamp to have ingestion time " - f"attached, but was {cell_timestamp}") def test_delete_cells_mutation(self): col_fam = self.table.column_family('col_fam') From 74cb85e76c5648644abf27b47623cab9ab078c0e Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 22 Sep 2023 21:24:01 -0400 Subject: [PATCH 7/9] fix schematransform test --- ...igtableWriteSchemaTransformProviderIT.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java index 14bb04b0315d..e3519269cf62 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteSchemaTransformProvider.BigtableWriteSchemaTransformConfiguration; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.TestPipeline; @@ -104,7 +103,7 @@ public void testInvalidConfigs() { public void setup() throws Exception { BigtableTestOptions options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class); - projectId = options.as(GcpOptions.class).getProject(); + projectId = "google.com:clouddfe"; // options.as(GcpOptions.class).getProject(); instanceId = options.getInstanceId(); BigtableDataSettings settings = @@ -154,8 +153,8 @@ public void tearDown() { public void testSetMutationsExistingColumn() { RowMutation rowMutation = RowMutation.create(tableId, "key-1") - .setCell(COLUMN_FAMILY_NAME_1, "col_a", "val-1-a") - .setCell(COLUMN_FAMILY_NAME_2, "col_c", "val-1-c"); + .setCell(COLUMN_FAMILY_NAME_1, "col_a", 1000, "val-1-a") + .setCell(COLUMN_FAMILY_NAME_2, "col_c", 1000, "val-1-c"); dataClient.mutateRow(rowMutation); List> mutations = new ArrayList<>(); @@ -165,13 +164,15 @@ public void testSetMutationsExistingColumn() { "type", "SetCell".getBytes(StandardCharsets.UTF_8), "value", "new-val-1-a".getBytes(StandardCharsets.UTF_8), "column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8), - "family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8))); + "family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8), + "timestamp_micros", Longs.toByteArray(2000))); mutations.add( ImmutableMap.of( "type", "SetCell".getBytes(StandardCharsets.UTF_8), "value", "new-val-1-c".getBytes(StandardCharsets.UTF_8), "column_qualifier", "col_c".getBytes(StandardCharsets.UTF_8), - "family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8))); + "family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8), + "timestamp_micros", Longs.toByteArray(2000))); Row mutationRow = Row.withSchema(SCHEMA) .withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8)) @@ -202,10 +203,8 @@ public void testSetMutationsExistingColumn() { .collect(Collectors.toList()); assertEquals(2, cellsColA.size()); assertEquals(2, cellsColC.size()); - System.out.println(cellsColA); - System.out.println(cellsColC); - assertEquals("new-val-1-a", cellsColA.get(1).getValue().toStringUtf8()); - assertEquals("new-val-1-c", cellsColC.get(1).getValue().toStringUtf8()); + assertEquals("new-val-1-a", cellsColA.get(0).getValue().toStringUtf8()); + assertEquals("new-val-1-c", cellsColC.get(0).getValue().toStringUtf8()); } @Test From 29a7888e56d4500b212565151973055135945cfd Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 22 Sep 2023 21:41:22 -0400 Subject: [PATCH 8/9] make the test a lil stronger --- .../gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java index e3519269cf62..20070284ba86 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java @@ -103,7 +103,7 @@ public void testInvalidConfigs() { public void setup() throws Exception { BigtableTestOptions options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class); - projectId = "google.com:clouddfe"; // options.as(GcpOptions.class).getProject(); + projectId = options.as(GcpOptions.class).getProject(); instanceId = options.getInstanceId(); BigtableDataSettings settings = @@ -203,8 +203,11 @@ public void testSetMutationsExistingColumn() { .collect(Collectors.toList()); assertEquals(2, cellsColA.size()); assertEquals(2, cellsColC.size()); + // Bigtable keeps cell history ordered by descending timestamp assertEquals("new-val-1-a", cellsColA.get(0).getValue().toStringUtf8()); assertEquals("new-val-1-c", cellsColC.get(0).getValue().toStringUtf8()); + assertEquals("val-1-a", cellsColA.get(1).getValue().toStringUtf8()); + assertEquals("val-1-c", cellsColC.get(1).getValue().toStringUtf8()); } @Test From deb221c1d1ade08064392eb1c9b3b19e624a4597 Mon Sep 17 00:00:00 2001 From: ahmedabu98 Date: Fri, 22 Sep 2023 22:22:31 -0400 Subject: [PATCH 9/9] import options --- .../io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java index 20070284ba86..1a60fe661b52 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteSchemaTransformProvider.BigtableWriteSchemaTransformConfiguration; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.TestPipeline;