Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Map and Arrays of Maps in BQ for StorageWrites for Beam Rows #22179

Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -202,7 +204,19 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) {
builder = builder.setType(type);
break;
case MAP:
throw new RuntimeException("Map types not supported by BigQuery.");
@Nullable FieldType keyType = field.getType().getMapKeyType();
@Nullable FieldType valueType = field.getType().getMapValueType();
if (keyType == null || valueType == null) {
throw new RuntimeException("Unexpected null element type!");
}

builder =
builder
.setType(TableFieldSchema.Type.STRUCT)
.addFields(fieldDescriptorFromBeamField(Field.of("key", keyType)))
.addFields(fieldDescriptorFromBeamField(Field.of("value", valueType)))
.setMode(TableFieldSchema.Mode.REPEATED);
break;
default:
@Nullable
TableFieldSchema.Type primitiveType = PRIMITIVE_TYPES.get(field.getType().getTypeName());
Expand Down Expand Up @@ -231,6 +245,8 @@ private static Object messageValueFromRowValue(
if (value == null) {
if (fieldDescriptor.isOptional()) {
return null;
} else if (fieldDescriptor.isRepeated()) {
return Collections.emptyList();
} else {
throw new IllegalArgumentException(
"Received null value for non-nullable field " + fieldDescriptor.getName());
Expand All @@ -250,9 +266,18 @@ private static Object toProtoValue(
if (arrayElementType == null) {
throw new RuntimeException("Unexpected null element type!");
}
return list.stream()
.map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
.collect(Collectors.toList());
Boolean shouldFlatMap =
arrayElementType.getTypeName().isCollectionType()
|| arrayElementType.getTypeName().isMapType();

Stream<Object> valueStream =
list.stream().map(v -> toProtoValue(fieldDescriptor, arrayElementType, v));

if (shouldFlatMap) {
valueStream = valueStream.flatMap(vs -> ((List) vs).stream());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explain why flatMap is correct here?

Copy link
Contributor Author

@prodriguezdefino prodriguezdefino Dec 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because BQ does not support arrays of maps, this code is making the decision of flattening those structures (the if condition is computed based on that particular scenario).

[
map1 [k1,v1] [k2,v2] ,
map2 [k3,v3],
map3 [k4,v4] [k5,v5],
]

------------------------- to
[
record {key:k1, value:v1},
record {key:k2, value:v2},
record {key:k3, value:v3},
record {key:k4, value:v4},
record {key:k5, value:v5}
]

It respects the order in the array and the inherent order of iteration in the maps, but it won't check for repeated keys across the maps in the original array.


return valueStream.collect(Collectors.toList());
case ITERABLE:
Iterable<Object> iterable = (Iterable<Object>) value;
@Nullable FieldType iterableElementType = beamFieldType.getCollectionElementType();
Expand All @@ -263,12 +288,46 @@ private static Object toProtoValue(
.map(v -> toProtoValue(fieldDescriptor, iterableElementType, v))
.collect(Collectors.toList());
case MAP:
throw new RuntimeException("Map types not supported by BigQuery.");
Map<Object, Object> map = (Map<Object, Object>) value;
@Nullable FieldType keyType = beamFieldType.getMapKeyType();
@Nullable FieldType valueType = beamFieldType.getMapValueType();
if (keyType == null || valueType == null) {
throw new RuntimeException("Unexpected null element type!");
}

return map.entrySet().stream()
.map(
(Map.Entry<Object, Object> entry) ->
mapEntryToProtoValue(
fieldDescriptor.getMessageType(), keyType, valueType, entry))
.collect(Collectors.toList());
default:
return scalarToProtoValue(beamFieldType, value);
}
}

static Object mapEntryToProtoValue(
Descriptor descriptor,
FieldType keyFieldType,
FieldType valueFieldType,
Map.Entry<Object, Object> entryValue) {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
FieldDescriptor keyFieldDescriptor =
Preconditions.checkNotNull(descriptor.findFieldByName("key"));
@Nullable Object key = toProtoValue(keyFieldDescriptor, keyFieldType, entryValue.getKey());
if (key != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are null keys allowed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, AFAICT the backing Map in the Row object is a HashMap (with expected size) so I would say that null keys are allowed in a map property for a Row object.
This code only ignores setting the value in the proto if null was the value present on the map's key.

builder.setField(keyFieldDescriptor, key);
}
FieldDescriptor valueFieldDescriptor =
Preconditions.checkNotNull(descriptor.findFieldByName("value"));
@Nullable
Object value = toProtoValue(valueFieldDescriptor, valueFieldType, entryValue.getValue());
if (value != null) {
builder.setField(valueFieldDescriptor, value);
}
return builder.build();
}

@VisibleForTesting
static Object scalarToProtoValue(FieldType beamFieldType, Object value) {
if (beamFieldType.getTypeName() == TypeName.LOGICAL_TYPE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
}
if (type.getTypeName().isCollectionType()) {
type = Preconditions.checkArgumentNotNull(type.getCollectionElementType());
if (type.getTypeName().isCollectionType() || type.getTypeName().isMapType()) {
if (type.getTypeName().isCollectionType() && !type.getTypeName().isMapType()) {
throw new IllegalArgumentException("Array of collection is not supported in BigQuery.");
}
field.setMode(Mode.REPEATED.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
Expand All @@ -34,7 +35,9 @@
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
Expand Down Expand Up @@ -273,12 +276,14 @@ public class BeamRowToStorageApiProtoTest {
.addField("nested", FieldType.row(BASE_SCHEMA).withNullable(true))
.addField("nestedArray", FieldType.array(FieldType.row(BASE_SCHEMA)))
.addField("nestedIterable", FieldType.iterable(FieldType.row(BASE_SCHEMA)))
.addField("nestedMap", FieldType.map(FieldType.STRING, FieldType.row(BASE_SCHEMA)))
.build();
private static final Row NESTED_ROW =
Row.withSchema(NESTED_SCHEMA)
.withFieldValue("nested", BASE_ROW)
.withFieldValue("nestedArray", ImmutableList.of(BASE_ROW, BASE_ROW))
.withFieldValue("nestedIterable", ImmutableList.of(BASE_ROW, BASE_ROW))
.withFieldValue("nestedMap", ImmutableMap.of("key1", BASE_ROW, "key2", BASE_ROW))
.build();

@Test
Expand Down Expand Up @@ -336,12 +341,12 @@ public void testNestedFromSchema() {
.collect(
Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getLabel));

assertEquals(3, types.size());
assertEquals(4, types.size());

Map<String, DescriptorProto> nestedTypes =
descriptor.getNestedTypeList().stream()
.collect(Collectors.toMap(DescriptorProto::getName, Functions.identity()));
assertEquals(3, nestedTypes.size());
assertEquals(4, nestedTypes.size());
assertEquals(Type.TYPE_MESSAGE, types.get("nested"));
assertEquals(Label.LABEL_OPTIONAL, typeLabels.get("nested"));
String nestedTypeName1 = typeNames.get("nested");
Expand All @@ -368,6 +373,23 @@ public void testNestedFromSchema() {
.collect(
Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
assertEquals(expectedBaseTypes, nestedTypes3);

assertEquals(Type.TYPE_MESSAGE, types.get("nestedmap"));
assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedmap"));
String nestedTypeName4 = typeNames.get("nestedmap");
// expects 2 fields in the nested map, key and value
assertEquals(2, nestedTypes.get(nestedTypeName4).getFieldList().size());
Supplier<Stream<FieldDescriptorProto>> stream =
() -> nestedTypes.get(nestedTypeName4).getFieldList().stream();
assertTrue(stream.get().anyMatch(fdp -> fdp.getName().equals("key")));
assertTrue(stream.get().anyMatch(fdp -> fdp.getName().equals("value")));

Map<String, Type> nestedTypes4 =
nestedTypes.get(nestedTypeName4).getNestedTypeList().stream()
.flatMap(vdesc -> vdesc.getFieldList().stream())
.collect(
Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
assertEquals(expectedBaseTypes, nestedTypes4);
}

private void assertBaseRecord(DynamicMessage msg) {
Expand All @@ -384,7 +406,7 @@ public void testMessageFromTableRow() throws Exception {
TableRowToStorageApiProto.getDescriptorFromTableSchema(
BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(NESTED_SCHEMA), true);
DynamicMessage msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, NESTED_ROW);
assertEquals(3, msg.getAllFields().size());
assertEquals(4, msg.getAllFields().size());

Map<String, FieldDescriptor> fieldDescriptors =
descriptor.getFields().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,18 @@ public void testToTableSchema_map() {
assertThat(field.getFields(), containsInAnyOrder(MAP_KEY, MAP_VALUE));
}

@Test
public void testToTableSchema_map_array() {
TableSchema schema = toTableSchema(MAP_ARRAY_TYPE);

assertThat(schema.getFields().size(), equalTo(1));
TableFieldSchema field = schema.getFields().get(0);
assertThat(field.getName(), equalTo("map"));
assertThat(field.getType(), equalTo(StandardSQLTypeName.STRUCT.toString()));
assertThat(field.getMode(), equalTo(Mode.REPEATED.toString()));
assertThat(field.getFields(), containsInAnyOrder(MAP_KEY, MAP_VALUE));
}

@Test
public void testToTableRow_flat() {
TableRow row = toTableRow().apply(FLAT_ROW);
Expand Down