Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add file loads streaming integration tests #28312

Merged
merged 10 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,9 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {
exclude '**/FhirIOLROIT.class'
exclude '**/FhirIOSearchIT.class'
exclude '**/FhirIOPatientEverythingIT.class'
// failing due to pane index not incrementing after Reshuffle:
// https://github.com/apache/beam/issues/28219
exclude '**/FileLoadsStreamingIT.class'

maxParallelForks 4
classpath = configurations.googleCloudPlatformIntegrationTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
Expand Down Expand Up @@ -399,10 +398,12 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
"Window Into Global Windows",
Window.<KV<DestinationT, WriteTables.Result>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
.apply("Add Void Key", WithKeys.of((Void) null))
.setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
.apply("GroupByKey", GroupByKey.create())
.apply("Extract Values", Values.create())
// We use this and the following GBK to aggregate by final destination.
// This way, each destination has its own pane sequence
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this fixes #28309, thanks!

.apply("AddDestinationKeys", WithKeys.of(result -> result.getKey()))
.setCoder(KvCoder.of(destinationCoder, tempTables.getCoder()))
.apply("GroupTempTablesByFinalDestination", GroupByKey.create())
.apply("ExtractTempTables", Values.create())
.apply(
ParDo.of(
new UpdateSchemaDestination<DestinationT>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
}
}

if (jsonBQValue instanceof byte[] && fieldType.getTypeName() == TypeName.BYTES) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious - what does this short cut return do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type (bye[]) was previously unsupported so converting a TableRow with byte[] field to Beam Row would throw an UnsupportedOperationException below.

return jsonBQValue;
}

