From 2312dbeea47346ff2ae08b0bc1359b6b63c04b11 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Tue, 17 Sep 2024 14:03:41 +0200 Subject: [PATCH 1/4] Improve BQ avro handling --- .../io/gcp/bigquery/BigQueryAvroUtils.java | 460 ++++++++---------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 37 +- .../gcp/bigquery/BigQueryQuerySourceDef.java | 12 +- .../io/gcp/bigquery/BigQuerySourceBase.java | 3 +- .../io/gcp/bigquery/BigQuerySourceDef.java | 9 + .../bigquery/BigQueryStorageSourceBase.java | 28 +- .../gcp/bigquery/BigQueryTableSourceDef.java | 12 +- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 59 ++- .../gcp/bigquery/BigQueryAvroUtilsTest.java | 444 +++++++++++------ .../bigquery/BigQueryIOStorageQueryTest.java | 19 +- .../bigquery/BigQueryIOStorageReadTest.java | 80 ++- 11 files changed, 647 insertions(+), 516 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index bdee2eef570d..b24fe13262c2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -21,15 +21,12 @@ import static java.time.temporal.ChronoField.MINUTE_OF_HOUR; import static java.time.temporal.ChronoField.NANO_OF_SECOND; import static java.time.temporal.ChronoField.SECOND_OF_MINUTE; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verifyNotNull; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; -import java.math.BigDecimal; import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalTime; @@ -37,20 +34,17 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.avro.Conversions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMultimap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.format.DateTimeFormat; @@ -64,35 +58,95 @@ */ class BigQueryAvroUtils { + static class DateTimeLogicalType extends LogicalType { + public DateTimeLogicalType() { + super("datetime"); + } + } + + static final DateTimeLogicalType DATETIME_LOGICAL_TYPE = new DateTimeLogicalType(); + /** * Defines the valid 
mapping between BigQuery types and native Avro types. * - *

Some BigQuery types are duplicated here since slightly different Avro records are produced
-  * when exporting data in Avro format and when reading data directly using the read API.
+  * @see BQ avro export
+  * @see BQ avro storage
   */
-  static final ImmutableMultimap<String, Type> BIG_QUERY_TO_AVRO_TYPES =
-      ImmutableMultimap.<String, Type>builder()
-          .put("STRING", Type.STRING)
-          .put("GEOGRAPHY", Type.STRING)
-          .put("BYTES", Type.BYTES)
-          .put("INTEGER", Type.LONG)
-          .put("INT64", Type.LONG)
-          .put("FLOAT", Type.DOUBLE)
-          .put("FLOAT64", Type.DOUBLE)
-          .put("NUMERIC", Type.BYTES)
-          .put("BIGNUMERIC", Type.BYTES)
-          .put("BOOLEAN", Type.BOOLEAN)
-          .put("BOOL", Type.BOOLEAN)
-          .put("TIMESTAMP", Type.LONG)
-          .put("RECORD", Type.RECORD)
-          .put("STRUCT", Type.RECORD)
-          .put("DATE", Type.STRING)
-          .put("DATE", Type.INT)
-          .put("DATETIME", Type.STRING)
-          .put("TIME", Type.STRING)
-          .put("TIME", Type.LONG)
-          .put("JSON", Type.STRING)
-          .build();
+  static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) {
+    String bqType = schema.getType();
+    switch (bqType) {
+      case "BOOL":
+      case "BOOLEAN":
+        // boolean
+        return SchemaBuilder.builder().booleanType();
+      case "BYTES":
+        // bytes
+        return SchemaBuilder.builder().bytesType();
+      case "FLOAT64":
+      case "FLOAT": // not a valid BQ type, but it still appears in some table schemas
+        // double
+        return SchemaBuilder.builder().doubleType();
+      case "INT64":
+      case "INT":
+      case "SMALLINT":
+      case "INTEGER":
+      case "BIGINT":
+      case "TINYINT":
+      case "BYTEINT":
+        // long
+        return SchemaBuilder.builder().longType();
+      case "STRING":
+        // string
+        return SchemaBuilder.builder().stringType();
+      case "NUMERIC":
+      case "BIGNUMERIC":
+        // decimal
+        LogicalType logicalType;
+        if (schema.getScale() != null) {
+          logicalType =
+              LogicalTypes.decimal(schema.getPrecision().intValue(), schema.getScale().intValue());
+        } else if (schema.getPrecision() != null) {
+          logicalType = LogicalTypes.decimal(schema.getPrecision().intValue());
+        } else if (bqType.equals("NUMERIC")) {
+          logicalType = LogicalTypes.decimal(38, 9);
+        } else {
+          logicalType = LogicalTypes.decimal(77, 38);
+        }
+        return logicalType.addToSchema(SchemaBuilder.builder().bytesType());
+      case "DATE":
+        if (useAvroLogicalTypes) {
+          return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+        } else {
+          return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+        }
+      case "DATETIME":
+        if (useAvroLogicalTypes) {
+          return DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType());
+        } else {
+          return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+        }
+      case "TIME":
+        if (useAvroLogicalTypes) {
+          return LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType());
+        } else {
+          return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+        }
+      case "TIMESTAMP":
+        // the export docs suggest a plain long here, but BQ always attaches the
+        // timestamp-micros logical type
+        return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+      case "GEOGRAPHY":
+      case "JSON":
+        return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+      case "RECORD":
+      case "STRUCT":
+        // record
+        throw new IllegalArgumentException("RECORD/STRUCT are not primitive types");
+      default:
+        throw new IllegalArgumentException("Unknown BigQuery type: " + bqType);
+    }
+  }

   /**
    * Formats BigQuery seconds-since-epoch into String matching JSON export.
Thread-safe and @@ -176,60 +230,32 @@ private static String formatTime(long timeMicros) { return LocalTime.ofNanoOfDay(timeMicros * 1000).format(formatter); } - static TableSchema trimBigQueryTableSchema(TableSchema inputSchema, Schema avroSchema) { - List subSchemas = - inputSchema.getFields().stream() - .flatMap(fieldSchema -> mapTableFieldSchema(fieldSchema, avroSchema)) - .collect(Collectors.toList()); - - return new TableSchema().setFields(subSchemas); - } - - private static Stream mapTableFieldSchema( - TableFieldSchema fieldSchema, Schema avroSchema) { - Field avroFieldSchema = avroSchema.getField(fieldSchema.getName()); - if (avroFieldSchema == null) { - return Stream.empty(); - } else if (avroFieldSchema.schema().getType() != Type.RECORD) { - return Stream.of(fieldSchema); - } - - List subSchemas = - fieldSchema.getFields().stream() - .flatMap(subSchema -> mapTableFieldSchema(subSchema, avroFieldSchema.schema())) - .collect(Collectors.toList()); - - TableFieldSchema output = - new TableFieldSchema() - .setCategories(fieldSchema.getCategories()) - .setDescription(fieldSchema.getDescription()) - .setFields(subSchemas) - .setMode(fieldSchema.getMode()) - .setName(fieldSchema.getName()) - .setType(fieldSchema.getType()); - - return Stream.of(output); - } - /** * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}. * *

See "Avro format" for more information.
+   *
+   * @deprecated Only kept for the previous {@link TableRowParser} implementation
    */
+  @Deprecated
   static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
-    return convertGenericRecordToTableRow(record, schema.getFields());
+    return convertGenericRecordToTableRow(record);
   }

-  private static TableRow convertGenericRecordToTableRow(
-      GenericRecord record, List<TableFieldSchema> fields) {
+  /**
+   * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
+   *
+   *

See "Avro format" for more information.
+   */
+  static TableRow convertGenericRecordToTableRow(GenericRecord record) {
     TableRow row = new TableRow();
-    for (TableFieldSchema subSchema : fields) {
-      // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
-      // is required, so it may not be null.
-      Field field = record.getSchema().getField(subSchema.getName());
+    Schema schema = record.getSchema();
+
+    for (Field field : schema.getFields()) {
       Object convertedValue =
-          getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
+          getTypedCellValue(field.name(), field.schema(), record.get(field.pos()));
       if (convertedValue != null) {
         // To match the JSON files exported by BigQuery, do not include null values in the output.
         row.set(field.name(), convertedValue);
@@ -239,32 +265,25 @@ private static TableRow convertGenericRecordToTableRow(
     return row;
   }

-  private static @Nullable Object getTypedCellValue(
-      Schema schema, TableFieldSchema fieldSchema, Object v) {
+  private static @Nullable Object getTypedCellValue(String name, Schema schema, Object v) {
     // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
     // is optional (and so it may be null), but defaults to "NULLABLE".
-    String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
-    switch (mode) {
-      case "REQUIRED":
-        return convertRequiredField(schema.getType(), schema.getLogicalType(), fieldSchema, v);
-      case "REPEATED":
-        return convertRepeatedField(schema, fieldSchema, v);
-      case "NULLABLE":
-        return convertNullableField(schema, fieldSchema, v);
-      default:
+    Type type = schema.getType();
+    switch (type) {
+      case ARRAY:
+        return convertRepeatedField(name, schema.getElementType(), v);
+      case UNION:
+        return convertNullableField(name, schema, v);
+      case MAP:
         throw new UnsupportedOperationException(
-            "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
+            String.format(
+                "Unexpected BigQuery field schema type %s for field named %s", type, name));
+      default:
+        return convertRequiredField(name, schema, v);
     }
   }

-  private static List<Object> convertRepeatedField(
-      Schema schema, TableFieldSchema fieldSchema, Object v) {
-    Type arrayType = schema.getType();
-    verify(
-        arrayType == Type.ARRAY,
-        "BigQuery REPEATED field %s should be Avro ARRAY, not %s",
-        fieldSchema.getName(),
-        arrayType);
+  private static List<Object> convertRepeatedField(String name, Schema elementType, Object v) {
     // REPEATED fields are represented as Avro arrays.
     if (v == null) {
       // Handle the case of an empty repeated field.
@@ -273,145 +292,104 @@ private static List<Object> convertRepeatedField(
     @SuppressWarnings("unchecked")
     List<Object> elements = (List<Object>) v;
     ArrayList<Object> values = new ArrayList<>();
-    Type elementType = schema.getElementType().getType();
-    LogicalType elementLogicalType = schema.getElementType().getLogicalType();
     for (Object element : elements) {
-      values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element));
+      values.add(convertRequiredField(name, elementType, element));
     }
     return values;
   }

-  private static Object convertRequiredField(
-      Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
+  private static Object convertRequiredField(String name, Schema schema, Object v) {
     // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
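    // The conversion below dispatches on the Avro schema (type plus logical type) rather
    // than on the BigQuery TableFieldSchema, so the table schema no longer needs to be
    // threaded through the recursion.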
-    checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
-    // is required, so it may not be null.
-    String bqType = fieldSchema.getType();
-    ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
-    verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
-    verify(
-        expectedAvroTypes.contains(avroType),
-        "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
-        expectedAvroTypes,
-        bqType,
-        fieldSchema.getName(),
-        avroType);
+    checkNotNull(v, "REQUIRED field %s should not be null", name);
+    // For historical reasons, don't validate avroLogicalType except with NUMERIC.
+    // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type.
-    switch (bqType) {
-      case "STRING":
-      case "DATETIME":
-      case "GEOGRAPHY":
-      case "JSON":
-        // Avro will use a CharSequence to represent String objects, but it may not always use
-        // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
-        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-        return v.toString();
-      case "DATE":
-        if (avroType == Type.INT) {
-          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
-          verifyNotNull(avroLogicalType, "Expected Date logical type");
-          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
+    Type type = schema.getType();
+    LogicalType logicalType = schema.getLogicalType();
+    switch (type) {
+      case BOOLEAN:
+        // SQL types BOOL, BOOLEAN
+        return v;
+      case INT:
+        if (logicalType instanceof LogicalTypes.Date) {
+          // SQL types DATE
          return formatDate((Integer) v);
        } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-          return v.toString();
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Unexpected BigQuery field schema type %s for field named %s", type, name));
        }
-      case "TIME":
-        if (avroType == Type.LONG) {
-          verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-          verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
-          verify(
-              avroLogicalType instanceof LogicalTypes.TimeMicros,
-              "Expected TimeMicros logical type");
+      case LONG:
+        if (logicalType instanceof LogicalTypes.TimeMicros) {
+          // SQL types TIME
          return formatTime((Long) v);
+        } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+          // SQL types TIMESTAMP
+          return formatTimestamp((Long) v);
        } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-          return v.toString();
+          // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+          return ((Long) v).toString();
        }
-      case "INTEGER":
-      case "INT64":
-        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-        return ((Long) v).toString();
-      case "FLOAT":
-      case "FLOAT64":
-        verify(v instanceof Double, "Expected Double, got %s", v.getClass());
+      case DOUBLE:
+        // SQL types FLOAT64
         return v;
-      case "NUMERIC":
-      case "BIGNUMERIC":
-        // NUMERIC data types are represented as BYTES with the DECIMAL logical type. They are
-        // converted back to Strings with precision and scale determined by the logical type.
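        // For example, a NUMERIC(38, 9) value of 123456789.123456789 arrives as the
        // two's-complement bytes of the unscaled value 123456789123456789; the decimal
        // logical type carries the scale needed to rebuild the BigDecimal.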
-        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
-        verifyNotNull(avroLogicalType, "Expected Decimal logical type");
-        verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected Decimal logical type");
-        BigDecimal numericValue =
-            new Conversions.DecimalConversion()
-                .fromBytes((ByteBuffer) v, Schema.create(avroType), avroLogicalType);
-        return numericValue.toString();
-      case "BOOL":
-      case "BOOLEAN":
-        verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
-        return v;
-      case "TIMESTAMP":
-        // TIMESTAMP data types are represented as Avro LONG types, microseconds since the epoch.
-        // Values may be negative since BigQuery timestamps start at 0001-01-01 00:00:00 UTC.
-        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-        return formatTimestamp((Long) v);
-      case "RECORD":
-      case "STRUCT":
-        verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
-        return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
-      case "BYTES":
-        verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
-        ByteBuffer byteBuffer = (ByteBuffer) v;
-        byte[] bytes = new byte[byteBuffer.limit()];
-        byteBuffer.get(bytes);
-        return BaseEncoding.base64().encode(bytes);
+      case BYTES:
+        if (logicalType instanceof LogicalTypes.Decimal) {
+          // SQL types NUMERIC, BIGNUMERIC
+          return new Conversions.DecimalConversion()
+              .fromBytes((ByteBuffer) v, schema, logicalType)
+              .toString();
+        } else {
+          // SQL types BYTES
+          return BaseEncoding.base64().encode(((ByteBuffer) v).array());
+        }
+      case STRING:
+        // SQL types STRING, DATETIME, GEOGRAPHY, JSON
+        // (with logical types disabled, DATE and TIME also arrive here as strings)
+        return v.toString();
+      case RECORD:
+        return convertGenericRecordToTableRow((GenericRecord) v);
       default:
         throw new UnsupportedOperationException(
             String.format(
-                "Unexpected BigQuery field schema type %s for field named %s",
-                fieldSchema.getType(), fieldSchema.getName()));
+                "Unexpected BigQuery field schema type %s for field named %s", type, name));
     }
   }

-  private static @Nullable Object convertNullableField(
-      Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
+  private static @Nullable Object convertNullableField(String name, Schema union, Object v) {
    // NULLABLE fields are represented as an Avro Union of the corresponding type and "null".
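    // GenericData#resolveUnion returns the index of the union branch matching the value,
    // so the NULL branch is detected positionally instead of assuming a ["null", type]
    // ordering.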
verify( - avroSchema.getType() == Type.UNION, + union.getType() == Type.UNION, "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s", - avroSchema.getType(), - fieldSchema.getName()); - List unionTypes = avroSchema.getTypes(); + union.getType(), + name); + List unionTypes = union.getTypes(); verify( unionTypes.size() == 2, "BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s", - fieldSchema.getName(), - unionTypes); + name, + union); - if (v == null) { + Schema type = union.getTypes().get(GenericData.get().resolveUnion(union, v)); + if (type.getType() == Type.NULL) { return null; + } else { + return convertRequiredField(name, type, v); } - - Type firstType = unionTypes.get(0).getType(); - if (!firstType.equals(Type.NULL)) { - return convertRequiredField(firstType, unionTypes.get(0).getLogicalType(), fieldSchema, v); - } - return convertRequiredField( - unionTypes.get(1).getType(), unionTypes.get(1).getLogicalType(), fieldSchema, v); } - static Schema toGenericAvroSchema( - String schemaName, List fieldSchemas, @Nullable String namespace) { + private static Schema toGenericAvroSchema( + String schemaName, + List fieldSchemas, + Boolean useAvroLogicalTypes, + @Nullable String namespace) { String nextNamespace = namespace == null ? null : String.format("%s.%s", namespace, schemaName); List avroFields = new ArrayList<>(); for (TableFieldSchema bigQueryField : fieldSchemas) { - avroFields.add(convertField(bigQueryField, nextNamespace)); + avroFields.add(convertField(bigQueryField, useAvroLogicalTypes, nextNamespace)); } return Schema.createRecord( schemaName, @@ -421,11 +399,19 @@ static Schema toGenericAvroSchema( avroFields); } - static Schema toGenericAvroSchema(String schemaName, List fieldSchemas) { - return toGenericAvroSchema( - schemaName, - fieldSchemas, - hasNamespaceCollision(fieldSchemas) ? "org.apache.beam.sdk.io.gcp.bigquery" : null); + static Schema toGenericAvroSchema(TableSchema tableSchema) { + return toGenericAvroSchema("root", tableSchema.getFields(), true); + } + + static Schema toGenericAvroSchema(TableSchema tableSchema, Boolean useAvroLogicalTypes) { + return toGenericAvroSchema("root", tableSchema.getFields(), useAvroLogicalTypes); + } + + static Schema toGenericAvroSchema( + String schemaName, List fieldSchemas, Boolean useAvroLogicalTypes) { + String namespace = + hasNamespaceCollision(fieldSchemas) ? 
"org.apache.beam.sdk.io.gcp.bigquery" : null; + return toGenericAvroSchema(schemaName, fieldSchemas, useAvroLogicalTypes, namespace); } // To maintain backwards compatibility we only disambiguate collisions in the field namespaces as @@ -452,64 +438,30 @@ private static boolean hasNamespaceCollision(List fieldSchemas @SuppressWarnings({ "nullness" // Avro library not annotated }) - private static Field convertField(TableFieldSchema bigQueryField, @Nullable String namespace) { - ImmutableCollection avroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType()); - if (avroTypes.isEmpty()) { - throw new IllegalArgumentException( - "Unable to map BigQuery field type " + bigQueryField.getType() + " to avro type."); - } - - Type avroType = avroTypes.iterator().next(); - Schema elementSchema; - if (avroType == Type.RECORD) { - elementSchema = - toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields(), namespace); - } else { - elementSchema = handleAvroLogicalTypes(bigQueryField, avroType); - } + private static Field convertField( + TableFieldSchema bigQueryField, Boolean useAvroLogicalTypes, @Nullable String namespace) { + String fieldName = bigQueryField.getName(); Schema fieldSchema; - if (bigQueryField.getMode() == null || "NULLABLE".equals(bigQueryField.getMode())) { - fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema); - } else if ("REQUIRED".equals(bigQueryField.getMode())) { - fieldSchema = elementSchema; - } else if ("REPEATED".equals(bigQueryField.getMode())) { - fieldSchema = Schema.createArray(elementSchema); + String bqType = bigQueryField.getType(); + if ("RECORD".equals(bqType) || "STRUCT".equals(bqType)) { + fieldSchema = + toGenericAvroSchema(fieldName, bigQueryField.getFields(), useAvroLogicalTypes, namespace); } else { - throw new IllegalArgumentException( - String.format("Unknown BigQuery Field Mode: %s", bigQueryField.getMode())); + fieldSchema = getPrimitiveType(bigQueryField, useAvroLogicalTypes); + } + + String bqMode = bigQueryField.getMode(); + if (bqMode == null || "NULLABLE".equals(bqMode)) { + fieldSchema = SchemaBuilder.unionOf().nullType().and().type(fieldSchema).endUnion(); + } else if ("REPEATED".equals(bqMode)) { + fieldSchema = SchemaBuilder.array().items(fieldSchema); + } else if (!"REQUIRED".equals(bqMode)) { + throw new IllegalArgumentException(String.format("Unknown BigQuery Field Mode: %s", bqMode)); } return new Field( - bigQueryField.getName(), + fieldName, fieldSchema, bigQueryField.getDescription(), (Object) null /* Cast to avoid deprecated JsonNode constructor. 
*/); } - - private static Schema handleAvroLogicalTypes(TableFieldSchema bigQueryField, Type avroType) { - String bqType = bigQueryField.getType(); - switch (bqType) { - case "NUMERIC": - // Default value based on - // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types - int precision = Optional.ofNullable(bigQueryField.getPrecision()).orElse(38L).intValue(); - int scale = Optional.ofNullable(bigQueryField.getScale()).orElse(9L).intValue(); - return LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Type.BYTES)); - case "BIGNUMERIC": - // Default value based on - // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types - int precisionBigNumeric = - Optional.ofNullable(bigQueryField.getPrecision()).orElse(77L).intValue(); - int scaleBigNumeric = Optional.ofNullable(bigQueryField.getScale()).orElse(38L).intValue(); - return LogicalTypes.decimal(precisionBigNumeric, scaleBigNumeric) - .addToSchema(Schema.create(Type.BYTES)); - case "TIMESTAMP": - return LogicalTypes.timestampMicros().addToSchema(Schema.create(Type.LONG)); - case "GEOGRAPHY": - Schema geoSchema = Schema.create(Type.STRING); - geoSchema.addProp(LogicalType.LOGICAL_TYPE_PROP, "geography_wkt"); - return geoSchema; - default: - return Schema.create(avroType); - } - } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 79a3249d6bc9..7bd2eb7da62a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -627,9 +627,7 @@ public class BigQueryIO { GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>(); private static final SerializableFunction - DEFAULT_AVRO_SCHEMA_FACTORY = - (SerializableFunction) - input -> BigQueryAvroUtils.toGenericAvroSchema("root", input.getFields()); + DEFAULT_AVRO_SCHEMA_FACTORY = BigQueryAvroUtils::toGenericAvroSchema; /** * @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead. {@link @@ -649,12 +647,12 @@ public static Read read() { * domain-specific type, due to the overhead of converting the rows to {@link TableRow}. */ public static TypedRead readTableRows() { - return read(new TableRowParser()).withCoder(TableRowJsonCoder.of()); + return read(TableRowParser.INSTANCE).withCoder(TableRowJsonCoder.of()); } /** Like {@link #readTableRows()} but with {@link Schema} support. 
*/ public static TypedRead readTableRowsWithSchema() { - return read(new TableRowParser()) + return read(TableRowParser.INSTANCE) .withCoder(TableRowJsonCoder.of()) .withBeamRowConverters( TypeDescriptor.of(TableRow.class), @@ -793,8 +791,7 @@ static class TableRowParser implements SerializableFunction expand(PBegin input) { Schema beamSchema = null; if (getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null) { - beamSchema = sourceDef.getBeamSchema(bqOptions); - beamSchema = getFinalSchema(beamSchema, getSelectedFields()); + TableSchema tableSchema = sourceDef.getTableSchema(bqOptions); + ValueProvider> selectedFields = getSelectedFields(); + if (selectedFields != null) { + tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get()); + } + beamSchema = BigQueryUtils.fromTableSchema(tableSchema); } final Coder coder = inferCoder(p.getCoderRegistry()); @@ -1441,24 +1442,6 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception { return rows; } - private static Schema getFinalSchema( - Schema beamSchema, ValueProvider> selectedFields) { - List flds = - beamSchema.getFields().stream() - .filter( - field -> { - if (selectedFields != null - && selectedFields.isAccessible() - && selectedFields.get() != null) { - return selectedFields.get().contains(field.getName()); - } else { - return true; - } - }) - .collect(Collectors.toList()); - return Schema.builder().addFields(flds).build(); - } - private PCollection expandForDirectRead( PBegin input, Coder outputCoder, Schema beamSchema, BigQueryOptions bqOptions) { ValueProvider tableProvider = getTableProvider(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java index b4035a4e9ac3..9d0d82f72eb8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java @@ -178,7 +178,7 @@ public BigQuerySourceBase toSource( /** {@inheritDoc} */ @Override - public Schema getBeamSchema(BigQueryOptions bqOptions) { + public TableSchema getTableSchema(BigQueryOptions bqOptions) { try { JobStatistics stats = BigQueryQueryHelper.dryRunQueryIfNeeded( @@ -189,14 +189,20 @@ public Schema getBeamSchema(BigQueryOptions bqOptions) { flattenResults, useLegacySql, location); - TableSchema tableSchema = stats.getQuery().getSchema(); - return BigQueryUtils.fromTableSchema(tableSchema); + return stats.getQuery().getSchema(); } catch (IOException | InterruptedException | NullPointerException e) { throw new BigQuerySchemaRetrievalException( "Exception while trying to retrieve schema of query", e); } } + /** {@inheritDoc} */ + @Override + public Schema getBeamSchema(BigQueryOptions bqOptions) { + TableSchema tableSchema = getTableSchema(bqOptions); + return BigQueryUtils.fromTableSchema(tableSchema); + } + ValueProvider getQuery() { return query; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index a8985775cbe7..b7b83dccaece 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -243,8 +243,7 @@ private List executeExtract( List> createSources( List files, TableSchema schema, @Nullable List metadata) throws IOException, InterruptedException { - String avroSchema = - BigQueryAvroUtils.toGenericAvroSchema("root", schema.getFields()).toString(); + String avroSchema = BigQueryAvroUtils.toGenericAvroSchema(schema).toString(); AvroSource.DatumReaderFactory factory = readerFactory.apply(schema); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java index c9b1d5f73224..579a602ab552 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java @@ -45,6 +45,15 @@ BigQuerySourceBase toSource( SerializableFunction> readerFactory, boolean useAvroLogicalTypes); + /** + * Extract the {@link TableSchema} corresponding to this source. + * + * @param bqOptions BigQueryOptions + * @return table schema of the source + * @throws BigQuerySchemaRetrievalException if schema retrieval fails + */ + TableSchema getTableSchema(BigQueryOptions bqOptions); + /** * Extract the Beam {@link Schema} corresponding to this source. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java index 51a5a8f391a6..cef2e3aee3f4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java @@ -28,10 +28,7 @@ import com.google.cloud.bigquery.storage.v1.ReadStream; import java.io.IOException; import java.util.List; -import org.apache.avro.Schema; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.extensions.arrow.ArrowConversion; -import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; import org.apache.beam.sdk.metrics.Lineage; @@ -182,30 +179,17 @@ public List> split( LOG.info("Read session returned {} streams", readSession.getStreamsList().size()); } - Schema sessionSchema; - if (readSession.getDataFormat() == DataFormat.ARROW) { - org.apache.arrow.vector.types.pojo.Schema schema = - ArrowConversion.arrowSchemaFromInput( - readSession.getArrowSchema().getSerializedSchema().newInput()); - org.apache.beam.sdk.schemas.Schema beamSchema = - ArrowConversion.ArrowSchemaTranslator.toBeamSchema(schema); - sessionSchema = AvroUtils.toAvroSchema(beamSchema); - } else if (readSession.getDataFormat() == DataFormat.AVRO) { - sessionSchema = new Schema.Parser().parse(readSession.getAvroSchema().getSchema()); - } else { - throw new IllegalArgumentException( - "data is not in a supported dataFormat: " + readSession.getDataFormat()); + // TODO: this is inconsistent with method above, where it can be null + Preconditions.checkStateNotNull(targetTable); + TableSchema tableSchema = targetTable.getSchema(); + if (selectedFieldsProvider != null) { + 
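+          // Trim the schema to the selected top-level fields so each stream source only
+          // sees the columns actually requested from the read session.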
tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFieldsProvider.get()); } - - Preconditions.checkStateNotNull( - targetTable); // TODO: this is inconsistent with method above, where it can be null - TableSchema trimmedSchema = - BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), sessionSchema); List> sources = Lists.newArrayList(); for (ReadStream readStream : readSession.getStreamsList()) { sources.add( BigQueryStorageStreamSource.create( - readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices)); + readSession, readStream, tableSchema, parseFn, outputCoder, bqServices)); } return ImmutableList.copyOf(sources); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java index b399900f9a24..cc1532a522bc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java @@ -102,16 +102,22 @@ public BigQuerySourceBase toSource( /** {@inheritDoc} */ @Override - public Schema getBeamSchema(BigQueryOptions bqOptions) { + public TableSchema getTableSchema(BigQueryOptions bqOptions) { try { try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) { TableReference tableRef = getTableReference(bqOptions); Table table = datasetService.getTable(tableRef); - TableSchema tableSchema = Preconditions.checkStateNotNull(table).getSchema(); - return BigQueryUtils.fromTableSchema(tableSchema); + return Preconditions.checkStateNotNull(table).getSchema(); } } catch (Exception e) { throw new BigQuerySchemaRetrievalException("Exception while trying to retrieve schema", e); } } + + /** {@inheritDoc} */ + @Override + public Schema getBeamSchema(BigQueryOptions bqOptions) { + TableSchema tableSchema = getTableSchema(bqOptions); + return BigQueryUtils.fromTableSchema(tableSchema); + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 305abad5783a..d9d2b135f22b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -322,15 +322,21 @@ private static FieldType fromTableFieldSchemaType( case "BYTES": return FieldType.BYTES; case "INT64": + case "INT": + case "SMALLINT": case "INTEGER": + case "BIGINT": + case "TINYINT": + case "BYTEINT": return FieldType.INT64; case "FLOAT64": - case "FLOAT": + case "FLOAT": // even if not a valid BQ type, it is used in the schema return FieldType.DOUBLE; case "BOOL": case "BOOLEAN": return FieldType.BOOLEAN; case "NUMERIC": + case "BIGNUMERIC": return FieldType.DECIMAL; case "TIMESTAMP": return FieldType.DATETIME; @@ -355,6 +361,10 @@ private static FieldType fromTableFieldSchemaType( Schema rowSchema = fromTableFieldSchema(nestedFields, options); return FieldType.row(rowSchema); + case "GEOGRAPHY": + case "JSON": + // TODO Add metadata for custom sql types + return FieldType.STRING; default: throw new UnsupportedOperationException( "Converting BigQuery type " + typeName + " to Beam type is unsupported"); @@ -446,10 
+456,27 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
     return fromTableFieldSchema(tableSchema.getFields(), options);
   }

+  /** Convert a BigQuery {@link TableSchema} to an Avro {@link org.apache.avro.Schema}. */
+  public static org.apache.avro.Schema toGenericAvroSchema(TableSchema tableSchema) {
+    return toGenericAvroSchema(tableSchema, false);
+  }
+
+  /** Convert a BigQuery {@link TableSchema} to an Avro {@link org.apache.avro.Schema}. */
+  public static org.apache.avro.Schema toGenericAvroSchema(
+      TableSchema tableSchema, Boolean stringLogicalTypes) {
+    return toGenericAvroSchema("root", tableSchema.getFields(), stringLogicalTypes);
+  }
+
   /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
   public static org.apache.avro.Schema toGenericAvroSchema(
       String schemaName, List<TableFieldSchema> fieldSchemas) {
-    return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas);
+    return toGenericAvroSchema(schemaName, fieldSchemas, false);
+  }
+
+  /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
+  public static org.apache.avro.Schema toGenericAvroSchema(
+      String schemaName, List<TableFieldSchema> fieldSchemas, Boolean stringLogicalTypes) {
+    return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas, stringLogicalTypes);
   }

   private static final BigQueryIO.TypedRead.ToBeamRowFunction
@@ -514,9 +541,20 @@ public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptio
     return Row.withSchema(schema).addValues(valuesInOrder).build();
   }

+  /**
+   * Convert an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
+   *
+   * @deprecated use {@link #convertGenericRecordToTableRow(GenericRecord)}
+   */
+  @Deprecated
   public static TableRow convertGenericRecordToTableRow(
       GenericRecord record, TableSchema tableSchema) {
-    return BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+    return convertGenericRecordToTableRow(record);
+  }
+
+  /** Convert an Avro {@link GenericRecord} to a BigQuery {@link TableRow}. */
+  public static TableRow convertGenericRecordToTableRow(GenericRecord record) {
+    return BigQueryAvroUtils.convertGenericRecordToTableRow(record);
   }

   /** Convert a Beam Row to a BigQuery TableRow.
*/ @@ -1039,6 +1077,21 @@ private static Object convertAvroNumeric(Object value) { return tableSpec; } + static TableSchema trimSchema(TableSchema schema, @Nullable List selectedFields) { + if (selectedFields == null || selectedFields.isEmpty()) { + return schema; + } + + List fields = schema.getFields(); + List trimmedFields = new ArrayList<>(); + for (TableFieldSchema field : fields) { + if (selectedFields.contains(field.getName())) { + trimmedFields.add(field); + } + } + return new TableSchema().setFields(trimmedFields); + } + private static @Nullable ServiceCallMetric callMetricForMethod( @Nullable TableReference tableReference, String method) { if (tableReference != null) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index c87888134c8a..0447b2d07b1a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryAvroUtils.DATETIME_LOGICAL_TYPE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -34,18 +35,16 @@ import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; -import org.apache.avro.generic.GenericData; +import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.reflect.AvroSchema; import org.apache.avro.reflect.Nullable; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.util.Utf8; -import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; -import org.apache.commons.lang3.tuple.Pair; +import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -96,64 +95,29 @@ public class BigQueryAvroUtilsTest { .setFields(subFields), new TableFieldSchema().setName("geoPositions").setType("GEOGRAPHY").setMode("NULLABLE")); - private Pair convertToByteBuffer(BigDecimal bigDecimal, Schema schema) { - LogicalType bigDecimalLogicalType = - LogicalTypes.decimal(bigDecimal.precision(), bigDecimal.scale()); - // DecimalConversion.toBytes returns a ByteBuffer, which can be mutated by callees if passed - // to other methods. We wrap the byte array as a ByteBuffer before adding it to the - // GenericRecords. 
- byte[] bigDecimalBytes = - new Conversions.DecimalConversion() - .toBytes(bigDecimal, schema, bigDecimalLogicalType) - .array(); - return Pair.of(bigDecimalLogicalType, bigDecimalBytes); + private ByteBuffer convertToBytes(BigDecimal bigDecimal, int precision, int scale) { + LogicalType bigDecimalLogicalType = LogicalTypes.decimal(precision, scale); + return new Conversions.DecimalConversion().toBytes(bigDecimal, null, bigDecimalLogicalType); } @Test public void testConvertGenericRecordToTableRow() throws Exception { - TableSchema tableSchema = new TableSchema(); - tableSchema.setFields(fields); - - // BigQuery encodes NUMERIC and BIGNUMERIC values to Avro using the BYTES type with the DECIMAL - // logical type. AvroCoder can't apply logical types to Schemas directly, so we need to get the - // Schema for the Bird class defined below, then replace the field used to test NUMERIC with - // a field that has the appropriate Schema. - Schema numericSchema = Schema.create(Type.BYTES); BigDecimal numeric = new BigDecimal("123456789.123456789"); - Pair numericPair = convertToByteBuffer(numeric, numericSchema); - Schema bigNumericSchema = Schema.create(Type.BYTES); + ByteBuffer numericBytes = convertToBytes(numeric, 38, 9); BigDecimal bigNumeric = new BigDecimal( "578960446186580977117854925043439539266.34992332820282019728792003956564819967"); - Pair bigNumericPair = convertToByteBuffer(bigNumeric, bigNumericSchema); - - // In order to update the Schema for NUMERIC and BIGNUMERIC values, we need to recreate all of - // the Fields. - List avroFields = new ArrayList<>(); - for (Schema.Field field : AvroCoder.of(Bird.class).getSchema().getFields()) { - Schema schema = field.schema(); - if ("birthdayMoney".equals(field.name())) { - // birthdayMoney is nullable field with type BYTES/DECIMAL. - schema = - Schema.createUnion( - Schema.create(Type.NULL), numericPair.getLeft().addToSchema(numericSchema)); - } else if ("lotteryWinnings".equals(field.name())) { - // lotteryWinnings is nullable field with type BYTES/DECIMAL. - schema = - Schema.createUnion( - Schema.create(Type.NULL), bigNumericPair.getLeft().addToSchema(bigNumericSchema)); - } - // After a Field is added to a Schema, it is assigned a position, so we can't simply reuse - // the existing Field. - avroFields.add(new Schema.Field(field.name(), schema, field.doc(), field.defaultVal())); - } - Schema avroSchema = Schema.createRecord(avroFields); + ByteBuffer bigNumericBytes = convertToBytes(bigNumeric, 77, 38); + Schema avroSchema = ReflectData.get().getSchema(Bird.class); { // Test nullable fields. - GenericRecord record = new GenericData.Record(avroSchema); - record.put("number", 5L); - TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema); + GenericRecord record = + new GenericRecordBuilder(avroSchema) + .set("number", 5L) + .set("associates", new ArrayList()) + .build(); + TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow().set("number", "5").set("associates", new ArrayList()); assertEquals(row, convertedRow); TableRow clonedRow = convertedRow.clone(); @@ -162,51 +126,55 @@ public void testConvertGenericRecordToTableRow() throws Exception { { // Test type conversion for: // INTEGER, FLOAT, NUMERIC, TIMESTAMP, BOOLEAN, BYTES, DATE, DATETIME, TIME. 
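      // The expected strings below follow BigQuery's JSON export conventions: INT64 values
      // as decimal strings, TIMESTAMP as "yyyy-MM-dd HH:mm:ss.SSSSSS UTC" (micros 5 maps to
      // "1970-01-01 00:00:00.000005 UTC"), and BYTES base64-encoded.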
- GenericRecord record = new GenericData.Record(avroSchema); byte[] soundBytes = "chirp,chirp".getBytes(StandardCharsets.UTF_8); - ByteBuffer soundByteBuffer = ByteBuffer.wrap(soundBytes); - soundByteBuffer.rewind(); - record.put("number", 5L); - record.put("quality", 5.0); - record.put("birthday", 5L); - record.put("birthdayMoney", ByteBuffer.wrap(numericPair.getRight())); - record.put("lotteryWinnings", ByteBuffer.wrap(bigNumericPair.getRight())); - record.put("flighted", Boolean.TRUE); - record.put("sound", soundByteBuffer); - record.put("anniversaryDate", new Utf8("2000-01-01")); - record.put("anniversaryDatetime", new String("2000-01-01 00:00:00.000005")); - record.put("anniversaryTime", new Utf8("00:00:00.000005")); - record.put("geoPositions", new String("LINESTRING(1 2, 3 4, 5 6, 7 8)")); - TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema); + GenericRecord record = + new GenericRecordBuilder(avroSchema) + .set("number", 5L) + .set("quality", 5.0) + .set("birthday", 5L) + .set("birthdayMoney", numericBytes) + .set("lotteryWinnings", bigNumericBytes) + .set("flighted", Boolean.TRUE) + .set("sound", ByteBuffer.wrap(soundBytes)) + .set("anniversaryDate", new Utf8("2000-01-01")) + .set("anniversaryDatetime", "2000-01-01 00:00:00.000005") + .set("anniversaryTime", new Utf8("00:00:00.000005")) + .set("geoPositions", "LINESTRING(1 2, 3 4, 5 6, 7 8)") + .set("associates", new ArrayList()) + .build(); + + TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow() - .set("number", "5") + .set("anniversaryDate", "2000-01-01") + .set("anniversaryDatetime", "2000-01-01 00:00:00.000005") + .set("anniversaryTime", "00:00:00.000005") + .set("associates", new ArrayList()) .set("birthday", "1970-01-01 00:00:00.000005 UTC") .set("birthdayMoney", numeric.toString()) + .set("flighted", Boolean.TRUE) + .set("geoPositions", "LINESTRING(1 2, 3 4, 5 6, 7 8)") .set("lotteryWinnings", bigNumeric.toString()) + .set("number", "5") .set("quality", 5.0) - .set("associates", new ArrayList()) - .set("flighted", Boolean.TRUE) - .set("sound", BaseEncoding.base64().encode(soundBytes)) - .set("anniversaryDate", "2000-01-01") - .set("anniversaryDatetime", "2000-01-01 00:00:00.000005") - .set("anniversaryTime", "00:00:00.000005") - .set("geoPositions", "LINESTRING(1 2, 3 4, 5 6, 7 8)"); - TableRow clonedRow = convertedRow.clone(); - assertEquals(convertedRow, clonedRow); + .set("sound", BaseEncoding.base64().encode(soundBytes)); assertEquals(row, convertedRow); + TableRow clonedRow = convertedRow.clone(); + assertEquals(clonedRow, convertedRow); } { // Test repeated fields. 
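      // REPEATED fields surface as Avro arrays; each element goes through the same
      // convertRequiredField path as a top-level value.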
- Schema subBirdSchema = AvroCoder.of(Bird.SubBird.class).getSchema(); - GenericRecord nestedRecord = new GenericData.Record(subBirdSchema); - nestedRecord.put("species", "other"); - GenericRecord record = new GenericData.Record(avroSchema); - record.put("number", 5L); - record.put("associates", Lists.newArrayList(nestedRecord)); - record.put("birthdayMoney", ByteBuffer.wrap(numericPair.getRight())); - record.put("lotteryWinnings", ByteBuffer.wrap(bigNumericPair.getRight())); - TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema); + Schema subBirdSchema = ReflectData.get().getSchema(Bird.SubBird.class); + GenericRecord nestedRecord = + new GenericRecordBuilder(subBirdSchema).set("species", "other").build(); + GenericRecord record = + new GenericRecordBuilder(avroSchema) + .set("number", 5L) + .set("associates", Lists.newArrayList(nestedRecord)) + .set("birthdayMoney", numericBytes) + .set("lotteryWinnings", bigNumericBytes) + .build(); + TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow() .set("associates", Lists.newArrayList(new TableRow().set("species", "other"))) @@ -219,94 +187,270 @@ public void testConvertGenericRecordToTableRow() throws Exception { } } + @Test + public void testConvertBigQuerySchemaToAvroSchemaDisabledLogicalTypes() { + TableSchema tableSchema = new TableSchema(); + tableSchema.setFields(fields); + Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false); + + assertThat(avroSchema.getField("number").schema(), equalTo(SchemaBuilder.builder().longType())); + assertThat( + avroSchema.getField("species").schema(), + equalTo(SchemaBuilder.builder().unionOf().nullType().and().stringType().endUnion())); + assertThat( + avroSchema.getField("quality").schema(), + equalTo(SchemaBuilder.builder().unionOf().nullType().and().doubleType().endUnion())); + assertThat( + avroSchema.getField("quantity").schema(), + equalTo(SchemaBuilder.builder().unionOf().nullType().and().longType().endUnion())); + assertThat( + avroSchema.getField("birthday").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type( + LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType())) + .endUnion())); + assertThat( + avroSchema.getField("birthdayMoney").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type(LogicalTypes.decimal(38, 9).addToSchema(SchemaBuilder.builder().bytesType())) + .endUnion())); + assertThat( + avroSchema.getField("lotteryWinnings").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type(LogicalTypes.decimal(77, 38).addToSchema(SchemaBuilder.builder().bytesType())) + .endUnion())); + assertThat( + avroSchema.getField("flighted").schema(), + equalTo(SchemaBuilder.builder().unionOf().nullType().and().booleanType().endUnion())); + assertThat( + avroSchema.getField("sound").schema(), + equalTo(SchemaBuilder.builder().unionOf().nullType().and().bytesType().endUnion())); + assertThat( + avroSchema.getField("anniversaryDate").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .stringBuilder() + .prop("sqlType", "DATE") + .endString() + .endUnion())); + assertThat( + avroSchema.getField("anniversaryDatetime").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .stringBuilder() + .prop("sqlType", "DATETIME") + .endString() + .endUnion())); + assertThat( + 
avroSchema.getField("anniversaryTime").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .stringBuilder() + .prop("sqlType", "TIME") + .endString() + .endUnion())); + assertThat( + avroSchema.getField("geoPositions").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .stringBuilder() + .prop("sqlType", "GEOGRAPHY") + .endString() + .endUnion())); + assertThat( + avroSchema.getField("scion").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .record("scion") + .doc("Translated Avro Schema for scion") + .namespace("org.apache.beam.sdk.io.gcp.bigquery") + .fields() + .name("species") + .type() + .unionOf() + .nullType() + .and() + .stringType() + .endUnion() + .noDefault() + .endRecord() + .endUnion())); + + assertThat( + avroSchema.getField("associates").schema(), + equalTo( + SchemaBuilder.array() + .items() + .record("associates") + .doc("Translated Avro Schema for associates") + .namespace("org.apache.beam.sdk.io.gcp.bigquery") + .fields() + .name("species") + .type() + .unionOf() + .nullType() + .and() + .stringType() + .endUnion() + .noDefault() + .endRecord())); + } + @Test public void testConvertBigQuerySchemaToAvroSchema() { TableSchema tableSchema = new TableSchema(); tableSchema.setFields(fields); - Schema avroSchema = - BigQueryAvroUtils.toGenericAvroSchema("testSchema", tableSchema.getFields()); + Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema); - assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Type.LONG))); + assertThat(avroSchema.getField("number").schema(), equalTo(SchemaBuilder.builder().longType())); assertThat( avroSchema.getField("species").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING)))); + equalTo(SchemaBuilder.builder().unionOf().nullType().and().stringType().endUnion())); assertThat( avroSchema.getField("quality").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.DOUBLE)))); + equalTo(SchemaBuilder.builder().unionOf().nullType().and().doubleType().endUnion())); assertThat( avroSchema.getField("quantity").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.LONG)))); + equalTo(SchemaBuilder.builder().unionOf().nullType().and().longType().endUnion())); assertThat( avroSchema.getField("birthday").schema(), equalTo( - Schema.createUnion( - Schema.create(Type.NULL), - LogicalTypes.timestampMicros().addToSchema(Schema.create(Type.LONG))))); + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type( + LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType())) + .endUnion())); assertThat( avroSchema.getField("birthdayMoney").schema(), equalTo( - Schema.createUnion( - Schema.create(Type.NULL), - LogicalTypes.decimal(38, 9).addToSchema(Schema.create(Type.BYTES))))); + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type(LogicalTypes.decimal(38, 9).addToSchema(SchemaBuilder.builder().bytesType())) + .endUnion())); assertThat( avroSchema.getField("lotteryWinnings").schema(), equalTo( - Schema.createUnion( - Schema.create(Type.NULL), - LogicalTypes.decimal(77, 38).addToSchema(Schema.create(Type.BYTES))))); + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type(LogicalTypes.decimal(77, 38).addToSchema(SchemaBuilder.builder().bytesType())) + .endUnion())); assertThat( avroSchema.getField("flighted").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), 
Schema.create(Type.BOOLEAN)))); + equalTo(SchemaBuilder.builder().unionOf().nullType().and().booleanType().endUnion())); assertThat( avroSchema.getField("sound").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES)))); + equalTo(SchemaBuilder.builder().unionOf().nullType().and().bytesType().endUnion())); assertThat( avroSchema.getField("anniversaryDate").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING)))); + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType())) + .endUnion())); assertThat( avroSchema.getField("anniversaryDatetime").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING)))); + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type(DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType())) + .endUnion())); assertThat( avroSchema.getField("anniversaryTime").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING)))); - Schema geoSchema = Schema.create(Type.STRING); - geoSchema.addProp(LogicalType.LOGICAL_TYPE_PROP, "geography_wkt"); + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type(LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType())) + .endUnion())); assertThat( avroSchema.getField("geoPositions").schema(), - equalTo(Schema.createUnion(Schema.create(Type.NULL), geoSchema))); + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .stringBuilder() + .prop("sqlType", "GEOGRAPHY") + .endString() + .endUnion())); assertThat( avroSchema.getField("scion").schema(), equalTo( - Schema.createUnion( - Schema.create(Type.NULL), - Schema.createRecord( - "scion", - "Translated Avro Schema for scion", - "org.apache.beam.sdk.io.gcp.bigquery", - false, - ImmutableList.of( - new Field( - "species", - Schema.createUnion( - Schema.create(Type.NULL), Schema.create(Type.STRING)), - null, - (Object) null)))))); + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .record("scion") + .doc("Translated Avro Schema for scion") + .namespace("org.apache.beam.sdk.io.gcp.bigquery") + .fields() + .name("species") + .type() + .unionOf() + .nullType() + .and() + .stringType() + .endUnion() + .noDefault() + .endRecord() + .endUnion())); + assertThat( avroSchema.getField("associates").schema(), equalTo( - Schema.createArray( - Schema.createRecord( - "associates", - "Translated Avro Schema for associates", - "org.apache.beam.sdk.io.gcp.bigquery", - false, - ImmutableList.of( - new Field( - "species", - Schema.createUnion( - Schema.create(Type.NULL), Schema.create(Type.STRING)), - null, - (Object) null)))))); + SchemaBuilder.array() + .items() + .record("associates") + .doc("Translated Avro Schema for associates") + .namespace("org.apache.beam.sdk.io.gcp.bigquery") + .fields() + .name("species") + .type() + .unionOf() + .nullType() + .and() + .stringType() + .endUnion() + .noDefault() + .endRecord())); } @Test @@ -427,22 +571,34 @@ public void testSchemaCollisionsInAvroConversion() { .setType("FLOAT"))))))))), new TableFieldSchema().setName("platform").setType("STRING"))); // To string should be sufficient here as this exercises Avro's conversion feature - String output = BigQueryAvroUtils.toGenericAvroSchema("root", schema.getFields()).toString(); + String output = BigQueryAvroUtils.toGenericAvroSchema(schema, false).toString(); 
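    // The nested records above share field names, so hasNamespaceCollision() kicks in and
    // the generated Avro records are qualified with the org.apache.beam.sdk.io.gcp.bigquery
    // namespace instead of failing on record redefinition.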
assertThat(output.length(), greaterThan(0)); } /** Pojo class used as the record type in tests. */ - @DefaultCoder(AvroCoder.class) @SuppressWarnings("unused") // Used by Avro reflection. static class Bird { long number; @Nullable String species; @Nullable Double quality; @Nullable Long quantity; - @Nullable Long birthday; // Exercises TIMESTAMP. - @Nullable ByteBuffer birthdayMoney; // Exercises NUMERIC. - @Nullable ByteBuffer lotteryWinnings; // Exercises BIGNUMERIC. - @Nullable String geoPositions; // Exercises GEOGRAPHY. + + @AvroSchema(value = "[\"null\", {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}]") + Instant birthday; + + @AvroSchema( + value = + "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 38, \"scale\": 9}]") + BigDecimal birthdayMoney; + + @AvroSchema( + value = + "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 77, \"scale\": 38}]") + BigDecimal lotteryWinnings; + + @AvroSchema(value = "[\"null\", {\"type\": \"string\", \"sqlType\": \"GEOGRAPHY\"}]") + String geoPositions; + @Nullable Boolean flighted; @Nullable ByteBuffer sound; @Nullable Utf8 anniversaryDate; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java index 497653f9ab8d..570435a4c95f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java @@ -170,9 +170,7 @@ public void testDefaultQueryBasedSource() throws Exception { @Test public void testQueryBasedSourceWithCustomQuery() throws Exception { TypedRead typedRead = - BigQueryIO.read(new TableRowParser()) - .fromQuery("SELECT * FROM `google.com:project.dataset.table`") - .withCoder(TableRowJsonCoder.of()); + BigQueryIO.readTableRows().fromQuery("SELECT * FROM `google.com:project.dataset.table`"); checkTypedReadQueryObject(typedRead, "SELECT * FROM `google.com:project.dataset.table`"); } @@ -227,10 +225,7 @@ public void testQueryBasedSourceWithTemplateCompatibility() throws Exception { } private TypedRead getDefaultTypedRead() { - return BigQueryIO.read(new TableRowParser()) - .fromQuery(DEFAULT_QUERY) - .withCoder(TableRowJsonCoder.of()) - .withMethod(Method.DIRECT_READ); + return BigQueryIO.readTableRows().fromQuery(DEFAULT_QUERY).withMethod(Method.DIRECT_READ); } private void checkTypedReadQueryObject(TypedRead typedRead, String query) { @@ -310,7 +305,7 @@ public void testQuerySourceEstimatedSize() throws Exception { /* queryTempProject = */ null, /* kmsKey = */ null, null, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), fakeBigQueryServices); @@ -423,7 +418,7 @@ private void doQuerySourceInitialSplit( /* queryTempProject = */ null, /* kmsKey = */ null, null, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -672,7 +667,7 @@ public void testQuerySourceInitialSplitWithBigQueryProject_EmptyResult() throws /* queryTempProject = */ null, /* kmsKey = */ null, DataFormat.AVRO, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -744,7 +739,7 @@ public void testQuerySourceInitialSplit_EmptyResult() 
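// Background on the Bird POJO above: @AvroSchema replaces the schema that
// Avro reflection would otherwise infer for a field. A minimal sketch of the
// same technique (the Account class here is hypothetical):
//
//   static class Account {
//     @AvroSchema(
//         "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\","
//             + " \"precision\": 38, \"scale\": 9}]")
//     BigDecimal balance;
//   }
//
//   Schema s = ReflectData.get().getSchema(Account.class);
//   s.getField("balance").schema(); // ["null", bytes + decimal(38, 9)]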
throws Exception { /* queryTempProject = */ null, /* kmsKey = */ null, null, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -769,7 +764,7 @@ public void testQuerySourceCreateReader() throws Exception { /* queryTempProject = */ null, /* kmsKey = */ null, null, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), fakeBigQueryServices); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java index d7930b595538..4740b61d808d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java @@ -193,8 +193,7 @@ public void teardown() { @Test public void testBuildTableBasedSource() { BigQueryIO.TypedRead typedRead = - BigQueryIO.read(new TableRowParser()) - .withCoder(TableRowJsonCoder.of()) + BigQueryIO.readTableRows() .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table"); checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table"); @@ -204,8 +203,7 @@ public void testBuildTableBasedSource() { @Test public void testBuildTableBasedSourceWithoutValidation() { BigQueryIO.TypedRead typedRead = - BigQueryIO.read(new TableRowParser()) - .withCoder(TableRowJsonCoder.of()) + BigQueryIO.readTableRows() .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .withoutValidation(); @@ -216,10 +214,7 @@ public void testBuildTableBasedSourceWithoutValidation() { @Test public void testBuildTableBasedSourceWithDefaultProject() { BigQueryIO.TypedRead typedRead = - BigQueryIO.read(new TableRowParser()) - .withCoder(TableRowJsonCoder.of()) - .withMethod(Method.DIRECT_READ) - .from("myDataset.myTable"); + BigQueryIO.readTableRows().withMethod(Method.DIRECT_READ).from("myDataset.myTable"); checkTypedReadTableObject(typedRead, null, "myDataset", "myTable"); } @@ -231,10 +226,7 @@ public void testBuildTableBasedSourceWithTableReference() { .setDatasetId("dataset") .setTableId("table"); BigQueryIO.TypedRead typedRead = - BigQueryIO.read(new TableRowParser()) - .withCoder(TableRowJsonCoder.of()) - .withMethod(Method.DIRECT_READ) - .from(tableReference); + BigQueryIO.readTableRows().withMethod(Method.DIRECT_READ).from(tableReference); checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table"); } @@ -255,8 +247,7 @@ public void testBuildSourceWithTableAndFlatten() { + " which only applies to queries"); p.apply( "ReadMyTable", - BigQueryIO.read(new TableRowParser()) - .withCoder(TableRowJsonCoder.of()) + BigQueryIO.readTableRows() .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .withoutResultFlattening()); @@ -271,8 +262,7 @@ public void testBuildSourceWithTableAndSqlDialect() { + " which only applies to queries"); p.apply( "ReadMyTable", - BigQueryIO.read(new TableRowParser()) - .withCoder(TableRowJsonCoder.of()) + BigQueryIO.readTableRows() .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .usingStandardSql()); @@ -283,8 +273,7 @@ public void testBuildSourceWithTableAndSqlDialect() { public void testDisplayData() { String tableSpec = "foo.com:project:dataset.table"; BigQueryIO.TypedRead typedRead = - BigQueryIO.read(new 
TableRowParser()) - .withCoder(TableRowJsonCoder.of()) + BigQueryIO.readTableRows() .withMethod(Method.DIRECT_READ) .withSelectedFields(ImmutableList.of("foo", "bar")) .withProjectionPushdownApplied() @@ -299,8 +288,7 @@ public void testDisplayData() { public void testName() { assertEquals( "BigQueryIO.TypedRead", - BigQueryIO.read(new TableRowParser()) - .withCoder(TableRowJsonCoder.of()) + BigQueryIO.readTableRows() .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .getName()); @@ -347,7 +335,7 @@ private void doTableSourceEstimatedSizeTest(boolean useStreamingBuffer) throws E ValueProvider.StaticValueProvider.of(tableRef), null, null, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -367,7 +355,7 @@ public void testTableSourceEstimatedSize_WithBigQueryProject() throws Exception ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), null, null, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -386,7 +374,7 @@ public void testTableSourceEstimatedSize_WithDefaultProject() throws Exception { ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), null, null, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -478,7 +466,7 @@ private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) thr ValueProvider.StaticValueProvider.of(tableRef), null, null, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -526,7 +514,7 @@ public void testTableSourceInitialSplit_WithSelectedFieldsAndRowRestriction() th ValueProvider.StaticValueProvider.of(tableRef), StaticValueProvider.of(Lists.newArrayList("name")), StaticValueProvider.of("number > 5"), - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -571,7 +559,7 @@ public void testTableSourceInitialSplit_WithDefaultProject() throws Exception { ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), null, null, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -612,7 +600,7 @@ public void testTableSourceInitialSplit_EmptyTable() throws Exception { ValueProvider.StaticValueProvider.of(tableRef), null, null, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -630,7 +618,7 @@ public void testTableSourceCreateReader() throws Exception { BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table")), null, null, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -749,7 +737,7 @@ public void testStreamSourceEstimatedSizeBytes() throws Exception { ReadSession.getDefaultInstance(), ReadStream.getDefaultInstance(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices()); @@ -764,7 +752,7 @@ public void testStreamSourceSplit() throws Exception { ReadSession.getDefaultInstance(), ReadStream.getDefaultInstance(), TABLE_SCHEMA, - new 
TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices()); @@ -793,7 +781,7 @@ public void testSplitReadStreamAtFraction() throws IOException { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -836,7 +824,7 @@ public void testReadFromStreamSource() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -892,7 +880,7 @@ public void testFractionConsumed() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -977,7 +965,7 @@ public void testFractionConsumedWithSplit() throws Exception { readSession, ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1062,7 +1050,7 @@ public void testStreamSourceSplitAtFractionSucceeds() throws Exception { .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1199,7 +1187,7 @@ public void testStreamSourceSplitAtFractionRepeated() throws Exception { .build(), readStreams.get(0), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1263,7 +1251,7 @@ public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossible() throws .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1352,7 +1340,7 @@ public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPoint() thr .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1444,7 +1432,7 @@ public void testStreamSourceSplitAtFractionFailsWhenReaderRunning() throws Excep readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1776,7 +1764,7 @@ public void testReadFromStreamSourceArrow() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1825,7 +1813,7 @@ public void testFractionConsumedArrow() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1907,7 +1895,7 @@ public void 
testFractionConsumedWithSplitArrow() throws Exception { readSession, ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1988,7 +1976,7 @@ public void testStreamSourceSplitAtFractionSucceedsArrow() throws Exception { .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -2107,7 +2095,7 @@ public void testStreamSourceSplitAtFractionRepeatedArrow() throws Exception { .build(), readStreams.get(0), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -2167,7 +2155,7 @@ public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossibleArrow() th .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -2253,7 +2241,7 @@ public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPointArrow( .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - new TableRowParser(), + TableRowParser.INSTANCE, TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); From e1f6d5a1ea7a8555e1c499a8118c33c7912e07d7 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 20 Sep 2024 11:52:45 +0200 Subject: [PATCH 2/4] Include minimal changeset --- .../io/gcp/bigquery/BigQueryAvroUtils.java | 41 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 30 +- .../gcp/bigquery/BigQueryQuerySourceDef.java | 12 +- .../io/gcp/bigquery/BigQuerySourceDef.java | 9 - .../bigquery/BigQueryStorageSourceBase.java | 28 +- .../gcp/bigquery/BigQueryTableSourceDef.java | 12 +- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 60 +-- .../gcp/bigquery/BigQueryAvroUtilsTest.java | 393 +++++++----------- .../bigquery/BigQueryIOStorageQueryTest.java | 19 +- .../bigquery/BigQueryIOStorageReadTest.java | 80 ++-- 10 files changed, 328 insertions(+), 356 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index b24fe13262c2..06a8e4e100ef 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -35,6 +35,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.avro.Conversions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; @@ -58,6 +60,7 @@ */ class BigQueryAvroUtils { + // org.apache.avro.LogicalType static class DateTimeLogicalType extends LogicalType { public DateTimeLogicalType() { super("datetime"); @@ -112,6 +115,7 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy } else if (bqType.equals("NUMERIC")) { logicalType = LogicalTypes.decimal(38, 9); } else { + // BIGNUMERIC logicalType = LogicalTypes.decimal(77, 38); } return 
logicalType.addToSchema(SchemaBuilder.builder().bytesType()); @@ -230,6 +234,41 @@ private static String formatTime(long timeMicros) { return LocalTime.ofNanoOfDay(timeMicros * 1000).format(formatter); } + static TableSchema trimBigQueryTableSchema(TableSchema inputSchema, Schema avroSchema) { + List subSchemas = + inputSchema.getFields().stream() + .flatMap(fieldSchema -> mapTableFieldSchema(fieldSchema, avroSchema)) + .collect(Collectors.toList()); + + return new TableSchema().setFields(subSchemas); + } + + private static Stream mapTableFieldSchema( + TableFieldSchema fieldSchema, Schema avroSchema) { + Field avroFieldSchema = avroSchema.getField(fieldSchema.getName()); + if (avroFieldSchema == null) { + return Stream.empty(); + } else if (avroFieldSchema.schema().getType() != Type.RECORD) { + return Stream.of(fieldSchema); + } + + List subSchemas = + fieldSchema.getFields().stream() + .flatMap(subSchema -> mapTableFieldSchema(subSchema, avroFieldSchema.schema())) + .collect(Collectors.toList()); + + TableFieldSchema output = + new TableFieldSchema() + .setCategories(fieldSchema.getCategories()) + .setDescription(fieldSchema.getDescription()) + .setFields(subSchemas) + .setMode(fieldSchema.getMode()) + .setName(fieldSchema.getName()) + .setType(fieldSchema.getType()); + + return Stream.of(output); + } + /** * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}. * @@ -303,8 +342,6 @@ private static Object convertRequiredField(String name, Schema schema, Object v) // INTEGER type maps to an Avro LONG type. checkNotNull(v, "REQUIRED field %s should not be null", name); - // For historical reasons, don't validate avroLogicalType except for with NUMERIC. - // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. Type type = schema.getType(); LogicalType logicalType = schema.getLogicalType(); switch (type) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 7bd2eb7da62a..19ff1576bb24 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -647,12 +647,12 @@ public static Read read() { * domain-specific type, due to the overhead of converting the rows to {@link TableRow}. */ public static TypedRead readTableRows() { - return read(TableRowParser.INSTANCE).withCoder(TableRowJsonCoder.of()); + return read(new TableRowParser()).withCoder(TableRowJsonCoder.of()); } /** Like {@link #readTableRows()} but with {@link Schema} support. 
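 *
 * <p>Illustrative use (sketch: pipeline {@code p} and the table spec are assumed):
 *
 * <pre>{@code
 * PCollection<TableRow> rows =
 *     p.apply(BigQueryIO.readTableRowsWithSchema().from("project:dataset.table"));
 * Schema beamSchema = rows.getSchema(); // Beam schema derived from the table
 * }</pre>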
*/ public static TypedRead readTableRowsWithSchema() { - return read(TableRowParser.INSTANCE) + return read(new TableRowParser()) .withCoder(TableRowJsonCoder.of()) .withBeamRowConverters( TypeDescriptor.of(TableRow.class), @@ -1272,12 +1272,8 @@ public PCollection expand(PBegin input) { Schema beamSchema = null; if (getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null) { - TableSchema tableSchema = sourceDef.getTableSchema(bqOptions); - ValueProvider> selectedFields = getSelectedFields(); - if (selectedFields != null) { - tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get()); - } - beamSchema = BigQueryUtils.fromTableSchema(tableSchema); + beamSchema = sourceDef.getBeamSchema(bqOptions); + beamSchema = getFinalSchema(beamSchema, getSelectedFields()); } final Coder coder = inferCoder(p.getCoderRegistry()); @@ -1442,6 +1438,24 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception { return rows; } + private static Schema getFinalSchema( + Schema beamSchema, ValueProvider> selectedFields) { + List flds = + beamSchema.getFields().stream() + .filter( + field -> { + if (selectedFields != null + && selectedFields.isAccessible() + && selectedFields.get() != null) { + return selectedFields.get().contains(field.getName()); + } else { + return true; + } + }) + .collect(Collectors.toList()); + return Schema.builder().addFields(flds).build(); + } + private PCollection expandForDirectRead( PBegin input, Coder outputCoder, Schema beamSchema, BigQueryOptions bqOptions) { ValueProvider tableProvider = getTableProvider(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java index 9d0d82f72eb8..b4035a4e9ac3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java @@ -178,7 +178,7 @@ public BigQuerySourceBase toSource( /** {@inheritDoc} */ @Override - public TableSchema getTableSchema(BigQueryOptions bqOptions) { + public Schema getBeamSchema(BigQueryOptions bqOptions) { try { JobStatistics stats = BigQueryQueryHelper.dryRunQueryIfNeeded( @@ -189,20 +189,14 @@ public TableSchema getTableSchema(BigQueryOptions bqOptions) { flattenResults, useLegacySql, location); - return stats.getQuery().getSchema(); + TableSchema tableSchema = stats.getQuery().getSchema(); + return BigQueryUtils.fromTableSchema(tableSchema); } catch (IOException | InterruptedException | NullPointerException e) { throw new BigQuerySchemaRetrievalException( "Exception while trying to retrieve schema of query", e); } } - /** {@inheritDoc} */ - @Override - public Schema getBeamSchema(BigQueryOptions bqOptions) { - TableSchema tableSchema = getTableSchema(bqOptions); - return BigQueryUtils.fromTableSchema(tableSchema); - } - ValueProvider getQuery() { return query; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java index 579a602ab552..c9b1d5f73224 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java @@ -45,15 +45,6 @@ BigQuerySourceBase toSource( SerializableFunction> readerFactory, boolean useAvroLogicalTypes); - /** - * Extract the {@link TableSchema} corresponding to this source. - * - * @param bqOptions BigQueryOptions - * @return table schema of the source - * @throws BigQuerySchemaRetrievalException if schema retrieval fails - */ - TableSchema getTableSchema(BigQueryOptions bqOptions); - /** * Extract the Beam {@link Schema} corresponding to this source. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java index cef2e3aee3f4..51a5a8f391a6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java @@ -28,7 +28,10 @@ import com.google.cloud.bigquery.storage.v1.ReadStream; import java.io.IOException; import java.util.List; +import org.apache.avro.Schema; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.arrow.ArrowConversion; +import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; import org.apache.beam.sdk.metrics.Lineage; @@ -179,17 +182,30 @@ public List> split( LOG.info("Read session returned {} streams", readSession.getStreamsList().size()); } - // TODO: this is inconsistent with method above, where it can be null - Preconditions.checkStateNotNull(targetTable); - TableSchema tableSchema = targetTable.getSchema(); - if (selectedFieldsProvider != null) { - tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFieldsProvider.get()); + Schema sessionSchema; + if (readSession.getDataFormat() == DataFormat.ARROW) { + org.apache.arrow.vector.types.pojo.Schema schema = + ArrowConversion.arrowSchemaFromInput( + readSession.getArrowSchema().getSerializedSchema().newInput()); + org.apache.beam.sdk.schemas.Schema beamSchema = + ArrowConversion.ArrowSchemaTranslator.toBeamSchema(schema); + sessionSchema = AvroUtils.toAvroSchema(beamSchema); + } else if (readSession.getDataFormat() == DataFormat.AVRO) { + sessionSchema = new Schema.Parser().parse(readSession.getAvroSchema().getSchema()); + } else { + throw new IllegalArgumentException( + "data is not in a supported dataFormat: " + readSession.getDataFormat()); } + + Preconditions.checkStateNotNull( + targetTable); // TODO: this is inconsistent with method above, where it can be null + TableSchema trimmedSchema = + BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), sessionSchema); List> sources = Lists.newArrayList(); for (ReadStream readStream : readSession.getStreamsList()) { sources.add( BigQueryStorageStreamSource.create( - readSession, readStream, tableSchema, parseFn, outputCoder, bqServices)); + readSession, readStream, trimmedSchema, parseFn, outputCoder, bqServices)); } return ImmutableList.copyOf(sources); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java index 
cc1532a522bc..b399900f9a24 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java @@ -102,22 +102,16 @@ public BigQuerySourceBase toSource( /** {@inheritDoc} */ @Override - public TableSchema getTableSchema(BigQueryOptions bqOptions) { + public Schema getBeamSchema(BigQueryOptions bqOptions) { try { try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) { TableReference tableRef = getTableReference(bqOptions); Table table = datasetService.getTable(tableRef); - return Preconditions.checkStateNotNull(table).getSchema(); + TableSchema tableSchema = Preconditions.checkStateNotNull(table).getSchema(); + return BigQueryUtils.fromTableSchema(tableSchema); } } catch (Exception e) { throw new BigQuerySchemaRetrievalException("Exception while trying to retrieve schema", e); } } - - /** {@inheritDoc} */ - @Override - public Schema getBeamSchema(BigQueryOptions bqOptions) { - TableSchema tableSchema = getTableSchema(bqOptions); - return BigQueryUtils.fromTableSchema(tableSchema); - } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index d9d2b135f22b..606ce31b8bea 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -310,44 +310,45 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) { * *
<p>
Supports both standard and legacy SQL types. * - * @param typeName Name of the type + * @param typeName Name of the type returned by {@link TableFieldSchema#getType()} * @param nestedFields Nested fields for the given type (eg. RECORD type) * @return Corresponding Beam {@link FieldType} */ private static FieldType fromTableFieldSchemaType( String typeName, List nestedFields, SchemaConversionOptions options) { + // see + // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType-- switch (typeName) { case "STRING": return FieldType.STRING; case "BYTES": return FieldType.BYTES; - case "INT64": - case "INT": - case "SMALLINT": case "INTEGER": - case "BIGINT": - case "TINYINT": - case "BYTEINT": + case "INT64": return FieldType.INT64; + case "FLOAT": case "FLOAT64": - case "FLOAT": // even if not a valid BQ type, it is used in the schema return FieldType.DOUBLE; - case "BOOL": case "BOOLEAN": + case "BOOL": return FieldType.BOOLEAN; - case "NUMERIC": - case "BIGNUMERIC": - return FieldType.DECIMAL; case "TIMESTAMP": return FieldType.DATETIME; - case "TIME": - return FieldType.logicalType(SqlTypes.TIME); case "DATE": return FieldType.logicalType(SqlTypes.DATE); + case "TIME": + return FieldType.logicalType(SqlTypes.TIME); case "DATETIME": return FieldType.logicalType(SqlTypes.DATETIME); - case "STRUCT": + case "NUMERIC": + case "BIGNUMERIC": + return FieldType.DECIMAL; + case "GEOGRAPHY": + case "JSON": + // TODO Add metadata for custom sql types ? + return FieldType.STRING; case "RECORD": + case "STRUCT": if (options.getInferMaps() && nestedFields.size() == 2) { TableFieldSchema key = nestedFields.get(0); TableFieldSchema value = nestedFields.get(1); @@ -358,13 +359,9 @@ private static FieldType fromTableFieldSchemaType( fromTableFieldSchemaType(value.getType(), value.getFields(), options)); } } - Schema rowSchema = fromTableFieldSchema(nestedFields, options); return FieldType.row(rowSchema); - case "GEOGRAPHY": - case "JSON": - // TODO Add metadata for custom sql types - return FieldType.STRING; + case "RANGE": // TODO add support for range type default: throw new UnsupportedOperationException( "Converting BigQuery type " + typeName + " to Beam type is unsupported"); @@ -463,8 +460,8 @@ public static org.apache.avro.Schema toGenericAvroSchema(TableSchema tableSchema /** Convert a list of BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */ public static org.apache.avro.Schema toGenericAvroSchema( - TableSchema tableSchema, Boolean stringLogicalTypes) { - return toGenericAvroSchema("root", tableSchema.getFields(), stringLogicalTypes); + TableSchema tableSchema, Boolean useAvroLogicalTypes) { + return toGenericAvroSchema("root", tableSchema.getFields(), useAvroLogicalTypes); } /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */ @@ -475,8 +472,8 @@ public static org.apache.avro.Schema toGenericAvroSchema( /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. 
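 *
 * <p>Illustrative use (the single-field schema is a minimal example):
 *
 * <pre>{@code
 * List<TableFieldSchema> fields =
 *     Arrays.asList(new TableFieldSchema().setName("ts").setType("TIMESTAMP"));
 * org.apache.avro.Schema avro = BigQueryUtils.toGenericAvroSchema("root", fields, true);
 * }</pre>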
*/ public static org.apache.avro.Schema toGenericAvroSchema( - String schemaName, List fieldSchemas, Boolean stringLogicalTypes) { - return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas, stringLogicalTypes); + String schemaName, List fieldSchemas, Boolean useAvroLogicalTypes) { + return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas, useAvroLogicalTypes); } private static final BigQueryIO.TypedRead.ToBeamRowFunction @@ -1077,21 +1074,6 @@ private static Object convertAvroNumeric(Object value) { return tableSpec; } - static TableSchema trimSchema(TableSchema schema, @Nullable List selectedFields) { - if (selectedFields == null || selectedFields.isEmpty()) { - return schema; - } - - List fields = schema.getFields(); - List trimmedFields = new ArrayList<>(); - for (TableFieldSchema field : fields) { - if (selectedFields.contains(field.getName())) { - trimmedFields.add(field); - } - } - return new TableSchema().setFields(trimmedFields); - } - private static @Nullable ServiceCallMetric callMetricForMethod( @Nullable TableReference tableReference, String method) { if (tableReference != null) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index 0447b2d07b1a..662f2658eb6b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryAvroUtils.DATETIME_LOGICAL_TYPE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -29,22 +28,25 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import org.apache.avro.Conversions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.SchemaBuilder; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.reflect.AvroSchema; import org.apache.avro.reflect.Nullable; import org.apache.avro.reflect.ReflectData; import org.apache.avro.util.Utf8; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; -import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -112,11 +114,8 @@ public void testConvertGenericRecordToTableRow() throws Exception { { // Test nullable fields. 
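The hunk below swaps GenericRecordBuilder back for a bare GenericData.Record. A short, self-contained sketch (illustrative two-field schema, not from the patch) of the behavioral difference: the builder fills field defaults and validates on build(), while GenericData.Record simply leaves unset fields null:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class RecordConstructionSketch {
  public static void main(String[] args) {
    Schema schema =
        SchemaBuilder.record("r")
            .fields()
            .name("n").type().longType().noDefault()
            .name("s").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
            .endRecord();

    GenericRecord direct = new GenericData.Record(schema);
    direct.put("n", 5L); // "s" is left null; nothing is validated

    GenericRecord built =
        new GenericRecordBuilder(schema).set("n", 5L).build(); // "s" gets its null default

    System.out.println(direct.equals(built)); // true: both end up with s == null
  }
}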
- GenericRecord record = - new GenericRecordBuilder(avroSchema) - .set("number", 5L) - .set("associates", new ArrayList()) - .build(); + GenericRecord record = new GenericData.Record(avroSchema); + record.put("number", 5L); TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow().set("number", "5").set("associates", new ArrayList()); assertEquals(row, convertedRow); @@ -126,54 +125,50 @@ public void testConvertGenericRecordToTableRow() throws Exception { { // Test type conversion for: // INTEGER, FLOAT, NUMERIC, TIMESTAMP, BOOLEAN, BYTES, DATE, DATETIME, TIME. + GenericRecord record = new GenericData.Record(avroSchema); byte[] soundBytes = "chirp,chirp".getBytes(StandardCharsets.UTF_8); - GenericRecord record = - new GenericRecordBuilder(avroSchema) - .set("number", 5L) - .set("quality", 5.0) - .set("birthday", 5L) - .set("birthdayMoney", numericBytes) - .set("lotteryWinnings", bigNumericBytes) - .set("flighted", Boolean.TRUE) - .set("sound", ByteBuffer.wrap(soundBytes)) - .set("anniversaryDate", new Utf8("2000-01-01")) - .set("anniversaryDatetime", "2000-01-01 00:00:00.000005") - .set("anniversaryTime", new Utf8("00:00:00.000005")) - .set("geoPositions", "LINESTRING(1 2, 3 4, 5 6, 7 8)") - .set("associates", new ArrayList()) - .build(); - + ByteBuffer soundByteBuffer = ByteBuffer.wrap(soundBytes); + soundByteBuffer.rewind(); + record.put("number", 5L); + record.put("quality", 5.0); + record.put("birthday", 5L); + record.put("birthdayMoney", numericBytes); + record.put("lotteryWinnings", bigNumericBytes); + record.put("flighted", Boolean.TRUE); + record.put("sound", soundByteBuffer); + record.put("anniversaryDate", new Utf8("2000-01-01")); + record.put("anniversaryDatetime", new String("2000-01-01 00:00:00.000005")); + record.put("anniversaryTime", new Utf8("00:00:00.000005")); + record.put("geoPositions", new String("LINESTRING(1 2, 3 4, 5 6, 7 8)")); TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow() - .set("anniversaryDate", "2000-01-01") - .set("anniversaryDatetime", "2000-01-01 00:00:00.000005") - .set("anniversaryTime", "00:00:00.000005") - .set("associates", new ArrayList()) + .set("number", "5") .set("birthday", "1970-01-01 00:00:00.000005 UTC") .set("birthdayMoney", numeric.toString()) - .set("flighted", Boolean.TRUE) - .set("geoPositions", "LINESTRING(1 2, 3 4, 5 6, 7 8)") .set("lotteryWinnings", bigNumeric.toString()) - .set("number", "5") .set("quality", 5.0) - .set("sound", BaseEncoding.base64().encode(soundBytes)); - assertEquals(row, convertedRow); + .set("associates", new ArrayList()) + .set("flighted", Boolean.TRUE) + .set("sound", BaseEncoding.base64().encode(soundBytes)) + .set("anniversaryDate", "2000-01-01") + .set("anniversaryDatetime", "2000-01-01 00:00:00.000005") + .set("anniversaryTime", "00:00:00.000005") + .set("geoPositions", "LINESTRING(1 2, 3 4, 5 6, 7 8)"); TableRow clonedRow = convertedRow.clone(); - assertEquals(clonedRow, convertedRow); + assertEquals(convertedRow, clonedRow); + assertEquals(row, convertedRow); } { // Test repeated fields. 
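The repeated-fields hunk below likewise derives the nested record's schema from AvroCoder rather than calling ReflectData directly. A hedged sketch (with a local SubBird stand-in) of why either works: for a plain POJO, AvroCoder's reflect-based schema is expected to match ReflectData's:

import org.apache.avro.Schema;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

public class ReflectSchemaSketch {
  static class SubBird {
    @Nullable String species;
  }

  public static void main(String[] args) {
    Schema viaCoder = AvroCoder.of(SubBird.class).getSchema();
    Schema viaReflect = ReflectData.get().getSchema(SubBird.class);
    // AvroCoder uses Avro reflection under the hood for plain POJOs, so the
    // two schemas should agree here.
    System.out.println(viaCoder.equals(viaReflect));
  }
}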
- Schema subBirdSchema = ReflectData.get().getSchema(Bird.SubBird.class); - GenericRecord nestedRecord = - new GenericRecordBuilder(subBirdSchema).set("species", "other").build(); - GenericRecord record = - new GenericRecordBuilder(avroSchema) - .set("number", 5L) - .set("associates", Lists.newArrayList(nestedRecord)) - .set("birthdayMoney", numericBytes) - .set("lotteryWinnings", bigNumericBytes) - .build(); + Schema subBirdSchema = AvroCoder.of(Bird.SubBird.class).getSchema(); + GenericRecord nestedRecord = new GenericData.Record(subBirdSchema); + nestedRecord.put("species", "other"); + GenericRecord record = new GenericData.Record(avroSchema); + record.put("number", 5L); + record.put("associates", Lists.newArrayList(nestedRecord)); + record.put("birthdayMoney", numericBytes); + record.put("lotteryWinnings", bigNumericBytes); TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record); TableRow row = new TableRow() @@ -188,269 +183,201 @@ public void testConvertGenericRecordToTableRow() throws Exception { } @Test - public void testConvertBigQuerySchemaToAvroSchemaDisabledLogicalTypes() { + public void testConvertBigQuerySchemaToAvroSchema() { TableSchema tableSchema = new TableSchema(); tableSchema.setFields(fields); - Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false); + Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema); - assertThat(avroSchema.getField("number").schema(), equalTo(SchemaBuilder.builder().longType())); + assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Type.LONG))); assertThat( avroSchema.getField("species").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().stringType().endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING)))); assertThat( avroSchema.getField("quality").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().doubleType().endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.DOUBLE)))); assertThat( avroSchema.getField("quantity").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().longType().endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.LONG)))); assertThat( avroSchema.getField("birthday").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type( - LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType())) - .endUnion())); + Schema.createUnion( + Schema.create(Type.NULL), + LogicalTypes.timestampMicros().addToSchema(Schema.create(Type.LONG))))); assertThat( avroSchema.getField("birthdayMoney").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.decimal(38, 9).addToSchema(SchemaBuilder.builder().bytesType())) - .endUnion())); + Schema.createUnion( + Schema.create(Type.NULL), + LogicalTypes.decimal(38, 9).addToSchema(Schema.create(Type.BYTES))))); assertThat( avroSchema.getField("lotteryWinnings").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.decimal(77, 38).addToSchema(SchemaBuilder.builder().bytesType())) - .endUnion())); + Schema.createUnion( + Schema.create(Type.NULL), + LogicalTypes.decimal(77, 38).addToSchema(Schema.create(Type.BYTES))))); assertThat( avroSchema.getField("flighted").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().booleanType().endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), 
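// Note on the datetime handling above: BigQuery DATETIME has no built-in Avro
// logical type, so the patch attaches the custom "datetime" LogicalType. A
// sketch of what that does to the schema JSON:
//
//   Schema s =
//       BigQueryAvroUtils.DATETIME_LOGICAL_TYPE.addToSchema(
//           Schema.create(Schema.Type.STRING));
//   s.toString(); // {"type":"string","logicalType":"datetime"}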
Schema.create(Type.BOOLEAN)))); assertThat( avroSchema.getField("sound").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().bytesType().endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES)))); + Schema dateSchema = Schema.create(Type.INT); + LogicalTypes.date().addToSchema(dateSchema); assertThat( avroSchema.getField("anniversaryDate").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .stringBuilder() - .prop("sqlType", "DATE") - .endString() - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), dateSchema))); + Schema dateTimeSchema = Schema.create(Type.STRING); + BigQueryAvroUtils.DATETIME_LOGICAL_TYPE.addToSchema(dateTimeSchema); assertThat( avroSchema.getField("anniversaryDatetime").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .stringBuilder() - .prop("sqlType", "DATETIME") - .endString() - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), dateTimeSchema))); + Schema timeSchema = Schema.create(Type.LONG); + LogicalTypes.timeMicros().addToSchema(timeSchema); assertThat( avroSchema.getField("anniversaryTime").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .stringBuilder() - .prop("sqlType", "TIME") - .endString() - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), timeSchema))); + Schema geoSchema = Schema.create(Type.STRING); + geoSchema.addProp("sqlType", "GEOGRAPHY"); assertThat( avroSchema.getField("geoPositions").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .stringBuilder() - .prop("sqlType", "GEOGRAPHY") - .endString() - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Type.NULL), geoSchema))); assertThat( avroSchema.getField("scion").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .record("scion") - .doc("Translated Avro Schema for scion") - .namespace("org.apache.beam.sdk.io.gcp.bigquery") - .fields() - .name("species") - .type() - .unionOf() - .nullType() - .and() - .stringType() - .endUnion() - .noDefault() - .endRecord() - .endUnion())); - + Schema.createUnion( + Schema.create(Type.NULL), + Schema.createRecord( + "scion", + "Translated Avro Schema for scion", + "org.apache.beam.sdk.io.gcp.bigquery", + false, + ImmutableList.of( + new Field( + "species", + Schema.createUnion( + Schema.create(Type.NULL), Schema.create(Type.STRING)), + null, + (Object) null)))))); assertThat( avroSchema.getField("associates").schema(), equalTo( - SchemaBuilder.array() - .items() - .record("associates") - .doc("Translated Avro Schema for associates") - .namespace("org.apache.beam.sdk.io.gcp.bigquery") - .fields() - .name("species") - .type() - .unionOf() - .nullType() - .and() - .stringType() - .endUnion() - .noDefault() - .endRecord())); + Schema.createArray( + Schema.createRecord( + "associates", + "Translated Avro Schema for associates", + "org.apache.beam.sdk.io.gcp.bigquery", + false, + ImmutableList.of( + new Field( + "species", + Schema.createUnion( + Schema.create(Type.NULL), Schema.create(Type.STRING)), + null, + (Object) null)))))); } @Test - public void testConvertBigQuerySchemaToAvroSchema() { + public void testConvertBigQuerySchemaToAvroSchemaWithoutLogicalTypes() { TableSchema tableSchema = new TableSchema(); tableSchema.setFields(fields); - Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema); + Schema avroSchema = 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false); - assertThat(avroSchema.getField("number").schema(), equalTo(SchemaBuilder.builder().longType())); + assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Schema.Type.LONG))); assertThat( avroSchema.getField("species").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().stringType().endUnion())); + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)))); assertThat( avroSchema.getField("quality").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().doubleType().endUnion())); + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.DOUBLE)))); assertThat( avroSchema.getField("quantity").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().longType().endUnion())); + equalTo( + Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG)))); assertThat( avroSchema.getField("birthday").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type( - LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType())) - .endUnion())); + Schema.createUnion( + Schema.create(Schema.Type.NULL), + LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))))); assertThat( avroSchema.getField("birthdayMoney").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.decimal(38, 9).addToSchema(SchemaBuilder.builder().bytesType())) - .endUnion())); + Schema.createUnion( + Schema.create(Schema.Type.NULL), + LogicalTypes.decimal(38, 9).addToSchema(Schema.create(Schema.Type.BYTES))))); assertThat( avroSchema.getField("lotteryWinnings").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.decimal(77, 38).addToSchema(SchemaBuilder.builder().bytesType())) - .endUnion())); + Schema.createUnion( + Schema.create(Schema.Type.NULL), + LogicalTypes.decimal(77, 38).addToSchema(Schema.create(Schema.Type.BYTES))))); assertThat( avroSchema.getField("flighted").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().booleanType().endUnion())); + equalTo( + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BOOLEAN)))); assertThat( avroSchema.getField("sound").schema(), - equalTo(SchemaBuilder.builder().unionOf().nullType().and().bytesType().endUnion())); + equalTo( + Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES)))); + Schema dateSchema = Schema.create(Schema.Type.STRING); + dateSchema.addProp("sqlType", "DATE"); assertThat( avroSchema.getField("anniversaryDate").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType())) - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), dateSchema))); + Schema dateTimeSchema = Schema.create(Schema.Type.STRING); + dateTimeSchema.addProp("sqlType", "DATETIME"); assertThat( avroSchema.getField("anniversaryDatetime").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType())) - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), dateTimeSchema))); + Schema timeSchema = Schema.create(Schema.Type.STRING); + timeSchema.addProp("sqlType", "TIME"); assertThat( 
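// Note: with useAvroLogicalTypes == false, the civil-time columns asserted
// here come back as plain strings tagged only with a "sqlType" property, so
// consumers must parse the formatted text themselves. Sketch of the two shapes:
//
//   Schema time = Schema.create(Schema.Type.STRING);
//   time.addProp("sqlType", "TIME"); // {"type":"string","sqlType":"TIME"}
//
// versus the logical-type form produced when the flag is true:
//
//   LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));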
avroSchema.getField("anniversaryTime").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .type(LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType())) - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), timeSchema))); + Schema geoSchema = Schema.create(Type.STRING); + geoSchema.addProp("sqlType", "GEOGRAPHY"); assertThat( avroSchema.getField("geoPositions").schema(), - equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .stringBuilder() - .prop("sqlType", "GEOGRAPHY") - .endString() - .endUnion())); + equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), geoSchema))); assertThat( avroSchema.getField("scion").schema(), equalTo( - SchemaBuilder.builder() - .unionOf() - .nullType() - .and() - .record("scion") - .doc("Translated Avro Schema for scion") - .namespace("org.apache.beam.sdk.io.gcp.bigquery") - .fields() - .name("species") - .type() - .unionOf() - .nullType() - .and() - .stringType() - .endUnion() - .noDefault() - .endRecord() - .endUnion())); - + Schema.createUnion( + Schema.create(Schema.Type.NULL), + Schema.createRecord( + "scion", + "Translated Avro Schema for scion", + "org.apache.beam.sdk.io.gcp.bigquery", + false, + ImmutableList.of( + new Schema.Field( + "species", + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), + null, + (Object) null)))))); assertThat( avroSchema.getField("associates").schema(), equalTo( - SchemaBuilder.array() - .items() - .record("associates") - .doc("Translated Avro Schema for associates") - .namespace("org.apache.beam.sdk.io.gcp.bigquery") - .fields() - .name("species") - .type() - .unionOf() - .nullType() - .and() - .stringType() - .endUnion() - .noDefault() - .endRecord())); + Schema.createArray( + Schema.createRecord( + "associates", + "Translated Avro Schema for associates", + "org.apache.beam.sdk.io.gcp.bigquery", + false, + ImmutableList.of( + new Schema.Field( + "species", + Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), + null, + (Object) null)))))); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java index 570435a4c95f..497653f9ab8d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java @@ -170,7 +170,9 @@ public void testDefaultQueryBasedSource() throws Exception { @Test public void testQueryBasedSourceWithCustomQuery() throws Exception { TypedRead typedRead = - BigQueryIO.readTableRows().fromQuery("SELECT * FROM `google.com:project.dataset.table`"); + BigQueryIO.read(new TableRowParser()) + .fromQuery("SELECT * FROM `google.com:project.dataset.table`") + .withCoder(TableRowJsonCoder.of()); checkTypedReadQueryObject(typedRead, "SELECT * FROM `google.com:project.dataset.table`"); } @@ -225,7 +227,10 @@ public void testQueryBasedSourceWithTemplateCompatibility() throws Exception { } private TypedRead getDefaultTypedRead() { - return BigQueryIO.readTableRows().fromQuery(DEFAULT_QUERY).withMethod(Method.DIRECT_READ); + return BigQueryIO.read(new TableRowParser()) + .fromQuery(DEFAULT_QUERY) + .withCoder(TableRowJsonCoder.of()) + 
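// Note: per the BigQueryIO change earlier in this patch, readTableRows() is
// defined as
//
//   read(new TableRowParser()).withCoder(TableRowJsonCoder.of())
//
// so spelling the long form back out, as these test hunks do, is
// behavior-preserving.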
.withMethod(Method.DIRECT_READ); } private void checkTypedReadQueryObject(TypedRead typedRead, String query) { @@ -305,7 +310,7 @@ public void testQuerySourceEstimatedSize() throws Exception { /* queryTempProject = */ null, /* kmsKey = */ null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), fakeBigQueryServices); @@ -418,7 +423,7 @@ private void doQuerySourceInitialSplit( /* queryTempProject = */ null, /* kmsKey = */ null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -667,7 +672,7 @@ public void testQuerySourceInitialSplitWithBigQueryProject_EmptyResult() throws /* queryTempProject = */ null, /* kmsKey = */ null, DataFormat.AVRO, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -739,7 +744,7 @@ public void testQuerySourceInitialSplit_EmptyResult() throws Exception { /* queryTempProject = */ null, /* kmsKey = */ null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -764,7 +769,7 @@ public void testQuerySourceCreateReader() throws Exception { /* queryTempProject = */ null, /* kmsKey = */ null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), fakeBigQueryServices); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java index 4740b61d808d..d7930b595538 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java @@ -193,7 +193,8 @@ public void teardown() { @Test public void testBuildTableBasedSource() { BigQueryIO.TypedRead typedRead = - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table"); checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table"); @@ -203,7 +204,8 @@ public void testBuildTableBasedSource() { @Test public void testBuildTableBasedSourceWithoutValidation() { BigQueryIO.TypedRead typedRead = - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .withoutValidation(); @@ -214,7 +216,10 @@ public void testBuildTableBasedSourceWithoutValidation() { @Test public void testBuildTableBasedSourceWithDefaultProject() { BigQueryIO.TypedRead typedRead = - BigQueryIO.readTableRows().withMethod(Method.DIRECT_READ).from("myDataset.myTable"); + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) + .withMethod(Method.DIRECT_READ) + .from("myDataset.myTable"); checkTypedReadTableObject(typedRead, null, "myDataset", "myTable"); } @@ -226,7 +231,10 @@ public void testBuildTableBasedSourceWithTableReference() { .setDatasetId("dataset") .setTableId("table"); BigQueryIO.TypedRead typedRead = - BigQueryIO.readTableRows().withMethod(Method.DIRECT_READ).from(tableReference); + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) + .withMethod(Method.DIRECT_READ) + 
.from(tableReference); checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table"); } @@ -247,7 +255,8 @@ public void testBuildSourceWithTableAndFlatten() { + " which only applies to queries"); p.apply( "ReadMyTable", - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .withoutResultFlattening()); @@ -262,7 +271,8 @@ public void testBuildSourceWithTableAndSqlDialect() { + " which only applies to queries"); p.apply( "ReadMyTable", - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .usingStandardSql()); @@ -273,7 +283,8 @@ public void testBuildSourceWithTableAndSqlDialect() { public void testDisplayData() { String tableSpec = "foo.com:project:dataset.table"; BigQueryIO.TypedRead typedRead = - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .withSelectedFields(ImmutableList.of("foo", "bar")) .withProjectionPushdownApplied() @@ -288,7 +299,8 @@ public void testDisplayData() { public void testName() { assertEquals( "BigQueryIO.TypedRead", - BigQueryIO.readTableRows() + BigQueryIO.read(new TableRowParser()) + .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) .from("foo.com:project:dataset.table") .getName()); @@ -335,7 +347,7 @@ private void doTableSourceEstimatedSizeTest(boolean useStreamingBuffer) throws E ValueProvider.StaticValueProvider.of(tableRef), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -355,7 +367,7 @@ public void testTableSourceEstimatedSize_WithBigQueryProject() throws Exception ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -374,7 +386,7 @@ public void testTableSourceEstimatedSize_WithDefaultProject() throws Exception { ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -466,7 +478,7 @@ private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) thr ValueProvider.StaticValueProvider.of(tableRef), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -514,7 +526,7 @@ public void testTableSourceInitialSplit_WithSelectedFieldsAndRowRestriction() th ValueProvider.StaticValueProvider.of(tableRef), StaticValueProvider.of(Lists.newArrayList("name")), StaticValueProvider.of("number > 5"), - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -559,7 +571,7 @@ public void testTableSourceInitialSplit_WithDefaultProject() throws Exception { ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -600,7 +612,7 @@ public void 
testTableSourceInitialSplit_EmptyTable() throws Exception { ValueProvider.StaticValueProvider.of(tableRef), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices() .withDatasetService(fakeDatasetService) @@ -618,7 +630,7 @@ public void testTableSourceCreateReader() throws Exception { BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table")), null, null, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withDatasetService(fakeDatasetService)); @@ -737,7 +749,7 @@ public void testStreamSourceEstimatedSizeBytes() throws Exception { ReadSession.getDefaultInstance(), ReadStream.getDefaultInstance(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices()); @@ -752,7 +764,7 @@ public void testStreamSourceSplit() throws Exception { ReadSession.getDefaultInstance(), ReadStream.getDefaultInstance(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices()); @@ -781,7 +793,7 @@ public void testSplitReadStreamAtFraction() throws IOException { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -824,7 +836,7 @@ public void testReadFromStreamSource() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -880,7 +892,7 @@ public void testFractionConsumed() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -965,7 +977,7 @@ public void testFractionConsumedWithSplit() throws Exception { readSession, ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1050,7 +1062,7 @@ public void testStreamSourceSplitAtFractionSucceeds() throws Exception { .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1187,7 +1199,7 @@ public void testStreamSourceSplitAtFractionRepeated() throws Exception { .build(), readStreams.get(0), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1251,7 +1263,7 @@ public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossible() throws .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1340,7 +1352,7 @@ public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPoint() thr .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new 
FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1432,7 +1444,7 @@ public void testStreamSourceSplitAtFractionFailsWhenReaderRunning() throws Excep readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1764,7 +1776,7 @@ public void testReadFromStreamSourceArrow() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1813,7 +1825,7 @@ public void testFractionConsumedArrow() throws Exception { readSession, ReadStream.newBuilder().setName("readStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1895,7 +1907,7 @@ public void testFractionConsumedWithSplitArrow() throws Exception { readSession, ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -1976,7 +1988,7 @@ public void testStreamSourceSplitAtFractionSucceedsArrow() throws Exception { .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -2095,7 +2107,7 @@ public void testStreamSourceSplitAtFractionRepeatedArrow() throws Exception { .build(), readStreams.get(0), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -2155,7 +2167,7 @@ public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossibleArrow() th .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); @@ -2241,7 +2253,7 @@ public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPointArrow( .build(), ReadStream.newBuilder().setName("parentStream").build(), TABLE_SCHEMA, - TableRowParser.INSTANCE, + new TableRowParser(), TableRowJsonCoder.of(), new FakeBigQueryServices().withStorageClient(fakeStorageClient)); From c82dc0561700e65afdf718aab989b3bac613e125 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Mon, 23 Sep 2024 09:40:09 +0200 Subject: [PATCH 3/4] Apply review comments --- .../beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index 06a8e4e100ef..f0a79cdcf5a7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -305,8 +305,6 @@ static TableRow convertGenericRecordToTableRow(GenericRecord record) { } private static @Nullable Object getTypedCellValue(String name, Schema schema, Object v) { - // Per 
https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field - // is optional (and so it may be null), but defaults to "NULLABLE". Type type = schema.getType(); switch (type) { case ARRAY: @@ -315,8 +313,7 @@ static TableRow convertGenericRecordToTableRow(GenericRecord record) { return convertNullableField(name, schema, v); case MAP: throw new UnsupportedOperationException( - String.format( - "Unexpected BigQuery field schema type %s for field named %s", type, name)); + String.format("Unexpected Avro field schema type %s for field named %s", type, name)); default: return convertRequiredField(name, schema, v); } @@ -354,8 +351,7 @@ private static Object convertRequiredField(String name, Schema schema, Object v) return formatDate((Integer) v); } else { throw new UnsupportedOperationException( - String.format( - "Unexpected BigQuery field schema type %s for field named %s", type, name)); + String.format("Unexpected Avro field schema type %s for field named %s", type, name)); } case LONG: if (logicalType instanceof LogicalTypes.TimeMicros) { @@ -389,8 +385,7 @@ private static Object convertRequiredField(String name, Schema schema, Object v) return convertGenericRecordToTableRow((GenericRecord) v); default: throw new UnsupportedOperationException( - String.format( - "Unexpected BigQuery field schema type %s for field named %s", type, name)); + String.format("Unexpected Avro field schema type %s for field named %s", type, name)); } } From 85a7616b37e984208e9c2e5f3a55ae239d24cb91 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Mon, 23 Sep 2024 11:11:18 +0200 Subject: [PATCH 4/4] Streamline avro type conversion with beam schema --- .../io/gcp/bigquery/BigQueryAvroUtils.java | 75 +++++++++---------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index f0a79cdcf5a7..ccfa87e577ed 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -79,30 +79,49 @@ public DateTimeLogicalType() { */ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) { String bqType = schema.getType(); + // see + // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType-- switch (bqType) { - case "BOOL": - case "BOOLEAN": - // boolean - return SchemaBuilder.builder().booleanType(); + case "STRING": + // string + return SchemaBuilder.builder().stringType(); case "BYTES": // bytes return SchemaBuilder.builder().bytesType(); - case "FLOAT64": - case "FLOAT": // even if not a valid BQ type, it is used in the schema - // double - return SchemaBuilder.builder().doubleType(); - case "INT64": - case "INT": - case "SMALLINT": case "INTEGER": - case "BIGINT": - case "TINYINT": - case "BYTEINT": + case "INT64": // long return SchemaBuilder.builder().longType(); - case "STRING": - // string - return SchemaBuilder.builder().stringType(); + case "FLOAT": + case "FLOAT64": + // double + return SchemaBuilder.builder().doubleType(); + case "BOOLEAN": + case "BOOL": + // boolean + return SchemaBuilder.builder().booleanType(); + case "TIMESTAMP": + // in Extract Jobs, it always uses the Avro logical type + // 
we may have to change this if we move to EXPORT DATA + return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType()); + case "DATE": + if (useAvroLogicalTypes) { + return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()); + } else { + return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString(); + } + case "TIME": + if (useAvroLogicalTypes) { + return LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType()); + } else { + return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString(); + } + case "DATETIME": + if (useAvroLogicalTypes) { + return DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType()); + } else { + return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString(); + } case "NUMERIC": case "BIGNUMERIC": // decimal @@ -119,27 +138,6 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy logicalType = LogicalTypes.decimal(77, 38); } return logicalType.addToSchema(SchemaBuilder.builder().bytesType()); - case "DATE": - if (useAvroLogicalTypes) { - return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()); - } else { - return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString(); - } - case "DATETIME": - if (useAvroLogicalTypes) { - return DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType()); - } else { - return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString(); - } - case "TIME": - if (useAvroLogicalTypes) { - return LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType()); - } else { - return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString(); - } - case "TIMESTAMP": - // somehow the doc is wrong and BQ always uses logical type - return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType()); case "GEOGRAPHY": case "JSON": return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString(); @@ -147,6 +145,7 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy case "STRUCT": // record throw new IllegalArgumentException("RECORD/STRUCT are not primitive types"); + case "RANGE": // TODO add support for range type default: throw new IllegalArgumentException("Unknown BigQuery type: " + bqType); }
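
Note for readers outside the Beam codebase: the final hunk above reorders and documents the BigQuery-to-Avro primitive mapping in the package-private getPrimitiveType(TableFieldSchema, Boolean). The sketch below is a minimal, hypothetical reconstruction of that mapping's shape using only stock Avro 1.x APIs (SchemaBuilder, LogicalTypes); the class and method names are illustrative, and only a representative subset of the switch cases is shown. It demonstrates the three behaviors the hunk encodes: legacy SQL aliases collapse onto one Avro primitive, TIMESTAMP always carries timestamp-micros because extract jobs always emit the logical type, and DATE/TIME honor the useAvroLogicalTypes flag with a string-plus-"sqlType"-property fallback.

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

/** Illustrative sketch only; not the Beam implementation or its public API. */
public class BqAvroMappingSketch {

  static Schema toAvro(String bqType, boolean useAvroLogicalTypes) {
    switch (bqType) {
      case "STRING":
        return SchemaBuilder.builder().stringType();
      case "INTEGER": // legacy alias
      case "INT64":
        return SchemaBuilder.builder().longType();
      case "FLOAT": // legacy alias
      case "FLOAT64":
        return SchemaBuilder.builder().doubleType();
      case "TIMESTAMP":
        // Extract jobs always emit timestamp-micros, regardless of the flag.
        return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
      case "DATE":
        return useAvroLogicalTypes
            ? LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType())
            // Without logical types, civil types export as strings tagged with
            // the original SQL type via a custom "sqlType" schema property.
            : SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
      case "TIME":
        return useAvroLogicalTypes
            ? LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType())
            : SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
      default:
        throw new IllegalArgumentException("Unknown BigQuery type: " + bqType);
    }
  }

  public static void main(String[] args) {
    System.out.println(toAvro("DATE", true));  // {"type":"int","logicalType":"date"}
    System.out.println(toAvro("DATE", false)); // {"type":"string","sqlType":"DATE"}
    System.out.println(toAvro("TIMESTAMP", false)); // timestamp-micros either way
  }
}

The "sqlType" property in the fallback branch is what lets a downstream consumer distinguish an exported DATE or TIME column from an ordinary STRING column when logical types are disabled, which is why the patch threads the bqType value through rather than hard-coding the property per case.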