From 2aefd5b9c0c8a4dcb577d367d859455e4a93ec14 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Wed, 31 Jul 2024 14:25:14 -0400 Subject: [PATCH] Improve IcebergIO utils (#31958) * Improve Iceberg utils * add documentation; clarify variable name * fix kinks, add type tests --- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 2 +- ...dRowConversions.java => IcebergUtils.java} | 217 ++++-- .../beam/sdk/io/iceberg/RecordWriter.java | 4 +- .../beam/sdk/io/iceberg/ScanSource.java | 2 +- .../beam/sdk/io/iceberg/ScanTaskReader.java | 4 +- .../beam/sdk/io/iceberg/IcebergIOIT.java | 5 +- .../sdk/io/iceberg/IcebergIOReadTest.java | 8 +- .../sdk/io/iceberg/IcebergIOWriteTest.java | 12 +- ...cebergReadSchemaTransformProviderTest.java | 8 +- .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 676 ++++++++++++++++++ ...ebergWriteSchemaTransformProviderTest.java | 5 +- .../beam/sdk/io/iceberg/ScanSourceTest.java | 6 +- .../iceberg/SchemaAndRowConversionsTest.java | 268 ------- .../beam/sdk/io/iceberg/TestFixtures.java | 2 +- 14 files changed, 865 insertions(+), 354 deletions(-) rename sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/{SchemaAndRowConversions.java => IcebergUtils.java} (50%) create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java delete mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversionsTest.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 50e0ea8b63d1..c3c1da7c7885 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -137,7 +137,7 @@ public PCollection expand(PBegin input) { .setCatalogConfig(getCatalogConfig()) .setScanType(IcebergScanConfig.ScanType.TABLE) .setTableIdentifier(tableId) - .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema())) + .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema())) .build()))); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java similarity index 50% rename from sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java rename to sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index e1a8685614f5..a2f84e6475c9 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -20,36 +20,42 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; import 
org.apache.iceberg.types.Types; -import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -class SchemaAndRowConversions { +/** Utilities for converting between Beam and Iceberg types. */ +public class IcebergUtils { + // This is made public for users' convenience, as many may have more experience working with + // Iceberg types. - private SchemaAndRowConversions() {} + private IcebergUtils() {} - static final Map<Schema.FieldType, Type> BEAM_TYPES_TO_ICEBERG_TYPES = - ImmutableMap.<Schema.FieldType, Type>builder() - .put(Schema.FieldType.BOOLEAN, Types.BooleanType.get()) - .put(Schema.FieldType.INT32, Types.IntegerType.get()) - .put(Schema.FieldType.INT64, Types.LongType.get()) - .put(Schema.FieldType.FLOAT, Types.FloatType.get()) - .put(Schema.FieldType.DOUBLE, Types.DoubleType.get()) - .put(Schema.FieldType.STRING, Types.StringType.get()) - .put(Schema.FieldType.BYTES, Types.BinaryType.get()) + private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES = + ImmutableMap.<Schema.TypeName, Type>builder() + .put(Schema.TypeName.BOOLEAN, Types.BooleanType.get()) + .put(Schema.TypeName.INT32, Types.IntegerType.get()) + .put(Schema.TypeName.INT64, Types.LongType.get()) + .put(Schema.TypeName.FLOAT, Types.FloatType.get()) + .put(Schema.TypeName.DOUBLE, Types.DoubleType.get()) + .put(Schema.TypeName.STRING, Types.StringType.get()) + .put(Schema.TypeName.BYTES, Types.BinaryType.get()) .build(); - public static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { + private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { switch (type.typeId()) { case BOOLEAN: return Schema.FieldType.BOOLEAN; @@ -86,11 +92,12 @@ public static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { throw new RuntimeException("Unrecognized IcebergIO Type"); } - public static Schema.Field icebergFieldToBeamField(final Types.NestedField field) { + private static Schema.Field icebergFieldToBeamField(final Types.NestedField field) { return Schema.Field.of(field.name(), icebergTypeToBeamFieldType(field.type())) .withNullable(field.isOptional()); } + /** Converts an Iceberg {@link org.apache.iceberg.Schema} to a Beam {@link Schema}. */ public static Schema icebergSchemaToBeamSchema(final org.apache.iceberg.Schema schema) { Schema.Builder builder = Schema.builder(); for (Types.NestedField f : schema.columns()) { @@ -99,7 +106,7 @@ public static Schema icebergSchemaToBeamSchema(final org.apache.iceberg.Schema s return builder.build(); } - public static Schema icebergStructTypeToBeamSchema(final Types.StructType struct) { + private static Schema icebergStructTypeToBeamSchema(final Types.StructType struct) { Schema.Builder builder = Schema.builder(); for (Types.NestedField f : struct.fields()) { builder.addField(icebergFieldToBeamField(f)); @@ -107,28 +114,141 @@ public static Schema icebergStructTypeToBeamSchema(final Types.StructType struct return builder.build(); } - public static Types.NestedField beamFieldToIcebergField(int fieldId, final Schema.Field field) { - @Nullable Type icebergType = BEAM_TYPES_TO_ICEBERG_TYPES.get(field.getType()); + /** + * Represents an Object (in practice, either {@link Type} or {@link Types.NestedField}) along with + * the most recent (max) ID that has been used to build this object. + * + *

<p>Iceberg Schema fields are required to have unique IDs. This includes unique IDs for a {@link + * Types.ListType}'s collection type, a {@link Types.MapType}'s key type and value type, and + * nested {@link Types.StructType}s. When constructing any of these types, we use multiple unique + * IDs for the type's components. The {@code maxId} in this object represents the most recent ID + * used after building this type. This helps signal that the next field we construct should have + * an ID greater than this one. + */ + @VisibleForTesting + static class ObjectAndMaxId<T> { int maxId; T object; - if (icebergType != null) { - return Types.NestedField.of( - fieldId, field.getType().getNullable(), field.getName(), icebergType); - } else { - return Types.NestedField.of( - fieldId, field.getType().getNullable(), field.getName(), Types.StringType.get()); + ObjectAndMaxId(int id, T object) { + this.maxId = id; + this.object = object; } } + /** + * Given a Beam {@link Schema.FieldType} and an index, returns an Iceberg {@link Type} and the + * maximum index after building the Iceberg Type. This assumes the input index is already in use + * (usually by the parent {@link Types.NestedField}), and will start building the Iceberg type from + * index + 1. + * + *

<p>Returns this information in an {@link ObjectAndMaxId} instance. + */ + @VisibleForTesting + static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType( + int fieldId, Schema.FieldType beamType) { + if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) { + return new ObjectAndMaxId<>(fieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName())); + } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE + // List ID needs to be unique from the NestedField that contains this ListType + int listId = fieldId + 1; + Schema.FieldType beamCollectionType = + Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()); + + ObjectAndMaxId<Type> listInfo = beamFieldTypeToIcebergFieldType(listId, beamCollectionType); + Type icebergCollectionType = listInfo.object; + + boolean elementTypeIsNullable = + Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable(); + + Type listType = + elementTypeIsNullable + ? Types.ListType.ofOptional(listId, icebergCollectionType) + : Types.ListType.ofRequired(listId, icebergCollectionType); + + return new ObjectAndMaxId<>(listInfo.maxId, listType); + } else if (beamType.getTypeName().isMapType()) { // MAP + // key and value IDs need to be unique from the NestedField that contains this MapType + int keyId = fieldId + 1; + int valueId = fieldId + 2; + int maxId = valueId; + + Schema.FieldType beamKeyType = Preconditions.checkArgumentNotNull(beamType.getMapKeyType()); + ObjectAndMaxId<Type> keyInfo = beamFieldTypeToIcebergFieldType(maxId, beamKeyType); + Type icebergKeyType = keyInfo.object; + maxId = keyInfo.maxId; + + Schema.FieldType beamValueType = + Preconditions.checkArgumentNotNull(beamType.getMapValueType()); + ObjectAndMaxId<Type> valueInfo = beamFieldTypeToIcebergFieldType(maxId, beamValueType); + Type icebergValueType = valueInfo.object; + maxId = valueInfo.maxId; + + Type mapType = + beamValueType.getNullable() + ? Types.MapType.ofOptional(keyId, valueId, icebergKeyType, icebergValueType) + : Types.MapType.ofRequired(keyId, valueId, icebergKeyType, icebergValueType); + + return new ObjectAndMaxId<>(maxId, mapType); + } else if (beamType.getTypeName().isCompositeType()) { // ROW + // Nested field IDs need to be unique from the field that contains this StructType + int maxFieldId = fieldId; + + Schema nestedSchema = Preconditions.checkArgumentNotNull(beamType.getRowSchema()); + List<Types.NestedField> nestedFields = new ArrayList<>(nestedSchema.getFieldCount()); + for (Schema.Field field : nestedSchema.getFields()) { + ObjectAndMaxId<Types.NestedField> converted = beamFieldToIcebergField(++maxFieldId, field); + Types.NestedField nestedField = converted.object; + nestedFields.add(nestedField); + + maxFieldId = converted.maxId; + } + + Type structType = Types.StructType.of(nestedFields); + + return new ObjectAndMaxId<>(maxFieldId, structType); + } + + return new ObjectAndMaxId<>(fieldId, Types.StringType.get()); + } + + private static ObjectAndMaxId<Types.NestedField> beamFieldToIcebergField( + int fieldId, final Schema.Field field) { + ObjectAndMaxId<Type> typeAndMaxId = beamFieldTypeToIcebergFieldType(fieldId, field.getType()); + Type icebergType = typeAndMaxId.object; + int id = typeAndMaxId.maxId; + + Types.NestedField icebergField = + Types.NestedField.of(fieldId, field.getType().getNullable(), field.getName(), icebergType); + + return new ObjectAndMaxId<>(id, icebergField); + } + + /** + * Converts a Beam {@link Schema} to an Iceberg {@link org.apache.iceberg.Schema}. + * + *

<p>The following unsupported Beam types will be defaulted to {@link Types.StringType}: + *

<li>{@link Schema.TypeName#DECIMAL} + *
<li>{@link Schema.TypeName#DATETIME} + *
<li>{@link Schema.TypeName#LOGICAL_TYPE} + */ public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) { Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()]; - int fieldId = 0; - for (Schema.Field f : schema.getFields()) { - fields[fieldId++] = beamFieldToIcebergField(fieldId, f); + int nextIcebergFieldId = 1; + for (int i = 0; i < schema.getFieldCount(); i++) { + Schema.Field beamField = schema.getField(i); + ObjectAndMaxId<Types.NestedField> fieldAndMaxId = + beamFieldToIcebergField(nextIcebergFieldId, beamField); + Types.NestedField field = fieldAndMaxId.object; + fields[i] = field; + + nextIcebergFieldId = fieldAndMaxId.maxId + 1; } return new org.apache.iceberg.Schema(fields); } - public static Record rowToRecord(org.apache.iceberg.Schema schema, Row row) { + /** Converts a Beam {@link Row} to an Iceberg {@link Record}. */ + public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema schema, Row row) { return copyRowIntoRecord(GenericRecord.create(schema), row); } @@ -191,13 +311,16 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row))); break; case LIST: - throw new UnsupportedOperationException("List fields are not yet supported."); + Optional.ofNullable(value.getArray(name)).ifPresent(list -> rec.setField(name, list)); + break; case MAP: - throw new UnsupportedOperationException("Map fields are not yet supported."); + Optional.ofNullable(value.getMap(name)).ifPresent(v -> rec.setField(name, v)); + break; } } - public static Row recordToRow(Schema schema, Record record) { + /** Converts an Iceberg {@link Record} to a Beam {@link Row}. */ + public static Row icebergRecordToBeamRow(Schema schema, Record record) { Row.Builder rowBuilder = Row.withSchema(schema); for (Schema.Field field : schema.getFields()) { switch (field.getType().getTypeName()) { @@ -221,20 +344,14 @@ public static Row recordToRow(Schema schema, Record record) { long longValue = (long) record.getField(field.getName()); rowBuilder.addValue(longValue); break; - case DECIMAL: - // Iceberg and Beam both use BigDecimal - rowBuilder.addValue(record.getField(field.getName())); - break; - case FLOAT: - // Iceberg and Beam both use float - rowBuilder.addValue(record.getField(field.getName())); - break; - case DOUBLE: - // Iceberg and Beam both use double - rowBuilder.addValue(record.getField(field.getName())); - break; - case STRING: - // Iceberg and Beam both use String + case DECIMAL: // Iceberg and Beam both use BigDecimal + case FLOAT: // Iceberg and Beam both use float + case DOUBLE: // Iceberg and Beam both use double + case STRING: // Iceberg and Beam both use String + case BOOLEAN: // Iceberg and Beam both use boolean + case ARRAY: + case ITERABLE: + case MAP: rowBuilder.addValue(record.getField(field.getName())); break; case DATETIME: @@ -242,27 +359,17 @@ public static Row recordToRow(Schema schema, Record record) { long millis = (long) record.getField(field.getName()); rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC)); break; - case BOOLEAN: - // Iceberg and Beam both use String - rowBuilder.addValue(record.getField(field.getName())); - break; case BYTES: // Iceberg uses ByteBuffer; Beam uses byte[] rowBuilder.addValue(((ByteBuffer) record.getField(field.getName())).array()); break; - case ARRAY: - throw new UnsupportedOperationException("Array fields are not yet supported."); - case ITERABLE: - throw new UnsupportedOperationException("Iterable fields are not 
yet supported."); - case MAP: - throw new UnsupportedOperationException("Map fields are not yet supported."); case ROW: Record nestedRecord = (Record) record.getField(field.getName()); Schema nestedSchema = checkArgumentNotNull( field.getType().getRowSchema(), "Corrupted schema: Row type did not have associated nested schema."); - Row nestedRow = recordToRow(nestedSchema, nestedRecord); + Row nestedRow = icebergRecordToBeamRow(nestedSchema, nestedRecord); rowBuilder.addValue(nestedRow); break; case LOGICAL_TYPE: diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index 859310bdcecb..d7212783d1b6 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.sdk.io.iceberg.SchemaAndRowConversions.rowToRecord; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord; import java.io.IOException; import org.apache.beam.sdk.values.Row; @@ -80,7 +80,7 @@ class RecordWriter { } public void write(Row row) { - Record record = rowToRecord(table.schema(), row); + Record record = beamRowToIcebergRecord(table.schema(), row); icebergDataWriter.write(record); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java index ed2f2eda767e..ff2aa0833481 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java @@ -50,7 +50,7 @@ private TableScan getTableScan() { scanConfig .getTable() .newScan() - .project(SchemaAndRowConversions.beamSchemaToIcebergSchema(scanConfig.getSchema())); + .project(IcebergUtils.beamSchemaToIcebergSchema(scanConfig.getSchema())); if (scanConfig.getFilter() != null) { tableScan = tableScan.filter(scanConfig.getFilter()); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java index 52e6d60c1fbd..b7cb42b2eacb 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java @@ -60,7 +60,7 @@ class ScanTaskReader extends BoundedSource.BoundedReader { public ScanTaskReader(ScanTaskSource source) { this.source = source; - this.project = SchemaAndRowConversions.beamSchemaToIcebergSchema(source.getSchema()); + this.project = IcebergUtils.beamSchemaToIcebergSchema(source.getSchema()); } @Override @@ -160,7 +160,7 @@ public Row getCurrent() throws NoSuchElementException { if (current == null) { throw new NoSuchElementException(); } - return SchemaAndRowConversions.recordToRow(source.getSchema(), current); + return IcebergUtils.icebergRecordToBeamRow(source.getSchema(), current); } @Override diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 1c5686bfde91..3a169eeb40da 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -134,8 +134,7 @@ public void setUp() { .addByteArrayField("bytes") .build(); - static final Schema ICEBERG_SCHEMA = - SchemaAndRowConversions.beamSchemaToIcebergSchema(BEAM_SCHEMA); + static final Schema ICEBERG_SCHEMA = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA); Map getValues(int num) { String strNum = Integer.toString(num); @@ -238,7 +237,7 @@ public void testWrite() { List inputRows = inputRecords.stream() - .map(record -> SchemaAndRowConversions.recordToRow(BEAM_SCHEMA, record)) + .map(record -> IcebergUtils.icebergRecordToBeamRow(BEAM_SCHEMA, record)) .collect(Collectors.toList()); // Write with Beam diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index d6db3f689117..3f31073b4448 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -70,7 +70,7 @@ public void testSimpleScan() throws Exception { TableIdentifier tableId = TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); - final Schema schema = SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA); + final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA); simpleTable .newFastAppend() @@ -91,7 +91,7 @@ public void testSimpleScan() throws Exception { TestFixtures.FILE2SNAPSHOT1, TestFixtures.FILE3SNAPSHOT1) .flatMap(List::stream) - .map(record -> SchemaAndRowConversions.recordToRow(schema, record)) + .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record)) .collect(Collectors.toList()); Properties props = new Properties(); @@ -105,9 +105,7 @@ public void testSimpleScan() throws Exception { testPipeline .apply(IcebergIO.readRows(catalogConfig).from(tableId)) .apply(ParDo.of(new PrintRow())) - .setCoder( - RowCoder.of( - SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))); + .setCoder(RowCoder.of(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))); PAssert.that(output) .satisfies( diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index e0a584ec9da9..02213c45e075 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.sdk.io.iceberg.SchemaAndRowConversions.rowToRecord; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord; import static org.hamcrest.MatcherAssert.assertThat; import java.io.Serializable; @@ -85,7 +85,7 @@ public void testSimpleAppend() throws Exception { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) - .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId)); LOG.info("Executing pipeline"); @@ -152,7 +152,7 @@ public 
IcebergDestination instantiateDestination(Row dest) { TestFixtures.FILE1SNAPSHOT1, TestFixtures.FILE1SNAPSHOT2, TestFixtures.FILE1SNAPSHOT3)))) - .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .apply("Append To Table", IcebergIO.writeRows(catalog).to(dynamicDestinations)); LOG.info("Executing pipeline"); @@ -235,7 +235,7 @@ public IcebergDestination instantiateDestination(Row dest) { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(elements))) - .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .apply("Append To Table", IcebergIO.writeRows(catalog).to(dynamicDestinations)); LOG.info("Executing pipeline"); @@ -262,9 +262,9 @@ public void testIdempotentCommit() throws Exception { // Create a table and add records to it. Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA); Record record = - rowToRecord( + beamRowToIcebergRecord( table.schema(), - Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + Row.withSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .addValues(42L, "bizzle") .build()); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java index bc15021fa2b0..effb5cc4838e 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java @@ -73,7 +73,7 @@ public void testSimpleScan() throws Exception { TableIdentifier tableId = TableIdentifier.parse(identifier); Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); - final Schema schema = SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA); + final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA); simpleTable .newFastAppend() @@ -94,7 +94,7 @@ public void testSimpleScan() throws Exception { TestFixtures.FILE2SNAPSHOT1, TestFixtures.FILE3SNAPSHOT1) .flatMap(List::stream) - .map(record -> SchemaAndRowConversions.recordToRow(schema, record)) + .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record)) .collect(Collectors.toList()); Map properties = new HashMap<>(); @@ -129,7 +129,7 @@ public void testReadUsingManagedTransform() throws Exception { TableIdentifier tableId = TableIdentifier.parse(identifier); Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); - final Schema schema = SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA); + final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA); simpleTable .newFastAppend() @@ -150,7 +150,7 @@ public void testReadUsingManagedTransform() throws Exception { TestFixtures.FILE2SNAPSHOT1, TestFixtures.FILE3SNAPSHOT1) .flatMap(List::stream) - .map(record -> SchemaAndRowConversions.recordToRow(schema, record)) + .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record)) .collect(Collectors.toList()); String yamlConfig = diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java new file mode 100644 index 000000000000..c4da0b22f4d9 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -0,0 +1,676 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.ObjectAndMaxId; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(Enclosed.class) +public class IcebergUtilsTest { + + @RunWith(JUnit4.class) + public static class RowToRecordTests { + /** + * Checks a value that when converted to Iceberg type is the same value when interpreted in + * Java. 
+ */ + private void checkRowValueToRecordValue( + Schema.FieldType sourceType, Type destType, Object value) { + checkRowValueToRecordValue(sourceType, value, destType, value); + } + + private void checkRowValueToRecordValue( + Schema.FieldType sourceType, Object sourceValue, Type destType, Object destValue) { + Schema beamSchema = Schema.of(Schema.Field.of("v", sourceType)); + Row row = Row.withSchema(beamSchema).addValues(sourceValue).build(); + + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema(required(0, "v", destType)); + Record record = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row); + + assertThat(record.getField("v"), equalTo(destValue)); + } + + @Test + public void testBoolean() { + checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), true); + checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), false); + } + + @Test + public void testInteger() { + checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), -13); + checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), 42); + checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), 0); + } + + @Test + public void testLong() { + checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(), 13L); + checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(), 42L); + } + + @Test + public void testFloat() { + checkRowValueToRecordValue(Schema.FieldType.FLOAT, Types.FloatType.get(), 3.14159f); + checkRowValueToRecordValue(Schema.FieldType.FLOAT, Types.FloatType.get(), 42.0f); + } + + @Test + public void testDouble() { + checkRowValueToRecordValue(Schema.FieldType.DOUBLE, Types.DoubleType.get(), 3.14159); + } + + @Test + public void testDate() {} + + @Test + public void testTime() {} + + @Test + public void testTimestamp() { + DateTime dateTime = + new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); + + checkRowValueToRecordValue( + Schema.FieldType.DATETIME, + dateTime.toInstant(), + Types.TimestampType.withoutZone(), + dateTime.getMillis()); + } + + @Test + public void testFixed() {} + + @Test + public void testBinary() { + byte[] bytes = new byte[] {1, 2, 3, 4}; + checkRowValueToRecordValue( + Schema.FieldType.BYTES, bytes, Types.BinaryType.get(), ByteBuffer.wrap(bytes)); + } + + @Test + public void testDecimal() { + BigDecimal num = BigDecimal.valueOf(123.456); + + checkRowValueToRecordValue(Schema.FieldType.DECIMAL, Types.DecimalType.of(6, 3), num); + } + + @Test + public void testStruct() { + Schema schema = Schema.builder().addStringField("nested_str").build(); + Row beamRow = Row.withSchema(schema).addValue("str_value").build(); + + Types.NestedField nestedFieldType = required(1, "nested_str", Types.StringType.get()); + GenericRecord icebergRow = + GenericRecord.create(new org.apache.iceberg.Schema(nestedFieldType)); + icebergRow.setField("nested_str", "str_value"); + + checkRowValueToRecordValue( + Schema.FieldType.row(schema), beamRow, Types.StructType.of(nestedFieldType), icebergRow); + } + + @Test + public void testMap() { + Map map = + ImmutableMap.builder().put("a", 123).put("b", 456).put("c", 789).build(); + + checkRowValueToRecordValue( + Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32), + Types.MapType.ofRequired(1, 2, Types.StringType.get(), Types.IntegerType.get()), + map); + } + + @Test + public void testList() { + List list = Arrays.asList("abc", "xyz", "123", "foo", "bar"); + + 
checkRowValueToRecordValue( + Schema.FieldType.array(Schema.FieldType.STRING), + Types.ListType.ofRequired(1, Types.StringType.get()), + list); + } + } + + @RunWith(JUnit4.class) + public static class RecordToRowTests { + private void checkRecordValueToRowValue( + Type sourceType, Schema.FieldType destType, Object value) { + checkRecordValueToRowValue(sourceType, value, destType, value); + } + + private void checkRecordValueToRowValue( + Type sourceType, Object sourceValue, Schema.FieldType destType, Object destValue) { + Schema beamSchema = Schema.of(Schema.Field.of("v", destType)); + + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema(required(0, "v", sourceType)); + Record record = GenericRecord.create(icebergSchema); + record.setField("v", sourceValue); + + Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record); + + assertThat(row.getBaseValue("v"), equalTo(destValue)); + } + + @Test + public void testBoolean() { + checkRecordValueToRowValue(Types.BooleanType.get(), Schema.FieldType.BOOLEAN, true); + checkRecordValueToRowValue(Types.BooleanType.get(), Schema.FieldType.BOOLEAN, false); + } + + @Test + public void testInteger() { + checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, -13); + checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, 42); + checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, 0); + } + + @Test + public void testLong() { + checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64, 13L); + checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64, 42L); + } + + @Test + public void testFloat() { + checkRecordValueToRowValue(Types.FloatType.get(), Schema.FieldType.FLOAT, 3.14159f); + checkRecordValueToRowValue(Types.FloatType.get(), Schema.FieldType.FLOAT, 42.0f); + } + + @Test + public void testDouble() { + checkRecordValueToRowValue(Types.DoubleType.get(), Schema.FieldType.DOUBLE, 3.14159); + } + + @Test + public void testDate() {} + + @Test + public void testTime() {} + + @Test + public void testTimestamp() { + DateTime dateTime = + new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); + + checkRecordValueToRowValue( + Types.TimestampType.withoutZone(), + dateTime.getMillis(), + Schema.FieldType.DATETIME, + dateTime.toInstant()); + } + + @Test + public void testFixed() {} + + @Test + public void testBinary() { + byte[] bytes = new byte[] {1, 2, 3, 4}; + checkRecordValueToRowValue( + Types.BinaryType.get(), ByteBuffer.wrap(bytes), Schema.FieldType.BYTES, bytes); + } + + @Test + public void testDecimal() { + BigDecimal num = BigDecimal.valueOf(123.456); + + checkRecordValueToRowValue(Types.DecimalType.of(6, 3), Schema.FieldType.DECIMAL, num); + } + + @Test + public void testStruct() { + Schema schema = Schema.builder().addStringField("nested_str").build(); + Row beamRow = Row.withSchema(schema).addValue("str_value").build(); + + Types.NestedField nestedFieldType = required(1, "nested_str", Types.StringType.get()); + GenericRecord icebergRow = + GenericRecord.create(new org.apache.iceberg.Schema(nestedFieldType)); + icebergRow.setField("nested_str", "str_value"); + + checkRecordValueToRowValue( + Types.StructType.of(nestedFieldType), icebergRow, Schema.FieldType.row(schema), beamRow); + } + + @Test + public void testMap() { + Map map = + ImmutableMap.builder().put("a", 123).put("b", 456).put("c", 789).build(); + + checkRecordValueToRowValue( + Types.MapType.ofRequired(1, 2, Types.StringType.get(), 
Types.IntegerType.get()), + Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32), + map); + } + + @Test + public void testList() { + List list = Arrays.asList("abc", "xyz", "123", "foo", "bar"); + + checkRecordValueToRowValue( + Types.ListType.ofRequired(1, Types.StringType.get()), + Schema.FieldType.iterable(Schema.FieldType.STRING), + list); + } + } + + @RunWith(JUnit4.class) + public static class SchemaTests { + private static class BeamFieldTypeTestCase { + final int icebergFieldId; + final Schema.FieldType beamType; + final int expectedMaxId; + final Type expectedIcebergType; + + BeamFieldTypeTestCase( + int icebergFieldId, + Schema.FieldType beamType, + int expectedMaxId, + Type expectedIcebergType) { + this.icebergFieldId = icebergFieldId; + this.beamType = beamType; + this.expectedMaxId = expectedMaxId; + this.expectedIcebergType = expectedIcebergType; + } + } + + private void checkTypes(List testCases) { + for (BeamFieldTypeTestCase testCase : testCases) { + ObjectAndMaxId ret = + beamFieldTypeToIcebergFieldType(testCase.icebergFieldId, testCase.beamType); + + assertEquals(testCase.expectedMaxId, ret.maxId); + checkEquals(testCase.expectedIcebergType, ret.object); + } + } + + private void checkEquals(Type expected, Type actual) { + if (expected.isListType() && actual.isListType()) { + Type nestedExpected = expected.asListType().elementType(); + Type nestedActual = actual.asListType().elementType(); + + assertEquals(nestedExpected.typeId(), nestedActual.typeId()); + checkEquals(nestedExpected, nestedActual); + } else { + assertEquals(expected, actual); + } + } + + @Test + public void testPrimitiveBeamFieldTypeToIcebergFieldType() { + List primitives = + Arrays.asList( + new BeamFieldTypeTestCase(1, Schema.FieldType.BOOLEAN, 1, Types.BooleanType.get()), + new BeamFieldTypeTestCase(3, Schema.FieldType.INT32, 3, Types.IntegerType.get()), + new BeamFieldTypeTestCase(6, Schema.FieldType.INT64, 6, Types.LongType.get()), + new BeamFieldTypeTestCase(10, Schema.FieldType.FLOAT, 10, Types.FloatType.get()), + new BeamFieldTypeTestCase(7, Schema.FieldType.DOUBLE, 7, Types.DoubleType.get()), + new BeamFieldTypeTestCase(11, Schema.FieldType.STRING, 11, Types.StringType.get()), + new BeamFieldTypeTestCase(15, Schema.FieldType.BYTES, 15, Types.BinaryType.get())); + + checkTypes(primitives); + } + + @Test + public void testArrayBeamFieldTypeToIcebergFieldType() { + // Iceberg sets one field ID for the List type itself and another field ID for the collection + // type. 
+ List listTypes = + Arrays.asList( + new BeamFieldTypeTestCase( + 1, + Schema.FieldType.array(Schema.FieldType.BOOLEAN), + 2, + Types.ListType.ofRequired(1, Types.BooleanType.get())), + new BeamFieldTypeTestCase( + 3, + Schema.FieldType.iterable(Schema.FieldType.INT32), + 4, + Types.ListType.ofRequired(3, Types.IntegerType.get())), + new BeamFieldTypeTestCase( + 6, + Schema.FieldType.array(Schema.FieldType.INT64), + 7, + Types.ListType.ofRequired(6, Types.LongType.get())), + new BeamFieldTypeTestCase( + 10, + Schema.FieldType.array(Schema.FieldType.FLOAT), + 11, + Types.ListType.ofRequired(10, Types.FloatType.get())), + new BeamFieldTypeTestCase( + 7, + Schema.FieldType.iterable(Schema.FieldType.DOUBLE), + 8, + Types.ListType.ofRequired(7, Types.DoubleType.get())), + new BeamFieldTypeTestCase( + 11, + Schema.FieldType.array(Schema.FieldType.STRING), + 12, + Types.ListType.ofRequired(11, Types.StringType.get())), + new BeamFieldTypeTestCase( + 15, + Schema.FieldType.iterable(Schema.FieldType.BYTES), + 16, + Types.ListType.ofRequired(15, Types.BinaryType.get())), + new BeamFieldTypeTestCase( + 23, + Schema.FieldType.array( + Schema.FieldType.array(Schema.FieldType.iterable(Schema.FieldType.STRING))), + 26, + Types.ListType.ofRequired( + 23, + Types.ListType.ofRequired( + 24, Types.ListType.ofRequired(25, Types.StringType.get()))))); + + checkTypes(listTypes); + } + + @Test + public void testStructBeamFieldTypeToIcebergFieldType() { + // Iceberg sets one field ID for each nested type. + List listTypes = + Arrays.asList( + new BeamFieldTypeTestCase( + 1, + Schema.FieldType.row(Schema.builder().addStringField("str").build()), + 2, + Types.StructType.of( + Types.NestedField.required(2, "str", Types.StringType.get()))), + new BeamFieldTypeTestCase( + 3, + Schema.FieldType.row(Schema.builder().addInt32Field("int").build()), + 4, + Types.StructType.of( + Types.NestedField.required(4, "int", Types.IntegerType.get()))), + new BeamFieldTypeTestCase( + 0, + Schema.FieldType.row(BEAM_SCHEMA_PRIMITIVE), + 7, + Types.StructType.of(ICEBERG_SCHEMA_PRIMITIVE.columns())), + new BeamFieldTypeTestCase( + 15, + Schema.FieldType.row( + Schema.builder() + .addArrayField("arr", Schema.FieldType.STRING) + .addNullableStringField("str") + .build()), + 18, + Types.StructType.of( + Types.NestedField.required( + 16, "arr", Types.ListType.ofRequired(17, Types.StringType.get())), + Types.NestedField.optional(18, "str", Types.StringType.get()))), + new BeamFieldTypeTestCase( + 20, + Schema.FieldType.row( + Schema.builder() + .addRowField( + "row", + Schema.builder() + .addRowField( + "nested_row", Schema.builder().addStringField("str").build()) + .build()) + .addNullableRowField( + "nullable_row", Schema.builder().addInt64Field("long").build()) + .build()), + 25, + Types.StructType.of( + Types.NestedField.required( + 21, + "row", + Types.StructType.of( + Types.NestedField.required( + 22, + "nested_row", + Types.StructType.of( + Types.NestedField.required( + 23, "str", Types.StringType.get()))))), + Types.NestedField.optional( + 24, + "nullable_row", + Types.StructType.of( + Types.NestedField.required(25, "long", Types.LongType.get())))))); + + checkTypes(listTypes); + } + + @Test + public void testMapBeamFieldTypeToIcebergFieldType() { + List primitives = + Arrays.asList( + new BeamFieldTypeTestCase( + 1, + Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32), + 3, + Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.IntegerType.get())), + new BeamFieldTypeTestCase( + 6, + Schema.FieldType.map( 
+ Schema.FieldType.FLOAT, Schema.FieldType.array(Schema.FieldType.STRING)), + 9, + Types.MapType.ofRequired( + 7, + 8, + Types.FloatType.get(), + Types.ListType.ofRequired(9, Types.StringType.get()))), + new BeamFieldTypeTestCase( + 10, + Schema.FieldType.map( + Schema.FieldType.STRING, + Schema.FieldType.map( + Schema.FieldType.BOOLEAN, + Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32))), + 16, + Types.MapType.ofRequired( + 11, + 12, + Types.StringType.get(), + Types.MapType.ofRequired( + 13, + 14, + Types.BooleanType.get(), + Types.MapType.ofRequired( + 15, 16, Types.StringType.get(), Types.IntegerType.get())))), + new BeamFieldTypeTestCase( + 15, + Schema.FieldType.map( + Schema.FieldType.row(Schema.builder().addStringField("str").build()), + Schema.FieldType.row(Schema.builder().addInt32Field("int").build())), + 19, + Types.MapType.ofRequired( + 16, + 17, + Types.StructType.of( + Types.NestedField.required(18, "str", Types.StringType.get())), + Types.StructType.of( + Types.NestedField.required(19, "int", Types.IntegerType.get()))))); + + checkTypes(primitives); + } + + static final Schema BEAM_SCHEMA_PRIMITIVE = + Schema.builder() + .addInt32Field("int") + .addFloatField("float") + .addNullableDoubleField("double") + .addInt64Field("long") + .addNullableStringField("str") + .addNullableBooleanField("bool") + .addByteArrayField("bytes") + .build(); + + static final org.apache.iceberg.Schema ICEBERG_SCHEMA_PRIMITIVE = + new org.apache.iceberg.Schema( + required(1, "int", Types.IntegerType.get()), + required(2, "float", Types.FloatType.get()), + optional(3, "double", Types.DoubleType.get()), + required(4, "long", Types.LongType.get()), + optional(5, "str", Types.StringType.get()), + optional(6, "bool", Types.BooleanType.get()), + required(7, "bytes", Types.BinaryType.get())); + + @Test + public void testPrimitiveBeamSchemaToIcebergSchema() { + org.apache.iceberg.Schema convertedIcebergSchema = + IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_PRIMITIVE); + + System.out.println(convertedIcebergSchema); + System.out.println(ICEBERG_SCHEMA_PRIMITIVE); + + assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_PRIMITIVE)); + } + + @Test + public void testPrimitiveIcebergSchemaToBeamSchema() { + Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_PRIMITIVE); + + assertEquals(BEAM_SCHEMA_PRIMITIVE, convertedBeamSchema); + } + + static final Schema BEAM_SCHEMA_LIST = + Schema.builder() + .addIterableField("arr_str", Schema.FieldType.STRING) + .addIterableField("arr_int", Schema.FieldType.INT32) + .addIterableField("arr_bool", Schema.FieldType.BOOLEAN) + .build(); + static final org.apache.iceberg.Schema ICEBERG_SCHEMA_LIST = + new org.apache.iceberg.Schema( + required(1, "arr_str", Types.ListType.ofRequired(2, Types.StringType.get())), + required(3, "arr_int", Types.ListType.ofRequired(4, Types.IntegerType.get())), + required(5, "arr_bool", Types.ListType.ofRequired(6, Types.BooleanType.get()))); + + @Test + public void testArrayBeamSchemaToIcebergSchema() { + org.apache.iceberg.Schema convertedIcebergSchema = + IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_LIST); + + assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_LIST)); + } + + @Test + public void testArrayIcebergSchemaToBeamSchema() { + Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_LIST); + + System.out.println(convertedBeamSchema); + System.out.println(BEAM_SCHEMA_LIST); + + assertEquals(BEAM_SCHEMA_LIST, convertedBeamSchema); 
+ } + + static final Schema BEAM_SCHEMA_MAP = + Schema.builder() + .addMapField("str_int", Schema.FieldType.STRING, Schema.FieldType.INT32) + .addNullableMapField("long_bool", Schema.FieldType.INT64, Schema.FieldType.BOOLEAN) + .build(); + + static final org.apache.iceberg.Schema ICEBERG_SCHEMA_MAP = + new org.apache.iceberg.Schema( + required( + 1, + "str_int", + Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.IntegerType.get())), + optional( + 4, + "long_bool", + Types.MapType.ofRequired(5, 6, Types.LongType.get(), Types.BooleanType.get()))); + + @Test + public void testMapBeamSchemaToIcebergSchema() { + org.apache.iceberg.Schema convertedIcebergSchema = + IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_MAP); + + assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_MAP)); + } + + @Test + public void testMapIcebergSchemaToBeamSchema() { + Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_MAP); + + assertEquals(BEAM_SCHEMA_MAP, convertedBeamSchema); + } + + static final Schema BEAM_SCHEMA_STRUCT = + Schema.builder() + .addRowField( + "row", + Schema.builder() + .addStringField("str") + .addNullableInt32Field("int") + .addInt64Field("long") + .build()) + .addNullableRowField( + "nullable_row", + Schema.builder().addNullableStringField("str").addBooleanField("bool").build()) + .build(); + + static final org.apache.iceberg.Schema ICEBERG_SCHEMA_STRUCT = + new org.apache.iceberg.Schema( + required( + 1, + "row", + Types.StructType.of( + required(2, "str", Types.StringType.get()), + optional(3, "int", Types.IntegerType.get()), + required(4, "long", Types.LongType.get()))), + optional( + 5, + "nullable_row", + Types.StructType.of( + optional(6, "str", Types.StringType.get()), + required(7, "bool", Types.BooleanType.get())))); + + @Test + public void testStructBeamSchemaToIcebergSchema() { + org.apache.iceberg.Schema convertedIcebergSchema = + IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_STRUCT); + + assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_STRUCT)); + } + + @Test + public void testStructIcebergSchemaToBeamSchema() { + Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_STRUCT); + + assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema); + } + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 75884f4bcf70..a2cd64e23956 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -102,8 +102,7 @@ public void testSimpleAppend() { testPipeline .apply( "Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) - .setRowSchema( - SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))); + .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))); PCollection result = input @@ -137,7 +136,7 @@ public void testWriteUsingManagedTransform() { PCollection inputRows = testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) - .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)); + .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)); PCollection result = 
inputRows.apply(Managed.write(Managed.ICEBERG).withConfig(configMap)).get(OUTPUT_TAG); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java index 143687e3c999..007cb028c665 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java @@ -79,7 +79,7 @@ public void testUnstartedReaderReadsSamesItsSource() throws Exception { .build()) .setScanType(IcebergScanConfig.ScanType.TABLE) .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) - .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .setSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .build()); BoundedSource.BoundedReader reader = source.createReader(options); @@ -121,7 +121,7 @@ public void testInitialSplitting() throws Exception { .build()) .setScanType(IcebergScanConfig.ScanType.TABLE) .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) - .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .setSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .build()); // Input data for this test is tiny so try a number of very small split sizes @@ -167,7 +167,7 @@ public void testDoubleInitialSplitting() throws Exception { .build()) .setScanType(IcebergScanConfig.ScanType.TABLE) .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) - .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .setSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .build()); // Input data for this test is tiny so make sure to split and get a few, but so they can be diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversionsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversionsTest.java deleted file mode 100644 index 5c708700a17d..000000000000 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversionsTest.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.iceberg; - -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.nio.ByteBuffer; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.values.Row; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.junit.Test; -import org.junit.experimental.runners.Enclosed; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(Enclosed.class) -public class SchemaAndRowConversionsTest { - - @RunWith(JUnit4.class) - public static class RowToRecordTests { - /** - * Checks a value that when converted to Iceberg type is the same value when interpreted in - * Java. - */ - private void checkRowValueToRecordValue( - Schema.FieldType sourceType, Type destType, Object value) { - checkRowValueToRecordValue(sourceType, value, destType, value); - } - - private void checkRowValueToRecordValue( - Schema.FieldType sourceType, Object sourceValue, Type destType, Object destValue) { - Schema beamSchema = Schema.of(Schema.Field.of("v", sourceType)); - Row row = Row.withSchema(beamSchema).addValues(sourceValue).build(); - - org.apache.iceberg.Schema icebergSchema = - new org.apache.iceberg.Schema(required(0, "v", destType)); - Record record = SchemaAndRowConversions.rowToRecord(icebergSchema, row); - - assertThat(record.getField("v"), equalTo(destValue)); - } - - @Test - public void testBoolean() throws Exception { - checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), true); - checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), false); - } - - @Test - public void testInteger() throws Exception { - checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), -13); - checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), 42); - checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), 0); - } - - @Test - public void testLong() throws Exception { - checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(), 13L); - checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(), 42L); - } - - @Test - public void testFloat() throws Exception { - checkRowValueToRecordValue(Schema.FieldType.FLOAT, Types.FloatType.get(), 3.14159f); - checkRowValueToRecordValue(Schema.FieldType.FLOAT, Types.FloatType.get(), 42.0f); - } - - @Test - public void testDouble() throws Exception { - checkRowValueToRecordValue(Schema.FieldType.DOUBLE, Types.DoubleType.get(), 3.14159); - } - - @Test - public void testDate() throws Exception {} - - @Test - public void testTime() throws Exception {} - - @Test - public void testTimestamp() throws Exception { - DateTime dateTime = - new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); - - checkRowValueToRecordValue( - Schema.FieldType.DATETIME, - dateTime.toInstant(), - Types.TimestampType.withoutZone(), - dateTime.getMillis()); - } - - @Test - public void testFixed() throws Exception {} - - @Test - public void testBinary() throws Exception { - byte[] bytes = new byte[] {1, 2, 3, 4}; - checkRowValueToRecordValue( - Schema.FieldType.BYTES, bytes, Types.BinaryType.get(), 
ByteBuffer.wrap(bytes)); - } - - @Test - public void testDecimal() throws Exception {} - - @Test - public void testStruct() throws Exception {} - - @Test - public void testMap() throws Exception {} - - @Test - public void testList() throws Exception {} - } - - @RunWith(JUnit4.class) - public static class RecordToRowTests { - private void checkRecordValueToRowValue( - Type sourceType, Schema.FieldType destType, Object value) { - checkRecordValueToRowValue(sourceType, value, destType, value); - } - - private void checkRecordValueToRowValue( - Type sourceType, Object sourceValue, Schema.FieldType destType, Object destValue) { - Schema beamSchema = Schema.of(Schema.Field.of("v", destType)); - - org.apache.iceberg.Schema icebergSchema = - new org.apache.iceberg.Schema(required(0, "v", sourceType)); - Record record = GenericRecord.create(icebergSchema); - record.setField("v", sourceValue); - - Row row = SchemaAndRowConversions.recordToRow(beamSchema, record); - - assertThat(row.getBaseValue("v"), equalTo(destValue)); - } - - @Test - public void testBoolean() throws Exception { - checkRecordValueToRowValue(Types.BooleanType.get(), Schema.FieldType.BOOLEAN, true); - checkRecordValueToRowValue(Types.BooleanType.get(), Schema.FieldType.BOOLEAN, false); - } - - @Test - public void testInteger() throws Exception { - checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, -13); - checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, 42); - checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, 0); - } - - @Test - public void testLong() throws Exception { - checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64, 13L); - checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64, 42L); - } - - @Test - public void testFloat() throws Exception { - checkRecordValueToRowValue(Types.FloatType.get(), Schema.FieldType.FLOAT, 3.14159f); - checkRecordValueToRowValue(Types.FloatType.get(), Schema.FieldType.FLOAT, 42.0f); - } - - @Test - public void testDouble() throws Exception { - checkRecordValueToRowValue(Types.DoubleType.get(), Schema.FieldType.DOUBLE, 3.14159); - } - - @Test - public void testDate() throws Exception {} - - @Test - public void testTime() throws Exception {} - - @Test - public void testTimestamp() throws Exception { - DateTime dateTime = - new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC); - - checkRecordValueToRowValue( - Types.TimestampType.withoutZone(), - dateTime.getMillis(), - Schema.FieldType.DATETIME, - dateTime.toInstant()); - } - - @Test - public void testFixed() throws Exception {} - - @Test - public void testBinary() throws Exception { - byte[] bytes = new byte[] {1, 2, 3, 4}; - checkRecordValueToRowValue( - Types.BinaryType.get(), ByteBuffer.wrap(bytes), Schema.FieldType.BYTES, bytes); - } - - @Test - public void testDecimal() throws Exception {} - - @Test - public void testStruct() throws Exception {} - - @Test - public void testMap() throws Exception {} - - @Test - public void testList() throws Exception {} - } - - @RunWith(JUnit4.class) - public static class SchemaTests { - static final Schema BEAM_SCHEMA = - Schema.builder() - .addInt32Field("int") - .addFloatField("float") - .addDoubleField("double") - .addInt64Field("long") - .addStringField("str") - .addBooleanField("bool") - .addByteArrayField("bytes") - .build(); - - static final org.apache.iceberg.Schema ICEBERG_SCHEMA = - new org.apache.iceberg.Schema( - Types.NestedField.required(1, "int", 
Types.IntegerType.get()), - Types.NestedField.required(2, "float", Types.FloatType.get()), - Types.NestedField.required(3, "double", Types.DoubleType.get()), - Types.NestedField.required(4, "long", Types.LongType.get()), - Types.NestedField.required(5, "str", Types.StringType.get()), - Types.NestedField.required(6, "bool", Types.BooleanType.get()), - Types.NestedField.required(7, "bytes", Types.BinaryType.get())); - - @Test - public void testBeamSchemaToIcebergSchema() { - org.apache.iceberg.Schema convertedIcebergSchema = - SchemaAndRowConversions.beamSchemaToIcebergSchema(BEAM_SCHEMA); - - assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA)); - } - - @Test - public void testIcebergSchemaToBeamSchema() { - Schema convertedBeamSchema = - SchemaAndRowConversions.icebergSchemaToBeamSchema(ICEBERG_SCHEMA); - - assertEquals(BEAM_SCHEMA, convertedBeamSchema); - } - } -} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java index 4048e88398a9..6143bd03491d 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java @@ -91,7 +91,7 @@ public static final ImmutableList asRows(Iterable records) { ArrayList rows = new ArrayList<>(); for (Record record : records) { rows.add( - Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(SCHEMA)) + Row.withSchema(IcebergUtils.icebergSchemaToBeamSchema(SCHEMA)) .withFieldValue("id", record.getField("id")) .withFieldValue("data", record.getField("data")) .build());
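Reviewer note (not part of the patch): below is a minimal, hypothetical usage sketch of the renamed public entry points, for anyone trying the change locally. Only the four IcebergUtils methods and their signatures come from this PR; the scratch class, field names, and values are invented. The field-ID comments mirror the expectations encoded in the new SchemaTests: beamSchemaToIcebergSchema assigns top-level IDs starting from 1, and a collection's element type takes the next available ID so every ID in the schema stays unique.

import java.util.Arrays;
import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.data.Record;

/** Hypothetical scratch class; not part of this PR. */
public class IcebergUtilsScratch {
  public static void main(String[] args) {
    // A Beam schema with one primitive field and one collection field.
    Schema beamSchema =
        Schema.builder()
            .addInt64Field("id")
            .addArrayField("tags", Schema.FieldType.STRING)
            .build();

    // Beam -> Iceberg: "id" gets field ID 1, "tags" gets ID 2, and the list's
    // element type gets ID 3 (the ObjectAndMaxId bookkeeping in this patch).
    org.apache.iceberg.Schema icebergSchema =
        IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

    // Iceberg -> Beam round trip of the schema; per the new tests, Iceberg
    // lists come back as Beam ITERABLE fields.
    Schema roundTripped = IcebergUtils.icebergSchemaToBeamSchema(icebergSchema);

    // Row -> Record -> Row round trip of a value.
    Row row = Row.withSchema(beamSchema).addValues(42L, Arrays.asList("a", "b")).build();
    Record record = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row);
    Row back = IcebergUtils.icebergRecordToBeamRow(roundTripped, record);

    System.out.println(icebergSchema);
    System.out.println(back);
  }
}

Note that beamRowToIcebergRecord takes the Iceberg schema rather than the Beam one, since the produced Record's layout and field IDs come from the (table) schema being written to.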