Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set a UUID when building a Schema object. #32399

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -90,7 +91,12 @@ public String toString() {
}
}
// A mapping between field names an indices.
private final BiMap<String, Integer> fieldIndices = HashBiMap.create();
private final BiMap<String, Integer> fieldIndices;

// Encoding positions can be used to maintain encoded byte compatibility between schemas with
// different field ordering or with added/removed fields. Such positions affect the encoding
// and decoding of Rows performed by RowCoderGenerator. They are stored within Schemas to
// facilitate plumbing to coders, display data etc but do not affect schema equality / uuid etc.
private Map<String, Integer> encodingPositions = Maps.newHashMap();
private boolean encodingPositionsOverridden = false;

Expand Down Expand Up @@ -312,17 +318,20 @@ public Schema(List<Field> fields) {
}

public Schema(List<Field> fields, Options options) {
this.fields = fields;
this.fields = ImmutableList.copyOf(fields);
int index = 0;
for (Field field : fields) {
BiMap<String, Integer> fieldIndicesMutable = HashBiMap.create();
for (Field field : this.fields) {
Preconditions.checkArgument(
fieldIndices.get(field.getName()) == null,
fieldIndicesMutable.get(field.getName()) == null,
"Duplicate field " + field.getName() + " added to schema");
encodingPositions.put(field.getName(), index);
fieldIndices.put(field.getName(), index++);
fieldIndicesMutable.put(field.getName(), index++);
}
this.hashCode = Objects.hash(fieldIndices, fields);
this.fieldIndices = ImmutableBiMap.copyOf(fieldIndicesMutable);
this.options = options;
this.hashCode = Objects.hash(this.fieldIndices, this.fields, this.options);
this.uuid = UUID.randomUUID();
}

public static Schema of(Field... fields) {
Expand All @@ -334,29 +343,24 @@ public static Schema of(Field... fields) {
* fields.
*/
public Schema sorted() {
// Create a new schema and copy over the appropriate Schema object attributes:
// {fields, uuid, options}
// Note: encoding positions are not copied over because generally they should align with the
// ordering of field indices. Otherwise, problems may occur when encoding/decoding Rows of
// this schema.
Schema sortedSchema =
this.fields.stream()
.sorted(Comparator.comparing(Field::getName))
.map(
field -> {
FieldType innerType = field.getType();
if (innerType.getRowSchema() != null) {
Schema innerSortedSchema = innerType.getRowSchema().sorted();
innerType = innerType.toBuilder().setRowSchema(innerSortedSchema).build();
return field.toBuilder().setType(innerType).build();
}
return field;
})
.collect(Schema.toSchema())
.withOptions(getOptions());
sortedSchema.setUUID(getUUID());

return sortedSchema;
// Create a new schema and copy over the appropriate Schema object attributes: {fields, options}
// Note: uuid is not copied as the Schema field ordering is changed. encoding positions are not
// copied over because generally they should align with the ordering of field indices.
// Otherwise, problems may occur when encoding/decoding Rows of this schema.
return this.fields.stream()
.sorted(Comparator.comparing(Field::getName))
.map(
field -> {
FieldType innerType = field.getType();
if (innerType.getRowSchema() != null) {
Schema innerSortedSchema = innerType.getRowSchema().sorted();
innerType = innerType.toBuilder().setRowSchema(innerSortedSchema).build();
return field.toBuilder().setType(innerType).build();
}
return field;
})
.collect(Schema.toSchema())
.withOptions(getOptions());
}

/** Returns a copy of the Schema with the options set. */
Expand Down Expand Up @@ -405,11 +409,14 @@ public boolean equals(@Nullable Object o) {
return false;
}
Schema other = (Schema) o;
// If both schemas have a UUID set, we can simply compare the UUIDs.
if (uuid != null && other.uuid != null) {
if (Objects.equals(uuid, other.uuid)) {
return true;
}
// If both schemas have a UUID set, we can short-circuit deep comparison if the
// UUIDs are equal.
if (uuid != null && other.uuid != null && Objects.equals(uuid, other.uuid)) {
return true;
}
// Utilize hash-code pre-calculation for cheap negative comparison.
if (this.hashCode != other.hashCode) {
return false;
}
return Objects.equals(fieldIndices, other.fieldIndices)
&& Objects.equals(getFields(), other.getFields())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,62 +115,80 @@ private static String getLogicalTypeUrn(String identifier) {
.build();

public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) {
String uuid = schema.getUUID() != null ? schema.getUUID().toString() : "";
return schemaToProto(schema, serializeLogicalType, true);
}

public static SchemaApi.Schema schemaToProto(
Schema schema, boolean serializeLogicalType, boolean serializeUUID) {
String uuid = schema.getUUID() != null && serializeUUID ? schema.getUUID().toString() : "";
SchemaApi.Schema.Builder builder = SchemaApi.Schema.newBuilder().setId(uuid);
for (Field field : schema.getFields()) {
SchemaApi.Field protoField =
fieldToProto(
field,
schema.indexOf(field.getName()),
schema.getEncodingPositions().get(field.getName()),
serializeLogicalType);
serializeLogicalType,
serializeUUID);
builder.addFields(protoField);
}
builder.addAllOptions(optionsToProto(schema.getOptions()));
return builder.build();
}

private static SchemaApi.Field fieldToProto(
Field field, int fieldId, int position, boolean serializeLogicalType) {
Field field, int fieldId, int position, boolean serializeLogicalType, boolean serializeUUID) {
return SchemaApi.Field.newBuilder()
.setName(field.getName())
.setDescription(field.getDescription())
.setType(fieldTypeToProto(field.getType(), serializeLogicalType))
.setType(fieldTypeToProto(field.getType(), serializeLogicalType, serializeUUID))
.setId(fieldId)
.setEncodingPosition(position)
.addAllOptions(optionsToProto(field.getOptions()))
.build();
}

@VisibleForTesting
static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean serializeLogicalType) {
static SchemaApi.FieldType fieldTypeToProto(
FieldType fieldType, boolean serializeLogicalType, boolean serializeUUID) {
SchemaApi.FieldType.Builder builder = SchemaApi.FieldType.newBuilder();
switch (fieldType.getTypeName()) {
case ROW:
builder.setRowType(
SchemaApi.RowType.newBuilder()
.setSchema(schemaToProto(fieldType.getRowSchema(), serializeLogicalType)));
.setSchema(
schemaToProto(fieldType.getRowSchema(), serializeLogicalType, serializeUUID)));
break;

case ARRAY:
builder.setArrayType(
SchemaApi.ArrayType.newBuilder()
.setElementType(
fieldTypeToProto(fieldType.getCollectionElementType(), serializeLogicalType)));
fieldTypeToProto(
fieldType.getCollectionElementType(),
serializeLogicalType,
serializeUUID)));
break;

case ITERABLE:
builder.setIterableType(
SchemaApi.IterableType.newBuilder()
.setElementType(
fieldTypeToProto(fieldType.getCollectionElementType(), serializeLogicalType)));
fieldTypeToProto(
fieldType.getCollectionElementType(),
serializeLogicalType,
serializeUUID)));
break;