if (jsonBQValue instanceof List) {
if (fieldType.getCollectionElementType() == null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class BigQueryUtilsTest {
.addNullableField("time0s_0ns", Schema.FieldType.logicalType(SqlTypes.TIME))
.addNullableField("valid", Schema.FieldType.BOOLEAN)
.addNullableField("binary", Schema.FieldType.BYTES)
.addNullableField("raw_bytes", Schema.FieldType.BYTES)
.addNullableField("numeric", Schema.FieldType.DECIMAL)
.addNullableField("boolean", Schema.FieldType.BOOLEAN)
.addNullableField("long", Schema.FieldType.INT64)
Expand Down Expand Up @@ -188,6 +189,9 @@ public class BigQueryUtilsTest {
private static final TableFieldSchema BINARY =
new TableFieldSchema().setName("binary").setType(StandardSQLTypeName.BYTES.toString());

private static final TableFieldSchema RAW_BYTES =
new TableFieldSchema().setName("raw_bytes").setType(StandardSQLTypeName.BYTES.toString());

private static final TableFieldSchema NUMERIC =
new TableFieldSchema().setName("numeric").setType(StandardSQLTypeName.NUMERIC.toString());

Expand Down Expand Up @@ -246,6 +250,7 @@ public class BigQueryUtilsTest {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand Down Expand Up @@ -276,6 +281,7 @@ public class BigQueryUtilsTest {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand Down Expand Up @@ -316,6 +322,7 @@ public class BigQueryUtilsTest {
LocalTime.parse("12:34"),
false,
Base64.getDecoder().decode("ABCD1234"),
Base64.getDecoder().decode("ABCD1234"),
new BigDecimal("123.456").setScale(3, RoundingMode.HALF_UP),
true,
123L,
Expand Down Expand Up @@ -346,6 +353,7 @@ public class BigQueryUtilsTest {
.set("time0s_0ns", "12:34:00")
.set("valid", "false")
.set("binary", "ABCD1234")
.set("raw_bytes", Base64.getDecoder().decode("ABCD1234"))
.set("numeric", "123.456")
.set("boolean", true)
.set("long", 123L)
Expand All @@ -355,7 +363,7 @@ public class BigQueryUtilsTest {
Row.withSchema(FLAT_TYPE)
.addValues(
null, null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null)
null, null, null, null, null, null, null, null, null)
.build();

private static final TableRow BQ_NULL_FLAT_ROW =
Expand All @@ -378,6 +386,7 @@ public class BigQueryUtilsTest {
.set("time0s_0ns", null)
.set("valid", null)
.set("binary", null)
.set("raw_bytes", null)
.set("numeric", null)
.set("boolean", null)
.set("long", null)
Expand Down Expand Up @@ -457,6 +466,7 @@ public class BigQueryUtilsTest {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand Down Expand Up @@ -512,6 +522,7 @@ public void testToTableSchema_flat() {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand Down Expand Up @@ -562,6 +573,7 @@ public void testToTableSchema_row() {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand Down Expand Up @@ -598,6 +610,7 @@ public void testToTableSchema_array_row() {
TIME_0S_0NS,
VALID,
BINARY,
RAW_BYTES,
NUMERIC,
BOOLEAN,
LONG,
Expand All @@ -620,7 +633,7 @@ public void testToTableSchema_map() {
public void testToTableRow_flat() {
TableRow row = toTableRow().apply(FLAT_ROW);

assertThat(row.size(), equalTo(22));
assertThat(row.size(), equalTo(23));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
Expand All @@ -635,6 +648,7 @@ public void testToTableRow_flat() {
assertThat(row, hasEntry("name", "test"));
assertThat(row, hasEntry("valid", "false"));
assertThat(row, hasEntry("binary", "ABCD1234"));
assertThat(row, hasEntry("raw_bytes", "ABCD1234"));
assertThat(row, hasEntry("numeric", "123.456"));
assertThat(row, hasEntry("boolean", "true"));
assertThat(row, hasEntry("long", "123"));
Expand Down Expand Up @@ -674,7 +688,7 @@ public void testToTableRow_row() {

assertThat(row.size(), equalTo(1));
row = (TableRow) row.get("row");
assertThat(row.size(), equalTo(22));
assertThat(row.size(), equalTo(23));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
Expand All @@ -689,6 +703,7 @@ public void testToTableRow_row() {
assertThat(row, hasEntry("name", "test"));
assertThat(row, hasEntry("valid", "false"));
assertThat(row, hasEntry("binary", "ABCD1234"));
assertThat(row, hasEntry("raw_bytes", "ABCD1234"));
assertThat(row, hasEntry("numeric", "123.456"));
assertThat(row, hasEntry("boolean", "true"));
assertThat(row, hasEntry("long", "123"));
Expand All @@ -701,7 +716,7 @@ public void testToTableRow_array_row() {

assertThat(row.size(), equalTo(1));
row = ((List<TableRow>) row.get("rows")).get(0);
assertThat(row.size(), equalTo(22));
assertThat(row.size(), equalTo(23));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
Expand All @@ -716,6 +731,7 @@ public void testToTableRow_array_row() {
assertThat(row, hasEntry("name", "test"));
assertThat(row, hasEntry("valid", "false"));
assertThat(row, hasEntry("binary", "ABCD1234"));
assertThat(row, hasEntry("raw_bytes", "ABCD1234"));
assertThat(row, hasEntry("numeric", "123.456"));
assertThat(row, hasEntry("boolean", "true"));
assertThat(row, hasEntry("long", "123"));
Expand All @@ -726,7 +742,7 @@ public void testToTableRow_array_row() {
public void testToTableRow_null_row() {
TableRow row = toTableRow().apply(NULL_FLAT_ROW);

assertThat(row.size(), equalTo(22));
assertThat(row.size(), equalTo(23));
assertThat(row, hasEntry("id", null));
assertThat(row, hasEntry("value", null));
assertThat(row, hasEntry("name", null));
Expand All @@ -745,6 +761,7 @@ public void testToTableRow_null_row() {
assertThat(row, hasEntry("time0s_0ns", null));
assertThat(row, hasEntry("valid", null));
assertThat(row, hasEntry("binary", null));
assertThat(row, hasEntry("raw_bytes", null));
assertThat(row, hasEntry("numeric", null));
assertThat(row, hasEntry("boolean", null));
assertThat(row, hasEntry("long", null));
Expand Down
Loading
Loading