Improve IcebergIO utils (#31958)
* Improve Iceberg utils

* add documentation; clarify variable name

* fix kinks, add type tests
ahmedabu98 authored Jul 31, 2024
1 parent 2824944 commit 2aefd5b
Showing 14 changed files with 865 additions and 354 deletions.
@@ -137,7 +137,7 @@ public PCollection<Row> expand(PBegin input) {
.setCatalogConfig(getCatalogConfig())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(tableId)
-                        .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema()))
+                        .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
.build())));
}
}
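For illustration only (not part of the diff): a minimal sketch of the Iceberg-to-Beam schema conversion used above. The table schema and field names are invented for the example.

import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.iceberg.types.Types;

public class SchemaConversionExample {
  public static void main(String[] args) {
    // Hypothetical Iceberg schema: one required and one optional column.
    org.apache.iceberg.Schema icebergSchema =
        new org.apache.iceberg.Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()));

    Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(icebergSchema);
    // Expect: "id" maps to INT64 (non-nullable), "name" maps to STRING (nullable).
    System.out.println(beamSchema);
  }
}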
@@ -20,36 +20,42 @@
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

-class SchemaAndRowConversions {
+/** Utilities for converting between Beam and Iceberg types. */
+public class IcebergUtils {
+  // This is made public for users' convenience, as many may have more experience working with
+  // Iceberg types.

-  private SchemaAndRowConversions() {}
+  private IcebergUtils() {}

-  static final Map<Schema.FieldType, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
-      ImmutableMap.<Schema.FieldType, Type>builder()
-          .put(Schema.FieldType.BOOLEAN, Types.BooleanType.get())
-          .put(Schema.FieldType.INT32, Types.IntegerType.get())
-          .put(Schema.FieldType.INT64, Types.LongType.get())
-          .put(Schema.FieldType.FLOAT, Types.FloatType.get())
-          .put(Schema.FieldType.DOUBLE, Types.DoubleType.get())
-          .put(Schema.FieldType.STRING, Types.StringType.get())
-          .put(Schema.FieldType.BYTES, Types.BinaryType.get())
+  private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
+      ImmutableMap.<Schema.TypeName, Type>builder()
+          .put(Schema.TypeName.BOOLEAN, Types.BooleanType.get())
+          .put(Schema.TypeName.INT32, Types.IntegerType.get())
+          .put(Schema.TypeName.INT64, Types.LongType.get())
+          .put(Schema.TypeName.FLOAT, Types.FloatType.get())
+          .put(Schema.TypeName.DOUBLE, Types.DoubleType.get())
+          .put(Schema.TypeName.STRING, Types.StringType.get())
+          .put(Schema.TypeName.BYTES, Types.BinaryType.get())
.build();

-  public static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
+  private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
switch (type.typeId()) {
case BOOLEAN:
return Schema.FieldType.BOOLEAN;
@@ -86,11 +92,12 @@ public static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
throw new RuntimeException("Unrecognized IcebergIO Type");
}

-  public static Schema.Field icebergFieldToBeamField(final Types.NestedField field) {
+  private static Schema.Field icebergFieldToBeamField(final Types.NestedField field) {
return Schema.Field.of(field.name(), icebergTypeToBeamFieldType(field.type()))
.withNullable(field.isOptional());
}

+  /** Converts an Iceberg {@link org.apache.iceberg.Schema} to a Beam {@link Schema}. */
public static Schema icebergSchemaToBeamSchema(final org.apache.iceberg.Schema schema) {
Schema.Builder builder = Schema.builder();
for (Types.NestedField f : schema.columns()) {
@@ -99,36 +106,149 @@ public static Schema icebergSchemaToBeamSchema(final org.apache.iceberg.Schema s
return builder.build();
}

-  public static Schema icebergStructTypeToBeamSchema(final Types.StructType struct) {
+  private static Schema icebergStructTypeToBeamSchema(final Types.StructType struct) {
Schema.Builder builder = Schema.builder();
for (Types.NestedField f : struct.fields()) {
builder.addField(icebergFieldToBeamField(f));
}
return builder.build();
}

-  public static Types.NestedField beamFieldToIcebergField(int fieldId, final Schema.Field field) {
-    @Nullable Type icebergType = BEAM_TYPES_TO_ICEBERG_TYPES.get(field.getType());
-
-    if (icebergType != null) {
-      return Types.NestedField.of(
-          fieldId, field.getType().getNullable(), field.getName(), icebergType);
-    } else {
-      return Types.NestedField.of(
-          fieldId, field.getType().getNullable(), field.getName(), Types.StringType.get());
-    }
-  }
+  /**
+   * Represents an Object (in practice, either {@link Type} or {@link Types.NestedField}) along
+   * with the most recent (max) ID that has been used to build this object.
+   *
+   * <p>Iceberg Schema fields are required to have unique IDs. This includes unique IDs for a
+   * {@link Types.ListType}'s collection type, a {@link Types.MapType}'s key type and value type,
+   * and nested {@link Types.StructType}s. When constructing any of these types, we use multiple
+   * unique IDs for the type's components. The {@code maxId} in this object represents the most
+   * recent ID used after building this type, signaling that the next field we construct should
+   * have an ID greater than this one.
+   */
+  @VisibleForTesting
+  static class ObjectAndMaxId<T> {
+    int maxId;
+    T object;
+
+    ObjectAndMaxId(int id, T object) {
+      this.maxId = id;
+      this.object = object;
+    }
+  }

  /**
   * Given a Beam {@link Schema.FieldType} and an index, returns an Iceberg {@link Type} and the
   * maximum index after building the Iceberg Type. This assumes the input index is already in use
   * (usually by the parent {@link Types.NestedField}), and will start building the Iceberg type
   * from index + 1.
   *
   * <p>Returns this information in an {@link ObjectAndMaxId}{@code <Type>} instance.
   */
@VisibleForTesting
static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType(
int fieldId, Schema.FieldType beamType) {
if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
return new ObjectAndMaxId<>(fieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
} else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE
// List ID needs to be unique from the NestedField that contains this ListType
int listId = fieldId + 1;
Schema.FieldType beamCollectionType =
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());

ObjectAndMaxId<Type> listInfo = beamFieldTypeToIcebergFieldType(listId, beamCollectionType);
Type icebergCollectionType = listInfo.object;

boolean elementTypeIsNullable =
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable();

Type listType =
elementTypeIsNullable
? Types.ListType.ofOptional(listId, icebergCollectionType)
: Types.ListType.ofRequired(listId, icebergCollectionType);

return new ObjectAndMaxId<>(listInfo.maxId, listType);
} else if (beamType.getTypeName().isMapType()) { // MAP
// key and value IDs need to be unique from the NestedField that contains this MapType
int keyId = fieldId + 1;
int valueId = fieldId + 2;
int maxId = valueId;

Schema.FieldType beamKeyType = Preconditions.checkArgumentNotNull(beamType.getMapKeyType());
ObjectAndMaxId<Type> keyInfo = beamFieldTypeToIcebergFieldType(maxId, beamKeyType);
Type icebergKeyType = keyInfo.object;
maxId = keyInfo.maxId;

Schema.FieldType beamValueType =
Preconditions.checkArgumentNotNull(beamType.getMapValueType());
ObjectAndMaxId<Type> valueInfo = beamFieldTypeToIcebergFieldType(maxId, beamValueType);
Type icebergValueType = valueInfo.object;
maxId = valueInfo.maxId;

Type mapType =
beamValueType.getNullable()
? Types.MapType.ofOptional(keyId, valueId, icebergKeyType, icebergValueType)
: Types.MapType.ofRequired(keyId, valueId, icebergKeyType, icebergValueType);

return new ObjectAndMaxId<>(maxId, mapType);
} else if (beamType.getTypeName().isCompositeType()) { // ROW
// Nested field IDs need to be unique from the field that contains this StructType
int maxFieldId = fieldId;

Schema nestedSchema = Preconditions.checkArgumentNotNull(beamType.getRowSchema());
List<Types.NestedField> nestedFields = new ArrayList<>(nestedSchema.getFieldCount());
for (Schema.Field field : nestedSchema.getFields()) {
ObjectAndMaxId<Types.NestedField> converted = beamFieldToIcebergField(++maxFieldId, field);
Types.NestedField nestedField = converted.object;
nestedFields.add(nestedField);

maxFieldId = converted.maxId;
}

Type structType = Types.StructType.of(nestedFields);

return new ObjectAndMaxId<>(maxFieldId, structType);
}

return new ObjectAndMaxId<>(fieldId, Types.StringType.get());
}

private static ObjectAndMaxId<Types.NestedField> beamFieldToIcebergField(
int fieldId, final Schema.Field field) {
ObjectAndMaxId<Type> typeAndMaxId = beamFieldTypeToIcebergFieldType(fieldId, field.getType());
Type icebergType = typeAndMaxId.object;
int id = typeAndMaxId.maxId;

Types.NestedField icebergField =
Types.NestedField.of(fieldId, field.getType().getNullable(), field.getName(), icebergType);

return new ObjectAndMaxId<>(id, icebergField);
}

/**
* Converts a Beam {@link Schema} to an Iceberg {@link org.apache.iceberg.Schema}.
*
* <p>The following unsupported Beam types will be defaulted to {@link Types.StringType}:
 * <li>{@link Schema.TypeName#DECIMAL}
 * <li>{@link Schema.TypeName#DATETIME}
 * <li>{@link Schema.TypeName#LOGICAL_TYPE}
*/
public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) {
Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()];
-    int fieldId = 0;
-    for (Schema.Field f : schema.getFields()) {
-      fields[fieldId++] = beamFieldToIcebergField(fieldId, f);
-    }
+    int nextIcebergFieldId = 1;
+    for (int i = 0; i < schema.getFieldCount(); i++) {
+      Schema.Field beamField = schema.getField(i);
+      ObjectAndMaxId<Types.NestedField> fieldAndMaxId =
+          beamFieldToIcebergField(nextIcebergFieldId, beamField);
+      Types.NestedField field = fieldAndMaxId.object;
+      fields[i] = field;
+
+      nextIcebergFieldId = fieldAndMaxId.maxId + 1;
+    }
return new org.apache.iceberg.Schema(fields);
}

-  public static Record rowToRecord(org.apache.iceberg.Schema schema, Row row) {
+  /** Converts a Beam {@link Row} to an Iceberg {@link Record}. */
+  public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema schema, Row row) {
return copyRowIntoRecord(GenericRecord.create(schema), row);
}

@@ -191,13 +311,16 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row)));
break;
      case LIST:
-        throw new UnsupportedOperationException("List fields are not yet supported.");
+        Optional.ofNullable(value.getArray(name)).ifPresent(list -> rec.setField(name, list));
+        break;
      case MAP:
-        throw new UnsupportedOperationException("Map fields are not yet supported.");
+        Optional.ofNullable(value.getMap(name)).ifPresent(v -> rec.setField(name, v));
+        break;
}
}

-  public static Row recordToRow(Schema schema, Record record) {
+  /** Converts an Iceberg {@link Record} to a Beam {@link Row}. */
+  public static Row icebergRecordToBeamRow(Schema schema, Record record) {
Row.Builder rowBuilder = Row.withSchema(schema);
for (Schema.Field field : schema.getFields()) {
switch (field.getType().getTypeName()) {
Expand All @@ -221,48 +344,32 @@ public static Row recordToRow(Schema schema, Record record) {
long longValue = (long) record.getField(field.getName());
rowBuilder.addValue(longValue);
break;
-        case DECIMAL:
-          // Iceberg and Beam both use BigDecimal
-          rowBuilder.addValue(record.getField(field.getName()));
-          break;
-        case FLOAT:
-          // Iceberg and Beam both use float
-          rowBuilder.addValue(record.getField(field.getName()));
-          break;
-        case DOUBLE:
-          // Iceberg and Beam both use double
-          rowBuilder.addValue(record.getField(field.getName()));
-          break;
-        case STRING:
-          // Iceberg and Beam both use String
+        case DECIMAL: // Iceberg and Beam both use BigDecimal
+        case FLOAT: // Iceberg and Beam both use float
+        case DOUBLE: // Iceberg and Beam both use double
+        case STRING: // Iceberg and Beam both use String
+        case BOOLEAN: // Iceberg and Beam both use boolean
+        case ARRAY:
+        case ITERABLE:
+        case MAP:
          rowBuilder.addValue(record.getField(field.getName()));
          break;
        case DATETIME:
          // Iceberg uses a long for millis; Beam uses joda time DateTime
          long millis = (long) record.getField(field.getName());
          rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC));
          break;
-        case BOOLEAN:
-          // Iceberg and Beam both use String
-          rowBuilder.addValue(record.getField(field.getName()));
-          break;
        case BYTES:
          // Iceberg uses ByteBuffer; Beam uses byte[]
          rowBuilder.addValue(((ByteBuffer) record.getField(field.getName())).array());
          break;
-        case ARRAY:
-          throw new UnsupportedOperationException("Array fields are not yet supported.");
-        case ITERABLE:
-          throw new UnsupportedOperationException("Iterable fields are not yet supported.");
-        case MAP:
-          throw new UnsupportedOperationException("Map fields are not yet supported.");
case ROW:
Record nestedRecord = (Record) record.getField(field.getName());
Schema nestedSchema =
checkArgumentNotNull(
field.getType().getRowSchema(),
"Corrupted schema: Row type did not have associated nested schema.");
-          Row nestedRow = recordToRow(nestedSchema, nestedRecord);
+          Row nestedRow = icebergRecordToBeamRow(nestedSchema, nestedRecord);
rowBuilder.addValue(nestedRow);
break;
case LOGICAL_TYPE:
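To make the ID-assignment scheme above concrete, here is an illustrative sketch (not part of the diff; schema and field names invented) of the IDs that beamSchemaToIcebergSchema assigns for a schema containing a collection and a nested row.

import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.schemas.Schema;

public class FieldIdExample {
  public static void main(String[] args) {
    Schema beamSchema =
        Schema.builder()
            .addStringField("name")
            .addArrayField("scores", Schema.FieldType.INT32)
            .addRowField("address", Schema.builder().addStringField("city").build())
            .build();

    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
    // Following the logic above: "name" gets ID 1; "scores" gets ID 2 and its list
    // element type gets ID 3; "address" gets ID 4 and its nested "city" gets ID 5.
    System.out.println(icebergSchema);
  }
}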
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.io.iceberg;

-import static org.apache.beam.sdk.io.iceberg.SchemaAndRowConversions.rowToRecord;
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord;

import java.io.IOException;
import org.apache.beam.sdk.values.Row;
@@ -80,7 +80,7 @@ class RecordWriter {
}

public void write(Row row) {
-    Record record = rowToRecord(table.schema(), row);
+    Record record = beamRowToIcebergRecord(table.schema(), row);
icebergDataWriter.write(record);
}

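For context (not part of the diff): RecordWriter now routes each Beam Row through the renamed converter. A minimal standalone sketch of that conversion, with invented schema and values:

import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.data.Record;

public class RowToRecordExample {
  public static void main(String[] args) {
    Schema beamSchema = Schema.builder().addInt64Field("id").addStringField("name").build();
    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

    Row row = Row.withSchema(beamSchema).addValues(1L, "alice").build();
    Record record = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row);
    // record.getField("id") == 1L; record.getField("name") == "alice"
  }
}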
@@ -50,7 +50,7 @@ private TableScan getTableScan() {
scanConfig
.getTable()
.newScan()
-            .project(SchemaAndRowConversions.beamSchemaToIcebergSchema(scanConfig.getSchema()));
+            .project(IcebergUtils.beamSchemaToIcebergSchema(scanConfig.getSchema()));

if (scanConfig.getFilter() != null) {
tableScan = tableScan.filter(scanConfig.getFilter());
@@ -60,7 +60,7 @@ class ScanTaskReader extends BoundedSource.BoundedReader<Row> {

public ScanTaskReader(ScanTaskSource source) {
this.source = source;
-    this.project = SchemaAndRowConversions.beamSchemaToIcebergSchema(source.getSchema());
+    this.project = IcebergUtils.beamSchemaToIcebergSchema(source.getSchema());
}

@Override
@@ -160,7 +160,7 @@ public Row getCurrent() throws NoSuchElementException {
if (current == null) {
throw new NoSuchElementException();
}
-    return SchemaAndRowConversions.recordToRow(source.getSchema(), current);
+    return IcebergUtils.icebergRecordToBeamRow(source.getSchema(), current);
}

@Override
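ScanTaskReader performs the reverse conversion, turning each Iceberg Record into a Beam Row. An illustrative sketch (not part of the diff; schema and values invented):

import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;

public class RecordToRowExample {
  public static void main(String[] args) {
    Schema beamSchema = Schema.builder().addInt64Field("id").addStringField("name").build();
    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

    Record record = GenericRecord.create(icebergSchema);
    record.setField("id", 7L);
    record.setField("name", "beam");

    Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record);
    // row.getInt64("id") == 7L; row.getString("name") == "beam"
  }
}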
@@ -134,8 +134,7 @@ public void setUp() {
.addByteArrayField("bytes")
.build();

-  static final Schema ICEBERG_SCHEMA =
-      SchemaAndRowConversions.beamSchemaToIcebergSchema(BEAM_SCHEMA);
+  static final Schema ICEBERG_SCHEMA = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);

Map<String, Object> getValues(int num) {
String strNum = Integer.toString(num);
@@ -238,7 +237,7 @@ public void testWrite() {

List<Row> inputRows =
inputRecords.stream()
-            .map(record -> SchemaAndRowConversions.recordToRow(BEAM_SCHEMA, record))
+            .map(record -> IcebergUtils.icebergRecordToBeamRow(BEAM_SCHEMA, record))
.collect(Collectors.toList());

// Write with Beam
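The commit message mentions type tests; one documented behavior worth exercising is the fallback of unsupported Beam types to Iceberg strings. An illustrative sketch (not part of the diff; field name invented):

import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.iceberg.types.Type;

public class TypeFallbackExample {
  public static void main(String[] args) {
    Schema beamSchema = Schema.builder().addDateTimeField("created_at").build();
    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

    // Per the Javadoc on beamSchemaToIcebergSchema, DATETIME has no direct
    // Iceberg mapping here, so it falls back to a string column.
    Type type = icebergSchema.findField("created_at").type();
    System.out.println(type.typeId()); // STRING
  }
}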
(The remaining changed files are not shown.)