case MAP:
builder.setMapType(
SchemaApi.MapType.newBuilder()
.setKeyType(fieldTypeToProto(fieldType.getMapKeyType(), serializeLogicalType))
.setValueType(fieldTypeToProto(fieldType.getMapValueType(), serializeLogicalType))
.setKeyType(
fieldTypeToProto(
fieldType.getMapKeyType(), serializeLogicalType, serializeUUID))
.setValueType(
fieldTypeToProto(
fieldType.getMapValueType(), serializeLogicalType, serializeUUID))
.build());
break;

Expand All @@ -186,12 +204,14 @@ static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean seriali
.setUrn(logicalType.getIdentifier())
.setPayload(ByteString.copyFrom(((UnknownLogicalType) logicalType).getPayload()))
.setRepresentation(
fieldTypeToProto(logicalType.getBaseType(), serializeLogicalType));
fieldTypeToProto(
logicalType.getBaseType(), serializeLogicalType, serializeUUID));

if (logicalType.getArgumentType() != null) {
logicalTypeBuilder
.setArgumentType(
fieldTypeToProto(logicalType.getArgumentType(), serializeLogicalType))
fieldTypeToProto(
logicalType.getArgumentType(), serializeLogicalType, serializeUUID))
.setArgument(
fieldValueToProto(logicalType.getArgumentType(), logicalType.getArgument()));
}
Expand All @@ -200,13 +220,15 @@ static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean seriali
logicalTypeBuilder =
SchemaApi.LogicalType.newBuilder()
.setRepresentation(
fieldTypeToProto(logicalType.getBaseType(), serializeLogicalType))
fieldTypeToProto(
logicalType.getBaseType(), serializeLogicalType, serializeUUID))
.setUrn(urn);
if (logicalType.getArgumentType() != null) {
logicalTypeBuilder =
logicalTypeBuilder
.setArgumentType(
fieldTypeToProto(logicalType.getArgumentType(), serializeLogicalType))
fieldTypeToProto(
logicalType.getArgumentType(), serializeLogicalType, serializeUUID))
.setArgument(
fieldValueToProto(
logicalType.getArgumentType(), logicalType.getArgument()));
Expand All @@ -226,7 +248,8 @@ static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean seriali
builder.setLogicalType(
SchemaApi.LogicalType.newBuilder()
.setUrn(URN_BEAM_LOGICAL_MILLIS_INSTANT)
.setRepresentation(fieldTypeToProto(FieldType.INT64, serializeLogicalType))
.setRepresentation(
fieldTypeToProto(FieldType.INT64, serializeLogicalType, serializeUUID))
.build());
break;
case DECIMAL:
Expand All @@ -235,7 +258,8 @@ static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean seriali
builder.setLogicalType(
SchemaApi.LogicalType.newBuilder()
.setUrn(URN_BEAM_LOGICAL_DECIMAL)
.setRepresentation(fieldTypeToProto(FieldType.BYTES, serializeLogicalType))
.setRepresentation(
fieldTypeToProto(FieldType.BYTES, serializeLogicalType, serializeUUID))
.build());
break;
case BYTE:
Expand Down Expand Up @@ -288,14 +312,14 @@ public static Schema schemaFromProto(SchemaApi.Schema protoSchema) {
Schema schema = builder.build();

Preconditions.checkState(encodingLocationMap.size() == schema.getFieldCount());
long dinstictEncodingPositions = encodingLocationMap.values().stream().distinct().count();
Preconditions.checkState(dinstictEncodingPositions <= schema.getFieldCount());
if (dinstictEncodingPositions < schema.getFieldCount() && schema.getFieldCount() > 0) {
long distinctEncodingPositions = encodingLocationMap.values().stream().distinct().count();
Preconditions.checkState(distinctEncodingPositions <= schema.getFieldCount());
if (distinctEncodingPositions < schema.getFieldCount() && schema.getFieldCount() > 0) {
// This means that encoding positions were not specified in the proto. Generally, we don't
// expect this to happen,
// but if it does happen, we expect none to be specified - in which case the should all be
// zero.
Preconditions.checkState(dinstictEncodingPositions == 1);
Preconditions.checkState(distinctEncodingPositions == 1);
} else if (protoSchema.getEncodingPositionsSet()) {
schema.setEncodingPositions(encodingLocationMap);
}
Expand Down Expand Up @@ -771,7 +795,8 @@ private static List<SchemaApi.Option> optionsToProto(Schema.Options options) {
protoOptions.add(
SchemaApi.Option.newBuilder()
.setName(name)
.setType(fieldTypeToProto(Objects.requireNonNull(options.getType(name)), false))
.setType(
fieldTypeToProto(Objects.requireNonNull(options.getType(name)), false, false))
.setValue(
fieldValueToProto(
Objects.requireNonNull(options.getType(name)), options.getValue(name)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ private OneOfType(List<Field> fields, @Nullable Map<String, Integer> enumMap) {
enumerationType = EnumerationType.create(enumValues);
}
oneOfSchema = Schema.builder().addFields(nullableFields).build();
schemaProtoRepresentation = SchemaTranslation.schemaToProto(oneOfSchema, false).toByteArray();
schemaProtoRepresentation =
SchemaTranslation.schemaToProto(oneOfSchema, false, false).toByteArray();
}

/** Create an {@link OneOfType} logical type. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public void toAndFromProto() throws Exception {
public static class FromProtoToProtoTest {
@Parameters(name = "{index}: {0}")
public static Iterable<SchemaApi.Schema> data() {
ImmutableList.Builder<SchemaApi.Schema> listBuilder = ImmutableList.builder();
SchemaApi.Schema.Builder builder = SchemaApi.Schema.newBuilder();
// A go 'int'
builder.addFields(
Expand All @@ -232,6 +233,9 @@ public static Iterable<SchemaApi.Schema> data() {
.setId(0)
.setEncodingPosition(0)
.build());
SchemaApi.Schema singleFieldSchema = builder.build();
listBuilder.add(singleFieldSchema);

// A pickled python object
builder.addFields(
SchemaApi.Field.newBuilder()
Expand Down Expand Up @@ -294,21 +298,51 @@ public static Iterable<SchemaApi.Schema> data() {
.setId(2)
.setEncodingPosition(2)
.build());
SchemaApi.Schema unknownLogicalTypeSchema = builder.build();
SchemaApi.Schema multipleFieldSchema = builder.build();
listBuilder.add(multipleFieldSchema);

return ImmutableList.<SchemaApi.Schema>builder().add(unknownLogicalTypeSchema).build();
builder.clear();
builder.addFields(
SchemaApi.Field.newBuilder()
.setName("nested")
.setType(
SchemaApi.FieldType.newBuilder()
.setRowType(
SchemaApi.RowType.newBuilder().setSchema(singleFieldSchema).build())
.build())
.build());
SchemaApi.Schema nestedSchema = builder.build();
listBuilder.add(nestedSchema);

return listBuilder.build();
}

@Parameter(0)
public SchemaApi.Schema schemaProto;

private void clearIds(SchemaApi.Schema.Builder builder) {
builder.clearId();
for (SchemaApi.Field.Builder field : builder.getFieldsBuilderList()) {
if (field.hasType()
&& field.getType().hasRowType()
&& field.getType().getRowType().hasSchema()) {
clearIds(field.getTypeBuilder().getRowTypeBuilder().getSchemaBuilder());
}
}
}

@Test
public void fromProtoAndToProto() throws Exception {
Schema decodedSchema = SchemaTranslation.schemaFromProto(schemaProto);

SchemaApi.Schema reencodedSchemaProto = SchemaTranslation.schemaToProto(decodedSchema, true);
SchemaApi.Schema.Builder builder = reencodedSchemaProto.toBuilder();
clearIds(builder);
assertThat(builder.build(), equalTo(schemaProto));

assertThat(reencodedSchemaProto, equalTo(schemaProto));
SchemaApi.Schema reencodedSchemaProtoWithoutUUID =
SchemaTranslation.schemaToProto(decodedSchema, true, false);
assertThat(reencodedSchemaProtoWithoutUUID, equalTo(schemaProto));
}
}

Expand Down Expand Up @@ -432,8 +466,8 @@ public static Iterable<Schema.FieldType> data() {
public Schema.FieldType fieldType;

@Test
public void testLogicalTypeSerializeDeserilizeCorrectly() {
SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, true);
public void testLogicalTypeSerializeDeserializeCorrectly() {
SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, true, false);
Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto);

assertThat(
Expand All @@ -451,7 +485,7 @@ public void testLogicalTypeSerializeDeserilizeCorrectly() {

@Test
public void testLogicalTypeFromToProtoCorrectly() {
SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, false);
SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, false, false);
Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto);

if (STANDARD_LOGICAL_TYPES.containsKey(translated.getLogicalType().getIdentifier())) {
Expand Down
Loading
Loading