diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index bdee2eef570d..ccfa87e577ed 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -21,15 +21,12 @@
import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
import static java.time.temporal.ChronoField.NANO_OF_SECOND;
import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verify;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify.verifyNotNull;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
-import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalTime;
@@ -37,7 +34,6 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -47,10 +43,10 @@
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.format.DateTimeFormat;
@@ -64,35 +60,96 @@
*/
class BigQueryAvroUtils {
+ // org.apache.avro.LogicalType
+ static class DateTimeLogicalType extends LogicalType {
+ public DateTimeLogicalType() {
+ super("datetime");
+ }
+ }
+
+ static final DateTimeLogicalType DATETIME_LOGICAL_TYPE = new DateTimeLogicalType();
+
/**
* Defines the valid mapping between BigQuery types and native Avro types.
*
- *
Some BigQuery types are duplicated here since slightly different Avro records are produced
- * when exporting data in Avro format and when reading data directly using the read API.
+ * @see BQ avro
+ * export
+ * @see BQ
+ * avro storage
*/
- static final ImmutableMultimap BIG_QUERY_TO_AVRO_TYPES =
- ImmutableMultimap.builder()
- .put("STRING", Type.STRING)
- .put("GEOGRAPHY", Type.STRING)
- .put("BYTES", Type.BYTES)
- .put("INTEGER", Type.LONG)
- .put("INT64", Type.LONG)
- .put("FLOAT", Type.DOUBLE)
- .put("FLOAT64", Type.DOUBLE)
- .put("NUMERIC", Type.BYTES)
- .put("BIGNUMERIC", Type.BYTES)
- .put("BOOLEAN", Type.BOOLEAN)
- .put("BOOL", Type.BOOLEAN)
- .put("TIMESTAMP", Type.LONG)
- .put("RECORD", Type.RECORD)
- .put("STRUCT", Type.RECORD)
- .put("DATE", Type.STRING)
- .put("DATE", Type.INT)
- .put("DATETIME", Type.STRING)
- .put("TIME", Type.STRING)
- .put("TIME", Type.LONG)
- .put("JSON", Type.STRING)
- .build();
+ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) {
+ String bqType = schema.getType();
+ // see
+ // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType--
+ switch (bqType) {
+ case "STRING":
+ // string
+ return SchemaBuilder.builder().stringType();
+ case "BYTES":
+ // bytes
+ return SchemaBuilder.builder().bytesType();
+ case "INTEGER":
+ case "INT64":
+ // long
+ return SchemaBuilder.builder().longType();
+ case "FLOAT":
+ case "FLOAT64":
+ // double
+ return SchemaBuilder.builder().doubleType();
+ case "BOOLEAN":
+ case "BOOL":
+ // boolean
+ return SchemaBuilder.builder().booleanType();
+ case "TIMESTAMP":
+ // in Extract Jobs, it always uses the Avro logical type
+ // we may have to change this if we move to EXPORT DATA
+ return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+ case "DATE":
+ if (useAvroLogicalTypes) {
+ return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+ } else {
+ return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+ }
+ case "TIME":
+ if (useAvroLogicalTypes) {
+ return LogicalTypes.timeMicros().addToSchema(SchemaBuilder.builder().longType());
+ } else {
+ return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+ }
+ case "DATETIME":
+ if (useAvroLogicalTypes) {
+ return DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType());
+ } else {
+ return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+ }
+ case "NUMERIC":
+ case "BIGNUMERIC":
+ // decimal
+ LogicalType logicalType;
+ if (schema.getScale() != null) {
+ logicalType =
+ LogicalTypes.decimal(schema.getPrecision().intValue(), schema.getScale().intValue());
+ } else if (schema.getPrecision() != null) {
+ logicalType = LogicalTypes.decimal(schema.getPrecision().intValue());
+ } else if (bqType.equals("NUMERIC")) {
+ logicalType = LogicalTypes.decimal(38, 9);
+ } else {
+ // BIGNUMERIC
+ logicalType = LogicalTypes.decimal(77, 38);
+ }
+ return logicalType.addToSchema(SchemaBuilder.builder().bytesType());
+ case "GEOGRAPHY":
+ case "JSON":
+ return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
+ case "RECORD":
+ case "STRUCT":
+ // record
+ throw new IllegalArgumentException("RECORD/STRUCT are not primitive types");
+ case "RANGE": // TODO add support for range type
+ default:
+ throw new IllegalArgumentException("Unknown BigQuery type: " + bqType);
+ }
+ }
/**
* Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
@@ -216,20 +273,27 @@ private static Stream mapTableFieldSchema(
*
* See "Avro
* format" for more information.
+ *
+ * @deprecated Only kept for previous TableRowParser implementation
*/
+ @Deprecated
static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
- return convertGenericRecordToTableRow(record, schema.getFields());
+ return convertGenericRecordToTableRow(record);
}
- private static TableRow convertGenericRecordToTableRow(
- GenericRecord record, List fields) {
+ /**
+ * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
+ *
+ * See "Avro
+ * format" for more information.
+ */
+ static TableRow convertGenericRecordToTableRow(GenericRecord record) {
TableRow row = new TableRow();
- for (TableFieldSchema subSchema : fields) {
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
- // is required, so it may not be null.
- Field field = record.getSchema().getField(subSchema.getName());
+ Schema schema = record.getSchema();
+
+ for (Field field : schema.getFields()) {
Object convertedValue =
- getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
+ getTypedCellValue(field.name(), field.schema(), record.get(field.pos()));
if (convertedValue != null) {
// To match the JSON files exported by BigQuery, do not include null values in the output.
row.set(field.name(), convertedValue);
@@ -239,32 +303,22 @@ private static TableRow convertGenericRecordToTableRow(
return row;
}
- private static @Nullable Object getTypedCellValue(
- Schema schema, TableFieldSchema fieldSchema, Object v) {
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
- // is optional (and so it may be null), but defaults to "NULLABLE".
- String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
- switch (mode) {
- case "REQUIRED":
- return convertRequiredField(schema.getType(), schema.getLogicalType(), fieldSchema, v);
- case "REPEATED":
- return convertRepeatedField(schema, fieldSchema, v);
- case "NULLABLE":
- return convertNullableField(schema, fieldSchema, v);
- default:
+ private static @Nullable Object getTypedCellValue(String name, Schema schema, Object v) {
+ Type type = schema.getType();
+ switch (type) {
+ case ARRAY:
+ return convertRepeatedField(name, schema.getElementType(), v);
+ case UNION:
+ return convertNullableField(name, schema, v);
+ case MAP:
throw new UnsupportedOperationException(
- "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
+ String.format("Unexpected Avro field schema type %s for field named %s", type, name));
+ default:
+ return convertRequiredField(name, schema, v);
}
}
- private static List convertRepeatedField(
- Schema schema, TableFieldSchema fieldSchema, Object v) {
- Type arrayType = schema.getType();
- verify(
- arrayType == Type.ARRAY,
- "BigQuery REPEATED field %s should be Avro ARRAY, not %s",
- fieldSchema.getName(),
- arrayType);
+ private static List convertRepeatedField(String name, Schema elementType, Object v) {
// REPEATED fields are represented as Avro arrays.
if (v == null) {
// Handle the case of an empty repeated field.
@@ -273,145 +327,100 @@ private static List convertRepeatedField(
@SuppressWarnings("unchecked")
List elements = (List) v;
ArrayList values = new ArrayList<>();
- Type elementType = schema.getElementType().getType();
- LogicalType elementLogicalType = schema.getElementType().getLogicalType();
for (Object element : elements) {
- values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element));
+ values.add(convertRequiredField(name, elementType, element));
}
return values;
}
- private static Object convertRequiredField(
- Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
+ private static Object convertRequiredField(String name, Schema schema, Object v) {
// REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
// INTEGER type maps to an Avro LONG type.
- checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
- // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
- // is required, so it may not be null.
- String bqType = fieldSchema.getType();
- ImmutableCollection expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
- verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
- verify(
- expectedAvroTypes.contains(avroType),
- "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
- expectedAvroTypes,
- bqType,
- fieldSchema.getName(),
- avroType);
- // For historical reasons, don't validate avroLogicalType except for with NUMERIC.
- // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type.
- switch (bqType) {
- case "STRING":
- case "DATETIME":
- case "GEOGRAPHY":
- case "JSON":
- // Avro will use a CharSequence to represent String objects, but it may not always use
- // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
- verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
- return v.toString();
- case "DATE":
- if (avroType == Type.INT) {
- verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
- verifyNotNull(avroLogicalType, "Expected Date logical type");
- verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
+ checkNotNull(v, "REQUIRED field %s should not be null", name);
+
+ Type type = schema.getType();
+ LogicalType logicalType = schema.getLogicalType();
+ switch (type) {
+ case BOOLEAN:
+ // SQL types BOOL, BOOLEAN
+ return v;
+ case INT:
+ if (logicalType instanceof LogicalTypes.Date) {
+ // SQL types DATE
return formatDate((Integer) v);
} else {
- verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
- return v.toString();
+ throw new UnsupportedOperationException(
+ String.format("Unexpected Avro field schema type %s for field named %s", type, name));
}
- case "TIME":
- if (avroType == Type.LONG) {
- verify(v instanceof Long, "Expected Long, got %s", v.getClass());
- verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
- verify(
- avroLogicalType instanceof LogicalTypes.TimeMicros,
- "Expected TimeMicros logical type");
+ case LONG:
+ if (logicalType instanceof LogicalTypes.TimeMicros) {
+ // SQL types TIME
return formatTime((Long) v);
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+ // SQL types TIMESTAMP
+ return formatTimestamp((Long) v);
} else {
- verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
- return v.toString();
+ // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+ return ((Long) v).toString();
}
- case "INTEGER":
- case "INT64":
- verify(v instanceof Long, "Expected Long, got %s", v.getClass());
- return ((Long) v).toString();
- case "FLOAT":
- case "FLOAT64":
- verify(v instanceof Double, "Expected Double, got %s", v.getClass());
+ case DOUBLE:
+ // SQL types FLOAT64
return v;
- case "NUMERIC":
- case "BIGNUMERIC":
- // NUMERIC data types are represented as BYTES with the DECIMAL logical type. They are
- // converted back to Strings with precision and scale determined by the logical type.
- verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
- verifyNotNull(avroLogicalType, "Expected Decimal logical type");
- verify(avroLogicalType instanceof LogicalTypes.Decimal, "Expected Decimal logical type");
- BigDecimal numericValue =
- new Conversions.DecimalConversion()
- .fromBytes((ByteBuffer) v, Schema.create(avroType), avroLogicalType);
- return numericValue.toString();
- case "BOOL":
- case "BOOLEAN":
- verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
- return v;
- case "TIMESTAMP":
- // TIMESTAMP data types are represented as Avro LONG types, microseconds since the epoch.
- // Values may be negative since BigQuery timestamps start at 0001-01-01 00:00:00 UTC.
- verify(v instanceof Long, "Expected Long, got %s", v.getClass());
- return formatTimestamp((Long) v);
- case "RECORD":
- case "STRUCT":
- verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
- return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
- case "BYTES":
- verify(v instanceof ByteBuffer, "Expected ByteBuffer, got %s", v.getClass());
- ByteBuffer byteBuffer = (ByteBuffer) v;
- byte[] bytes = new byte[byteBuffer.limit()];
- byteBuffer.get(bytes);
- return BaseEncoding.base64().encode(bytes);
+ case BYTES:
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ // SQL tpe NUMERIC, BIGNUMERIC
+ return new Conversions.DecimalConversion()
+ .fromBytes((ByteBuffer) v, schema, logicalType)
+ .toString();
+ } else {
+ // SQL types BYTES
+ return BaseEncoding.base64().encode(((ByteBuffer) v).array());
+ }
+ case STRING:
+ // SQL types STRING, DATETIME, GEOGRAPHY, JSON
+ // when not using logical type DATE, TIME too
+ return v.toString();
+ case RECORD:
+ return convertGenericRecordToTableRow((GenericRecord) v);
default:
throw new UnsupportedOperationException(
- String.format(
- "Unexpected BigQuery field schema type %s for field named %s",
- fieldSchema.getType(), fieldSchema.getName()));
+ String.format("Unexpected Avro field schema type %s for field named %s", type, name));
}
}
- private static @Nullable Object convertNullableField(
- Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
+ private static @Nullable Object convertNullableField(String name, Schema union, Object v) {
// NULLABLE fields are represented as an Avro Union of the corresponding type and "null".
verify(
- avroSchema.getType() == Type.UNION,
+ union.getType() == Type.UNION,
"Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s",
- avroSchema.getType(),
- fieldSchema.getName());
- List unionTypes = avroSchema.getTypes();
+ union.getType(),
+ name);
+ List unionTypes = union.getTypes();
verify(
unionTypes.size() == 2,
"BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s",
- fieldSchema.getName(),
- unionTypes);
+ name,
+ union);
- if (v == null) {
+ Schema type = union.getTypes().get(GenericData.get().resolveUnion(union, v));
+ if (type.getType() == Type.NULL) {
return null;
+ } else {
+ return convertRequiredField(name, type, v);
}
-
- Type firstType = unionTypes.get(0).getType();
- if (!firstType.equals(Type.NULL)) {
- return convertRequiredField(firstType, unionTypes.get(0).getLogicalType(), fieldSchema, v);
- }
- return convertRequiredField(
- unionTypes.get(1).getType(), unionTypes.get(1).getLogicalType(), fieldSchema, v);
}
- static Schema toGenericAvroSchema(
- String schemaName, List fieldSchemas, @Nullable String namespace) {
+ private static Schema toGenericAvroSchema(
+ String schemaName,
+ List fieldSchemas,
+ Boolean useAvroLogicalTypes,
+ @Nullable String namespace) {
String nextNamespace = namespace == null ? null : String.format("%s.%s", namespace, schemaName);
List avroFields = new ArrayList<>();
for (TableFieldSchema bigQueryField : fieldSchemas) {
- avroFields.add(convertField(bigQueryField, nextNamespace));
+ avroFields.add(convertField(bigQueryField, useAvroLogicalTypes, nextNamespace));
}
return Schema.createRecord(
schemaName,
@@ -421,11 +430,19 @@ static Schema toGenericAvroSchema(
avroFields);
}
- static Schema toGenericAvroSchema(String schemaName, List fieldSchemas) {
- return toGenericAvroSchema(
- schemaName,
- fieldSchemas,
- hasNamespaceCollision(fieldSchemas) ? "org.apache.beam.sdk.io.gcp.bigquery" : null);
+ static Schema toGenericAvroSchema(TableSchema tableSchema) {
+ return toGenericAvroSchema("root", tableSchema.getFields(), true);
+ }
+
+ static Schema toGenericAvroSchema(TableSchema tableSchema, Boolean useAvroLogicalTypes) {
+ return toGenericAvroSchema("root", tableSchema.getFields(), useAvroLogicalTypes);
+ }
+
+ static Schema toGenericAvroSchema(
+ String schemaName, List fieldSchemas, Boolean useAvroLogicalTypes) {
+ String namespace =
+ hasNamespaceCollision(fieldSchemas) ? "org.apache.beam.sdk.io.gcp.bigquery" : null;
+ return toGenericAvroSchema(schemaName, fieldSchemas, useAvroLogicalTypes, namespace);
}
// To maintain backwards compatibility we only disambiguate collisions in the field namespaces as
@@ -452,64 +469,30 @@ private static boolean hasNamespaceCollision(List fieldSchemas
@SuppressWarnings({
"nullness" // Avro library not annotated
})
- private static Field convertField(TableFieldSchema bigQueryField, @Nullable String namespace) {
- ImmutableCollection avroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bigQueryField.getType());
- if (avroTypes.isEmpty()) {
- throw new IllegalArgumentException(
- "Unable to map BigQuery field type " + bigQueryField.getType() + " to avro type.");
- }
-
- Type avroType = avroTypes.iterator().next();
- Schema elementSchema;
- if (avroType == Type.RECORD) {
- elementSchema =
- toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields(), namespace);
- } else {
- elementSchema = handleAvroLogicalTypes(bigQueryField, avroType);
- }
+ private static Field convertField(
+ TableFieldSchema bigQueryField, Boolean useAvroLogicalTypes, @Nullable String namespace) {
+ String fieldName = bigQueryField.getName();
Schema fieldSchema;
- if (bigQueryField.getMode() == null || "NULLABLE".equals(bigQueryField.getMode())) {
- fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema);
- } else if ("REQUIRED".equals(bigQueryField.getMode())) {
- fieldSchema = elementSchema;
- } else if ("REPEATED".equals(bigQueryField.getMode())) {
- fieldSchema = Schema.createArray(elementSchema);
+ String bqType = bigQueryField.getType();
+ if ("RECORD".equals(bqType) || "STRUCT".equals(bqType)) {
+ fieldSchema =
+ toGenericAvroSchema(fieldName, bigQueryField.getFields(), useAvroLogicalTypes, namespace);
} else {
- throw new IllegalArgumentException(
- String.format("Unknown BigQuery Field Mode: %s", bigQueryField.getMode()));
+ fieldSchema = getPrimitiveType(bigQueryField, useAvroLogicalTypes);
+ }
+
+ String bqMode = bigQueryField.getMode();
+ if (bqMode == null || "NULLABLE".equals(bqMode)) {
+ fieldSchema = SchemaBuilder.unionOf().nullType().and().type(fieldSchema).endUnion();
+ } else if ("REPEATED".equals(bqMode)) {
+ fieldSchema = SchemaBuilder.array().items(fieldSchema);
+ } else if (!"REQUIRED".equals(bqMode)) {
+ throw new IllegalArgumentException(String.format("Unknown BigQuery Field Mode: %s", bqMode));
}
return new Field(
- bigQueryField.getName(),
+ fieldName,
fieldSchema,
bigQueryField.getDescription(),
(Object) null /* Cast to avoid deprecated JsonNode constructor. */);
}
-
- private static Schema handleAvroLogicalTypes(TableFieldSchema bigQueryField, Type avroType) {
- String bqType = bigQueryField.getType();
- switch (bqType) {
- case "NUMERIC":
- // Default value based on
- // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types
- int precision = Optional.ofNullable(bigQueryField.getPrecision()).orElse(38L).intValue();
- int scale = Optional.ofNullable(bigQueryField.getScale()).orElse(9L).intValue();
- return LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Type.BYTES));
- case "BIGNUMERIC":
- // Default value based on
- // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types
- int precisionBigNumeric =
- Optional.ofNullable(bigQueryField.getPrecision()).orElse(77L).intValue();
- int scaleBigNumeric = Optional.ofNullable(bigQueryField.getScale()).orElse(38L).intValue();
- return LogicalTypes.decimal(precisionBigNumeric, scaleBigNumeric)
- .addToSchema(Schema.create(Type.BYTES));
- case "TIMESTAMP":
- return LogicalTypes.timestampMicros().addToSchema(Schema.create(Type.LONG));
- case "GEOGRAPHY":
- Schema geoSchema = Schema.create(Type.STRING);
- geoSchema.addProp(LogicalType.LOGICAL_TYPE_PROP, "geography_wkt");
- return geoSchema;
- default:
- return Schema.create(avroType);
- }
- }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 79a3249d6bc9..19ff1576bb24 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -627,9 +627,7 @@ public class BigQueryIO {
GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>();
private static final SerializableFunction
- DEFAULT_AVRO_SCHEMA_FACTORY =
- (SerializableFunction)
- input -> BigQueryAvroUtils.toGenericAvroSchema("root", input.getFields());
+ DEFAULT_AVRO_SCHEMA_FACTORY = BigQueryAvroUtils::toGenericAvroSchema;
/**
* @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead. {@link
@@ -793,8 +791,7 @@ static class TableRowParser implements SerializableFunction executeExtract(
List> createSources(
List files, TableSchema schema, @Nullable List metadata)
throws IOException, InterruptedException {
- String avroSchema =
- BigQueryAvroUtils.toGenericAvroSchema("root", schema.getFields()).toString();
+ String avroSchema = BigQueryAvroUtils.toGenericAvroSchema(schema).toString();
AvroSource.DatumReaderFactory factory = readerFactory.apply(schema);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 305abad5783a..606ce31b8bea 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -310,38 +310,45 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
*
* Supports both standard and legacy SQL types.
*
- * @param typeName Name of the type
+ * @param typeName Name of the type returned by {@link TableFieldSchema#getType()}
* @param nestedFields Nested fields for the given type (eg. RECORD type)
* @return Corresponding Beam {@link FieldType}
*/
private static FieldType fromTableFieldSchemaType(
String typeName, List nestedFields, SchemaConversionOptions options) {
+ // see
+ // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType--
switch (typeName) {
case "STRING":
return FieldType.STRING;
case "BYTES":
return FieldType.BYTES;
- case "INT64":
case "INTEGER":
+ case "INT64":
return FieldType.INT64;
- case "FLOAT64":
case "FLOAT":
+ case "FLOAT64":
return FieldType.DOUBLE;
- case "BOOL":
case "BOOLEAN":
+ case "BOOL":
return FieldType.BOOLEAN;
- case "NUMERIC":
- return FieldType.DECIMAL;
case "TIMESTAMP":
return FieldType.DATETIME;
- case "TIME":
- return FieldType.logicalType(SqlTypes.TIME);
case "DATE":
return FieldType.logicalType(SqlTypes.DATE);
+ case "TIME":
+ return FieldType.logicalType(SqlTypes.TIME);
case "DATETIME":
return FieldType.logicalType(SqlTypes.DATETIME);
- case "STRUCT":
+ case "NUMERIC":
+ case "BIGNUMERIC":
+ return FieldType.DECIMAL;
+ case "GEOGRAPHY":
+ case "JSON":
+ // TODO Add metadata for custom sql types ?
+ return FieldType.STRING;
case "RECORD":
+ case "STRUCT":
if (options.getInferMaps() && nestedFields.size() == 2) {
TableFieldSchema key = nestedFields.get(0);
TableFieldSchema value = nestedFields.get(1);
@@ -352,9 +359,9 @@ private static FieldType fromTableFieldSchemaType(
fromTableFieldSchemaType(value.getType(), value.getFields(), options));
}
}
-
Schema rowSchema = fromTableFieldSchema(nestedFields, options);
return FieldType.row(rowSchema);
+ case "RANGE": // TODO add support for range type
default:
throw new UnsupportedOperationException(
"Converting BigQuery type " + typeName + " to Beam type is unsupported");
@@ -446,10 +453,27 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
return fromTableFieldSchema(tableSchema.getFields(), options);
}
+ /** Convert a list of BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */
+ public static org.apache.avro.Schema toGenericAvroSchema(TableSchema tableSchema) {
+ return toGenericAvroSchema(tableSchema, false);
+ }
+
+ /** Convert a list of BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */
+ public static org.apache.avro.Schema toGenericAvroSchema(
+ TableSchema tableSchema, Boolean useAvroLogicalTypes) {
+ return toGenericAvroSchema("root", tableSchema.getFields(), useAvroLogicalTypes);
+ }
+
/** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
public static org.apache.avro.Schema toGenericAvroSchema(
String schemaName, List fieldSchemas) {
- return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas);
+ return toGenericAvroSchema(schemaName, fieldSchemas, false);
+ }
+
+ /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
+ public static org.apache.avro.Schema toGenericAvroSchema(
+ String schemaName, List fieldSchemas, Boolean useAvroLogicalTypes) {
+ return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas, useAvroLogicalTypes);
}
private static final BigQueryIO.TypedRead.ToBeamRowFunction
@@ -514,9 +538,20 @@ public static Row toBeamRow(GenericRecord record, Schema schema, ConversionOptio
return Row.withSchema(schema).addValues(valuesInOrder).build();
}
+ /**
+ * Convert generic record to Bq TableRow.
+ *
+ * @deprecated use {@link #convertGenericRecordToTableRow(GenericRecord)}
+ */
+ @Deprecated
public static TableRow convertGenericRecordToTableRow(
GenericRecord record, TableSchema tableSchema) {
- return BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+ return convertGenericRecordToTableRow(record);
+ }
+
+ /** Convert generic record to Bq TableRow. */
+ public static TableRow convertGenericRecordToTableRow(GenericRecord record) {
+ return BigQueryAvroUtils.convertGenericRecordToTableRow(record);
}
/** Convert a Beam Row to a BigQuery TableRow. */
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index c87888134c8a..662f2658eb6b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -28,6 +28,7 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Conversions;
@@ -38,14 +39,14 @@
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.AvroSchema;
import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
import org.apache.avro.util.Utf8;
-import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
-import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -96,64 +97,26 @@ public class BigQueryAvroUtilsTest {
.setFields(subFields),
new TableFieldSchema().setName("geoPositions").setType("GEOGRAPHY").setMode("NULLABLE"));
- private Pair convertToByteBuffer(BigDecimal bigDecimal, Schema schema) {
- LogicalType bigDecimalLogicalType =
- LogicalTypes.decimal(bigDecimal.precision(), bigDecimal.scale());
- // DecimalConversion.toBytes returns a ByteBuffer, which can be mutated by callees if passed
- // to other methods. We wrap the byte array as a ByteBuffer before adding it to the
- // GenericRecords.
- byte[] bigDecimalBytes =
- new Conversions.DecimalConversion()
- .toBytes(bigDecimal, schema, bigDecimalLogicalType)
- .array();
- return Pair.of(bigDecimalLogicalType, bigDecimalBytes);
+ private ByteBuffer convertToBytes(BigDecimal bigDecimal, int precision, int scale) {
+ LogicalType bigDecimalLogicalType = LogicalTypes.decimal(precision, scale);
+ return new Conversions.DecimalConversion().toBytes(bigDecimal, null, bigDecimalLogicalType);
}
@Test
public void testConvertGenericRecordToTableRow() throws Exception {
- TableSchema tableSchema = new TableSchema();
- tableSchema.setFields(fields);
-
- // BigQuery encodes NUMERIC and BIGNUMERIC values to Avro using the BYTES type with the DECIMAL
- // logical type. AvroCoder can't apply logical types to Schemas directly, so we need to get the
- // Schema for the Bird class defined below, then replace the field used to test NUMERIC with
- // a field that has the appropriate Schema.
- Schema numericSchema = Schema.create(Type.BYTES);
BigDecimal numeric = new BigDecimal("123456789.123456789");
- Pair numericPair = convertToByteBuffer(numeric, numericSchema);
- Schema bigNumericSchema = Schema.create(Type.BYTES);
+ ByteBuffer numericBytes = convertToBytes(numeric, 38, 9);
BigDecimal bigNumeric =
new BigDecimal(
"578960446186580977117854925043439539266.34992332820282019728792003956564819967");
- Pair bigNumericPair = convertToByteBuffer(bigNumeric, bigNumericSchema);
-
- // In order to update the Schema for NUMERIC and BIGNUMERIC values, we need to recreate all of
- // the Fields.
- List avroFields = new ArrayList<>();
- for (Schema.Field field : AvroCoder.of(Bird.class).getSchema().getFields()) {
- Schema schema = field.schema();
- if ("birthdayMoney".equals(field.name())) {
- // birthdayMoney is nullable field with type BYTES/DECIMAL.
- schema =
- Schema.createUnion(
- Schema.create(Type.NULL), numericPair.getLeft().addToSchema(numericSchema));
- } else if ("lotteryWinnings".equals(field.name())) {
- // lotteryWinnings is nullable field with type BYTES/DECIMAL.
- schema =
- Schema.createUnion(
- Schema.create(Type.NULL), bigNumericPair.getLeft().addToSchema(bigNumericSchema));
- }
- // After a Field is added to a Schema, it is assigned a position, so we can't simply reuse
- // the existing Field.
- avroFields.add(new Schema.Field(field.name(), schema, field.doc(), field.defaultVal()));
- }
- Schema avroSchema = Schema.createRecord(avroFields);
+ ByteBuffer bigNumericBytes = convertToBytes(bigNumeric, 77, 38);
+ Schema avroSchema = ReflectData.get().getSchema(Bird.class);
{
// Test nullable fields.
GenericRecord record = new GenericData.Record(avroSchema);
record.put("number", 5L);
- TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+ TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
TableRow row = new TableRow().set("number", "5").set("associates", new ArrayList());
assertEquals(row, convertedRow);
TableRow clonedRow = convertedRow.clone();
@@ -169,15 +132,15 @@ public void testConvertGenericRecordToTableRow() throws Exception {
record.put("number", 5L);
record.put("quality", 5.0);
record.put("birthday", 5L);
- record.put("birthdayMoney", ByteBuffer.wrap(numericPair.getRight()));
- record.put("lotteryWinnings", ByteBuffer.wrap(bigNumericPair.getRight()));
+ record.put("birthdayMoney", numericBytes);
+ record.put("lotteryWinnings", bigNumericBytes);
record.put("flighted", Boolean.TRUE);
record.put("sound", soundByteBuffer);
record.put("anniversaryDate", new Utf8("2000-01-01"));
record.put("anniversaryDatetime", new String("2000-01-01 00:00:00.000005"));
record.put("anniversaryTime", new Utf8("00:00:00.000005"));
record.put("geoPositions", new String("LINESTRING(1 2, 3 4, 5 6, 7 8)"));
- TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+ TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
TableRow row =
new TableRow()
.set("number", "5")
@@ -204,9 +167,9 @@ public void testConvertGenericRecordToTableRow() throws Exception {
GenericRecord record = new GenericData.Record(avroSchema);
record.put("number", 5L);
record.put("associates", Lists.newArrayList(nestedRecord));
- record.put("birthdayMoney", ByteBuffer.wrap(numericPair.getRight()));
- record.put("lotteryWinnings", ByteBuffer.wrap(bigNumericPair.getRight()));
- TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+ record.put("birthdayMoney", numericBytes);
+ record.put("lotteryWinnings", bigNumericBytes);
+ TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
TableRow row =
new TableRow()
.set("associates", Lists.newArrayList(new TableRow().set("species", "other")))
@@ -223,8 +186,7 @@ public void testConvertGenericRecordToTableRow() throws Exception {
public void testConvertBigQuerySchemaToAvroSchema() {
TableSchema tableSchema = new TableSchema();
tableSchema.setFields(fields);
- Schema avroSchema =
- BigQueryAvroUtils.toGenericAvroSchema("testSchema", tableSchema.getFields());
+ Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema);
assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Type.LONG)));
assertThat(
@@ -260,17 +222,23 @@ public void testConvertBigQuerySchemaToAvroSchema() {
assertThat(
avroSchema.getField("sound").schema(),
equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES))));
+ Schema dateSchema = Schema.create(Type.INT);
+ LogicalTypes.date().addToSchema(dateSchema);
assertThat(
avroSchema.getField("anniversaryDate").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))));
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), dateSchema)));
+ Schema dateTimeSchema = Schema.create(Type.STRING);
+ BigQueryAvroUtils.DATETIME_LOGICAL_TYPE.addToSchema(dateTimeSchema);
assertThat(
avroSchema.getField("anniversaryDatetime").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))));
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), dateTimeSchema)));
+ Schema timeSchema = Schema.create(Type.LONG);
+ LogicalTypes.timeMicros().addToSchema(timeSchema);
assertThat(
avroSchema.getField("anniversaryTime").schema(),
- equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))));
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), timeSchema)));
Schema geoSchema = Schema.create(Type.STRING);
- geoSchema.addProp(LogicalType.LOGICAL_TYPE_PROP, "geography_wkt");
+ geoSchema.addProp("sqlType", "GEOGRAPHY");
assertThat(
avroSchema.getField("geoPositions").schema(),
equalTo(Schema.createUnion(Schema.create(Type.NULL), geoSchema)));
@@ -309,6 +277,109 @@ public void testConvertBigQuerySchemaToAvroSchema() {
(Object) null))))));
}
+ @Test
+ public void testConvertBigQuerySchemaToAvroSchemaWithoutLogicalTypes() {
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setFields(fields);
+ Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false);
+
+ assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Schema.Type.LONG)));
+ assertThat(
+ avroSchema.getField("species").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))));
+ assertThat(
+ avroSchema.getField("quality").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.DOUBLE))));
+ assertThat(
+ avroSchema.getField("quantity").schema(),
+ equalTo(
+ Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG))));
+ assertThat(
+ avroSchema.getField("birthday").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+ LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))));
+ assertThat(
+ avroSchema.getField("birthdayMoney").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+ LogicalTypes.decimal(38, 9).addToSchema(Schema.create(Schema.Type.BYTES)))));
+ assertThat(
+ avroSchema.getField("lotteryWinnings").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+ LogicalTypes.decimal(77, 38).addToSchema(Schema.create(Schema.Type.BYTES)))));
+ assertThat(
+ avroSchema.getField("flighted").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BOOLEAN))));
+ assertThat(
+ avroSchema.getField("sound").schema(),
+ equalTo(
+ Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES))));
+ Schema dateSchema = Schema.create(Schema.Type.STRING);
+ dateSchema.addProp("sqlType", "DATE");
+ assertThat(
+ avroSchema.getField("anniversaryDate").schema(),
+ equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), dateSchema)));
+ Schema dateTimeSchema = Schema.create(Schema.Type.STRING);
+ dateTimeSchema.addProp("sqlType", "DATETIME");
+ assertThat(
+ avroSchema.getField("anniversaryDatetime").schema(),
+ equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), dateTimeSchema)));
+ Schema timeSchema = Schema.create(Schema.Type.STRING);
+ timeSchema.addProp("sqlType", "TIME");
+ assertThat(
+ avroSchema.getField("anniversaryTime").schema(),
+ equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), timeSchema)));
+ Schema geoSchema = Schema.create(Type.STRING);
+ geoSchema.addProp("sqlType", "GEOGRAPHY");
+ assertThat(
+ avroSchema.getField("geoPositions").schema(),
+ equalTo(Schema.createUnion(Schema.create(Schema.Type.NULL), geoSchema)));
+ assertThat(
+ avroSchema.getField("scion").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL),
+ Schema.createRecord(
+ "scion",
+ "Translated Avro Schema for scion",
+ "org.apache.beam.sdk.io.gcp.bigquery",
+ false,
+ ImmutableList.of(
+ new Schema.Field(
+ "species",
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)),
+ null,
+ (Object) null))))));
+ assertThat(
+ avroSchema.getField("associates").schema(),
+ equalTo(
+ Schema.createArray(
+ Schema.createRecord(
+ "associates",
+ "Translated Avro Schema for associates",
+ "org.apache.beam.sdk.io.gcp.bigquery",
+ false,
+ ImmutableList.of(
+ new Schema.Field(
+ "species",
+ Schema.createUnion(
+ Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)),
+ null,
+ (Object) null))))));
+ }
+
@Test
public void testFormatTimestamp() {
assertThat(
@@ -427,22 +498,34 @@ public void testSchemaCollisionsInAvroConversion() {
.setType("FLOAT"))))))))),
new TableFieldSchema().setName("platform").setType("STRING")));
// To string should be sufficient here as this exercises Avro's conversion feature
- String output = BigQueryAvroUtils.toGenericAvroSchema("root", schema.getFields()).toString();
+ String output = BigQueryAvroUtils.toGenericAvroSchema(schema, false).toString();
assertThat(output.length(), greaterThan(0));
}
/** Pojo class used as the record type in tests. */
- @DefaultCoder(AvroCoder.class)
@SuppressWarnings("unused") // Used by Avro reflection.
static class Bird {
long number;
@Nullable String species;
@Nullable Double quality;
@Nullable Long quantity;
- @Nullable Long birthday; // Exercises TIMESTAMP.
- @Nullable ByteBuffer birthdayMoney; // Exercises NUMERIC.
- @Nullable ByteBuffer lotteryWinnings; // Exercises BIGNUMERIC.
- @Nullable String geoPositions; // Exercises GEOGRAPHY.
+
+ @AvroSchema(value = "[\"null\", {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}]")
+ Instant birthday;
+
+ @AvroSchema(
+ value =
+ "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 38, \"scale\": 9}]")
+ BigDecimal birthdayMoney;
+
+ @AvroSchema(
+ value =
+ "[\"null\", {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 77, \"scale\": 38}]")
+ BigDecimal lotteryWinnings;
+
+ @AvroSchema(value = "[\"null\", {\"type\": \"string\", \"sqlType\": \"GEOGRAPHY\"}]")
+ String geoPositions;
+
@Nullable Boolean flighted;
@Nullable ByteBuffer sound;
@Nullable Utf8 anniversaryDate;