Skip to content

Commit

Permalink
Add file loads streaming integration tests (#28312)
Browse files Browse the repository at this point in the history
* file loads streaming integration tests

* fix dynamic destinations copy jobs

* disable for runnerV2 until pane index is fixed
  • Loading branch information
ahmedabu98 authored Sep 13, 2023
1 parent 6ac4e82 commit 1032533
Show file tree
Hide file tree
Showing 5 changed files with 532 additions and 10 deletions.
3 changes: 3 additions & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,9 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {
exclude '**/FhirIOLROIT.class'
exclude '**/FhirIOSearchIT.class'
exclude '**/FhirIOPatientEverythingIT.class'
// failing due to pane index not incrementing after Reshuffle:
// https://github.com/apache/beam/issues/28219
exclude '**/FileLoadsStreamingIT.class'

maxParallelForks 4
classpath = configurations.googleCloudPlatformIntegrationTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
Expand Down Expand Up @@ -399,10 +398,12 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
"Window Into Global Windows",
Window.<KV<DestinationT, WriteTables.Result>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
.apply("Add Void Key", WithKeys.of((Void) null))
.setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
.apply("GroupByKey", GroupByKey.create())
.apply("Extract Values", Values.create())
// We use this and the following GBK to aggregate by final destination.
// This way, each destination has its own pane sequence
.apply("AddDestinationKeys", WithKeys.of(result -> result.getKey()))
.setCoder(KvCoder.of(destinationCoder, tempTables.getCoder()))
.apply("GroupTempTablesByFinalDestination", GroupByKey.create())
.apply("ExtractTempTables", Values.create())
.apply(
ParDo.of(
new UpdateSchemaDestination<DestinationT>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
}
}

if (jsonBQValue instanceof byte[] && fieldType.getTypeName() == TypeName.BYTES) {
return jsonBQValue;
}

if (jsonBQValue instanceof List) {
if (fieldType.getCollectionElementType() == null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class BigQueryUtilsTest {
.addNullableField("time0s_0ns", Schema.FieldType.logicalType(SqlTypes.TIME))
.addNullableField("valid", Schema.FieldType.BOOLEAN)
.addNullableField("binary", Schema.FieldType.BYTES)
.addNullableField("raw_bytes", Schema.FieldType.BYTES)
.addNullableField("numeric", Schema.FieldType.DECIMAL)
.addNullableField("boolean", Schema.FieldType.BOOLEAN)
.addNullableField("long", Schema.FieldType.INT64)
Expand Down Expand Up @@ -188,6 +189,9 @@ public class BigQueryUtilsTest {
private static final TableFieldSchema BINARY =
new TableFieldSchema().setName("binary").setType(StandardSQLTypeName.BYTES.toString());

private static final TableFieldSchema RAW_BYTES =
new TableFieldSchema().setName("raw_bytes").setType(StandardSQLTypeName.BYTES.toString());

private static final TableFieldSchema NUMERIC =
new TableFieldSchema().setName("numeric").setType(StandardSQLTypeName.NUMERIC.toString());

Expand Down Expand Up @@ -246,6 +250,7 @@ public class BigQueryUtilsTest {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand Down Expand Up @@ -276,6 +281,7 @@ public class BigQueryUtilsTest {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand Down Expand Up @@ -316,6 +322,7 @@ public class BigQueryUtilsTest {
LocalTime.parse("12:34"),
false,
Base64.getDecoder().decode("ABCD1234"),
Base64.getDecoder().decode("ABCD1234"),
new BigDecimal("123.456").setScale(3, RoundingMode.HALF_UP),
true,
123L,
Expand Down Expand Up @@ -346,6 +353,7 @@ public class BigQueryUtilsTest {
.set("time0s_0ns", "12:34:00")
.set("valid", "false")
.set("binary", "ABCD1234")
.set("raw_bytes", Base64.getDecoder().decode("ABCD1234"))
.set("numeric", "123.456")
.set("boolean", true)
.set("long", 123L)
Expand All @@ -355,7 +363,7 @@ public class BigQueryUtilsTest {
Row.withSchema(FLAT_TYPE)
.addValues(
null, null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null)
null, null, null, null, null, null, null, null, null)
.build();

private static final TableRow BQ_NULL_FLAT_ROW =
Expand All @@ -378,6 +386,7 @@ public class BigQueryUtilsTest {
.set("time0s_0ns", null)
.set("valid", null)
.set("binary", null)
.set("raw_bytes", null)
.set("numeric", null)
.set("boolean", null)
.set("long", null)
Expand Down Expand Up @@ -457,6 +466,7 @@ public class BigQueryUtilsTest {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand Down Expand Up @@ -512,6 +522,7 @@ public void testToTableSchema_flat() {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand Down Expand Up @@ -562,6 +573,7 @@ public void testToTableSchema_row() {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand Down Expand Up @@ -598,6 +610,7 @@ public void testToTableSchema_array_row() {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand All @@ -620,7 +633,7 @@ public void testToTableSchema_map() {
public void testToTableRow_flat() {
TableRow row = toTableRow().apply(FLAT_ROW);

assertThat(row.size(), equalTo(22));
assertThat(row.size(), equalTo(23));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
Expand All @@ -635,6 +648,7 @@ public void testToTableRow_flat() {
assertThat(row, hasEntry("name", "test"));
assertThat(row, hasEntry("valid", "false"));
assertThat(row, hasEntry("binary", "ABCD1234"));
assertThat(row, hasEntry("raw_bytes", "ABCD1234"));
assertThat(row, hasEntry("numeric", "123.456"));
assertThat(row, hasEntry("boolean", "true"));
assertThat(row, hasEntry("long", "123"));
Expand Down Expand Up @@ -674,7 +688,7 @@ public void testToTableRow_row() {

assertThat(row.size(), equalTo(1));
row = (TableRow) row.get("row");
assertThat(row.size(), equalTo(22));
assertThat(row.size(), equalTo(23));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
Expand All @@ -689,6 +703,7 @@ public void testToTableRow_row() {
assertThat(row, hasEntry("name", "test"));
assertThat(row, hasEntry("valid", "false"));
assertThat(row, hasEntry("binary", "ABCD1234"));
assertThat(row, hasEntry("raw_bytes", "ABCD1234"));
assertThat(row, hasEntry("numeric", "123.456"));
assertThat(row, hasEntry("boolean", "true"));
assertThat(row, hasEntry("long", "123"));
Expand All @@ -701,7 +716,7 @@ public void testToTableRow_array_row() {

assertThat(row.size(), equalTo(1));
row = ((List<TableRow>) row.get("rows")).get(0);
assertThat(row.size(), equalTo(22));
assertThat(row.size(), equalTo(23));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
Expand All @@ -716,6 +731,7 @@ public void testToTableRow_array_row() {
assertThat(row, hasEntry("name", "test"));
assertThat(row, hasEntry("valid", "false"));
assertThat(row, hasEntry("binary", "ABCD1234"));
assertThat(row, hasEntry("raw_bytes", "ABCD1234"));
assertThat(row, hasEntry("numeric", "123.456"));
assertThat(row, hasEntry("boolean", "true"));
assertThat(row, hasEntry("long", "123"));
Expand All @@ -726,7 +742,7 @@ public void testToTableRow_array_row() {
public void testToTableRow_null_row() {
TableRow row = toTableRow().apply(NULL_FLAT_ROW);

assertThat(row.size(), equalTo(22));
assertThat(row.size(), equalTo(23));
assertThat(row, hasEntry("id", null));
assertThat(row, hasEntry("value", null));
assertThat(row, hasEntry("name", null));
Expand All @@ -745,6 +761,7 @@ public void testToTableRow_null_row() {
assertThat(row, hasEntry("time0s_0ns", null));
assertThat(row, hasEntry("valid", null));
assertThat(row, hasEntry("binary", null));
assertThat(row, hasEntry("raw_bytes", null));
assertThat(row, hasEntry("numeric", null));
assertThat(row, hasEntry("boolean", null));
assertThat(row, hasEntry("long", null));
Expand Down
Loading

0 comments on commit 1032533

Please sign in to comment.