Skip to content

Commit

Permalink
Fix Beam Schema to Iceberg Schema ID conversion logic (#32095)
Browse files Browse the repository at this point in the history
* fix iceberg schema ID logic

* trigger integration tests
  • Loading branch information
ahmedabu98 authored Aug 7, 2024
1 parent 828717a commit b54967e
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 148 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

Expand Down Expand Up @@ -115,113 +116,110 @@ private static Schema icebergStructTypeToBeamSchema(final Types.StructType struc
}

/**
* Represents an Object (in practice, either {@link Type} or {@link Types.NestedField}) along with
* the most recent (max) ID that has been used to build this object.
* Represents a {@link Type} and the most recent field ID used to build it.
*
* <p>Iceberg Schema fields are required to have unique IDs. This includes unique IDs for a {@link
* Types.ListType}'s collection type, a {@link Types.MapType}'s key type and value type, and
* nested {@link Types.StructType}s. When constructing any of these types, we use multiple unique
* ID's for the type's components. The {@code maxId} in this object represents the most recent ID
* used after building this type. This helps signal that the next field we construct should have
* an ID greater than this one.
* org.apache.iceberg.types.Type.NestedType}'s components (e.g. {@link Types.ListType}'s
* collection type, {@link Types.MapType}'s key type and value type, and {@link
* Types.StructType}'s nested fields). The {@code maxId} in this object represents the most recent
* ID used after building this type. This helps signal that the next {@link
* org.apache.iceberg.types.Type.NestedType} we construct should have an ID greater than this one.
*/
@VisibleForTesting
static class ObjectAndMaxId<T> {
static class TypeAndMaxId {
int maxId;
T object;
Type type;

ObjectAndMaxId(int id, T object) {
TypeAndMaxId(int id, Type object) {
this.maxId = id;
this.object = object;
this.type = object;
}
}

/**
* Given a Beam {@link Schema.FieldType} and an index, returns an Iceberg {@link Type} and the
* maximum index after building the Iceberg Type. This assumes the input index is already in use
* (usually by the parent {@link Types.NestedField}, and will start building the Iceberg type from
* index + 1.
* Takes a Beam {@link Schema.FieldType} and an index intended as a starting point for Iceberg
* {@link org.apache.iceberg.types.Type.NestedType}s. Returns an Iceberg {@link Type} and the
* maximum index after building that type.
*
* <p>Returns this information in an {@link ObjectAndMaxId<Type>} instance.
* <p>Returns this information in a {@link TypeAndMaxId} object.
*/
@VisibleForTesting
static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType(
int fieldId, Schema.FieldType beamType) {
static TypeAndMaxId beamFieldTypeToIcebergFieldType(
Schema.FieldType beamType, int nestedFieldId) {
if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
return new ObjectAndMaxId<>(fieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
// we don't use nested field ID for primitive types. decrement it so the caller can use it for
// other types.
return new TypeAndMaxId(
--nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
} else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE
// List ID needs to be unique from the NestedField that contains this ListType
int listId = fieldId + 1;
Schema.FieldType beamCollectionType =
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());

ObjectAndMaxId<Type> listInfo = beamFieldTypeToIcebergFieldType(listId, beamCollectionType);
Type icebergCollectionType = listInfo.object;
// nestedFieldId is reserved for the list's collection type.
// we increment here because further nested fields should use unique IDs
TypeAndMaxId listInfo =
beamFieldTypeToIcebergFieldType(beamCollectionType, nestedFieldId + 1);
Type icebergCollectionType = listInfo.type;

boolean elementTypeIsNullable =
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable();

Type listType =
elementTypeIsNullable
? Types.ListType.ofOptional(listId, icebergCollectionType)
: Types.ListType.ofRequired(listId, icebergCollectionType);
? Types.ListType.ofOptional(nestedFieldId, icebergCollectionType)
: Types.ListType.ofRequired(nestedFieldId, icebergCollectionType);

return new ObjectAndMaxId<>(listInfo.maxId, listType);
return new TypeAndMaxId(listInfo.maxId, listType);
} else if (beamType.getTypeName().isMapType()) { // MAP
// key and value IDs need to be unique from the NestedField that contains this MapType
int keyId = fieldId + 1;
int valueId = fieldId + 2;
int maxId = valueId;
// key and value IDs need to be unique
int keyId = nestedFieldId;
int valueId = keyId + 1;

// nested field IDs should be unique
nestedFieldId = valueId + 1;
Schema.FieldType beamKeyType = Preconditions.checkArgumentNotNull(beamType.getMapKeyType());
ObjectAndMaxId<Type> keyInfo = beamFieldTypeToIcebergFieldType(maxId, beamKeyType);
Type icebergKeyType = keyInfo.object;
maxId = keyInfo.maxId;
TypeAndMaxId keyInfo = beamFieldTypeToIcebergFieldType(beamKeyType, nestedFieldId);
Type icebergKeyType = keyInfo.type;

nestedFieldId = keyInfo.maxId + 1;
Schema.FieldType beamValueType =
Preconditions.checkArgumentNotNull(beamType.getMapValueType());
ObjectAndMaxId<Type> valueInfo = beamFieldTypeToIcebergFieldType(maxId, beamValueType);
Type icebergValueType = valueInfo.object;
maxId = valueInfo.maxId;
TypeAndMaxId valueInfo = beamFieldTypeToIcebergFieldType(beamValueType, nestedFieldId);
Type icebergValueType = valueInfo.type;

Type mapType =
beamValueType.getNullable()
? Types.MapType.ofOptional(keyId, valueId, icebergKeyType, icebergValueType)
: Types.MapType.ofRequired(keyId, valueId, icebergKeyType, icebergValueType);

return new ObjectAndMaxId<>(maxId, mapType);
return new TypeAndMaxId(valueInfo.maxId, mapType);
} else if (beamType.getTypeName().isCompositeType()) { // ROW
// Nested field IDs need to be unique from the field that contains this StructType
int maxFieldId = fieldId;

Schema nestedSchema = Preconditions.checkArgumentNotNull(beamType.getRowSchema());
List<Types.NestedField> nestedFields = new ArrayList<>(nestedSchema.getFieldCount());
for (Schema.Field field : nestedSchema.getFields()) {
ObjectAndMaxId<Types.NestedField> converted = beamFieldToIcebergField(++maxFieldId, field);
Types.NestedField nestedField = converted.object;
nestedFields.add(nestedField);

maxFieldId = converted.maxId;
int icebergFieldId = nestedFieldId;
nestedFieldId = icebergFieldId + nestedSchema.getFieldCount();
for (Schema.Field beamField : nestedSchema.getFields()) {
TypeAndMaxId typeAndMaxId =
beamFieldTypeToIcebergFieldType(beamField.getType(), nestedFieldId);
Types.NestedField icebergField =
Types.NestedField.of(
icebergFieldId++,
beamField.getType().getNullable(),
beamField.getName(),
typeAndMaxId.type);

nestedFields.add(icebergField);
nestedFieldId = typeAndMaxId.maxId + 1;
}

Type structType = Types.StructType.of(nestedFields);

return new ObjectAndMaxId<>(maxFieldId, structType);
return new TypeAndMaxId(nestedFieldId - 1, structType);
}

return new ObjectAndMaxId<>(fieldId, Types.StringType.get());
}

private static ObjectAndMaxId<Types.NestedField> beamFieldToIcebergField(
int fieldId, final Schema.Field field) {
ObjectAndMaxId<Type> typeAndMaxId = beamFieldTypeToIcebergFieldType(fieldId, field.getType());
Type icebergType = typeAndMaxId.object;
int id = typeAndMaxId.maxId;

Types.NestedField icebergField =
Types.NestedField.of(fieldId, field.getType().getNullable(), field.getName(), icebergType);

return new ObjectAndMaxId<>(id, icebergField);
return new TypeAndMaxId(nestedFieldId, Types.StringType.get());
}

/**
Expand All @@ -233,18 +231,23 @@ private static ObjectAndMaxId<Types.NestedField> beamFieldToIcebergField(
* <li>{@link Schema.TypeName.LOGICAL_TYPE}
*/
public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) {
Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()];
int nextIcebergFieldId = 1;
for (int i = 0; i < schema.getFieldCount(); i++) {
Schema.Field beamField = schema.getField(i);
ObjectAndMaxId<Types.NestedField> fieldAndMaxId =
beamFieldToIcebergField(nextIcebergFieldId, beamField);
Types.NestedField field = fieldAndMaxId.object;
fields[i] = field;

nextIcebergFieldId = fieldAndMaxId.maxId + 1;
List<Types.NestedField> fields = new ArrayList<>(schema.getFieldCount());
int nestedFieldId = schema.getFieldCount() + 1;
int icebergFieldId = 1;
for (Schema.Field beamField : schema.getFields()) {
TypeAndMaxId typeAndMaxId =
beamFieldTypeToIcebergFieldType(beamField.getType(), nestedFieldId);
Types.NestedField icebergField =
Types.NestedField.of(
icebergFieldId++,
beamField.getType().getNullable(),
beamField.getName(),
typeAndMaxId.type);

fields.add(icebergField);
nestedFieldId = typeAndMaxId.maxId + 1;
}
return new org.apache.iceberg.Schema(fields);
return new org.apache.iceberg.Schema(fields.toArray(new Types.NestedField[fields.size()]));
}

/** Converts a Beam {@link Row} to an Iceberg {@link Record}. */
Expand Down Expand Up @@ -323,27 +326,21 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
public static Row icebergRecordToBeamRow(Schema schema, Record record) {
Row.Builder rowBuilder = Row.withSchema(schema);
for (Schema.Field field : schema.getFields()) {
boolean isNullable = field.getType().getNullable();
@Nullable Object icebergValue = record.getField(field.getName());
if (icebergValue == null) {
if (isNullable) {
rowBuilder.addValue(null);
continue;
}
throw new RuntimeException(
String.format("Received null value for required field '%s'.", field.getName()));
}
switch (field.getType().getTypeName()) {
case BYTE:
// I guess allow anything we can cast here
byte byteValue = (byte) record.getField(field.getName());
rowBuilder.addValue(byteValue);
break;
case INT16:
// I guess allow anything we can cast here
short shortValue = (short) record.getField(field.getName());
rowBuilder.addValue(shortValue);
break;
case INT32:
// I guess allow anything we can cast here
int intValue = (int) record.getField(field.getName());
rowBuilder.addValue(intValue);
break;
case INT64:
// I guess allow anything we can cast here
long longValue = (long) record.getField(field.getName());
rowBuilder.addValue(longValue);
break;
case DECIMAL: // Iceberg and Beam both use BigDecimal
case FLOAT: // Iceberg and Beam both use float
case DOUBLE: // Iceberg and Beam both use double
Expand All @@ -352,29 +349,31 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
case ARRAY:
case ITERABLE:
case MAP:
rowBuilder.addValue(record.getField(field.getName()));
rowBuilder.addValue(icebergValue);
break;
case DATETIME:
// Iceberg uses a long for millis; Beam uses joda time DateTime
long millis = (long) record.getField(field.getName());
long millis = (long) icebergValue;
rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC));
break;
case BYTES:
// Iceberg uses ByteBuffer; Beam uses byte[]
rowBuilder.addValue(((ByteBuffer) record.getField(field.getName())).array());
rowBuilder.addValue(((ByteBuffer) icebergValue).array());
break;
case ROW:
Record nestedRecord = (Record) record.getField(field.getName());
Record nestedRecord = (Record) icebergValue;
Schema nestedSchema =
checkArgumentNotNull(
field.getType().getRowSchema(),
"Corrupted schema: Row type did not have associated nested schema.");
Row nestedRow = icebergRecordToBeamRow(nestedSchema, nestedRecord);
rowBuilder.addValue(nestedRow);
rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, nestedRecord));
break;
case LOGICAL_TYPE:
throw new UnsupportedOperationException(
"Cannot convert iceberg field to Beam logical type");
default:
throw new UnsupportedOperationException(
"Unsupported Beam type: " + field.getType().getTypeName());
}
}
return rowBuilder.build();
Expand Down
Loading

0 comments on commit b54967e

Please sign in to comment.