diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 3f63c0c9975f..bbdc3a3910ef 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index a2f84e6475c9..acd9b25a6a5e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -34,6 +34,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -115,113 +116,110 @@ private static Schema icebergStructTypeToBeamSchema(final Types.StructType struc } /** - * Represents an Object (in practice, either {@link Type} or {@link Types.NestedField}) along with - * the most recent (max) ID that has been used to build this object. + * Represents a {@link Type} and the most recent field ID used to build it. * *

Iceberg Schema fields are required to have unique IDs. This includes unique IDs for a {@link - * Types.ListType}'s collection type, a {@link Types.MapType}'s key type and value type, and - * nested {@link Types.StructType}s. When constructing any of these types, we use multiple unique - * ID's for the type's components. The {@code maxId} in this object represents the most recent ID - * used after building this type. This helps signal that the next field we construct should have - * an ID greater than this one. + * org.apache.iceberg.types.Type.NestedType}'s components (e.g. {@link Types.ListType}'s + * collection type, {@link Types.MapType}'s key type and value type, and {@link + * Types.StructType}'s nested fields). The {@code maxId} in this object represents the most recent + * ID used after building this type. This helps signal that the next {@link + * org.apache.iceberg.types.Type.NestedType} we construct should have an ID greater than this one. */ @VisibleForTesting - static class ObjectAndMaxId { + static class TypeAndMaxId { int maxId; - T object; + Type type; - ObjectAndMaxId(int id, T object) { + TypeAndMaxId(int id, Type object) { this.maxId = id; - this.object = object; + this.type = object; } } /** - * Given a Beam {@link Schema.FieldType} and an index, returns an Iceberg {@link Type} and the - * maximum index after building the Iceberg Type. This assumes the input index is already in use - * (usually by the parent {@link Types.NestedField}, and will start building the Iceberg type from - * index + 1. + * Takes a Beam {@link Schema.FieldType} and an index intended as a starting point for Iceberg + * {@link org.apache.iceberg.types.Type.NestedType}s. Returns an Iceberg {@link Type} and the + * maximum index after building that type. * - *

Returns this information in an {@link ObjectAndMaxId} instance. + *

Returns this information in an {@link TypeAndMaxId} object. */ @VisibleForTesting - static ObjectAndMaxId beamFieldTypeToIcebergFieldType( - int fieldId, Schema.FieldType beamType) { + static TypeAndMaxId beamFieldTypeToIcebergFieldType( + Schema.FieldType beamType, int nestedFieldId) { if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) { - return new ObjectAndMaxId<>(fieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName())); + // we don't use nested field ID for primitive types. decrement it so the caller can use it for + // other types. + return new TypeAndMaxId( + --nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName())); } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE - // List ID needs to be unique from the NestedField that contains this ListType - int listId = fieldId + 1; Schema.FieldType beamCollectionType = Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()); - ObjectAndMaxId listInfo = beamFieldTypeToIcebergFieldType(listId, beamCollectionType); - Type icebergCollectionType = listInfo.object; + // nestedFieldId is reserved for the list's collection type. + // we increment here because further nested fields should use unique ID's + TypeAndMaxId listInfo = + beamFieldTypeToIcebergFieldType(beamCollectionType, nestedFieldId + 1); + Type icebergCollectionType = listInfo.type; boolean elementTypeIsNullable = Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable(); Type listType = elementTypeIsNullable - ? Types.ListType.ofOptional(listId, icebergCollectionType) - : Types.ListType.ofRequired(listId, icebergCollectionType); + ? Types.ListType.ofOptional(nestedFieldId, icebergCollectionType) + : Types.ListType.ofRequired(nestedFieldId, icebergCollectionType); - return new ObjectAndMaxId<>(listInfo.maxId, listType); + return new TypeAndMaxId(listInfo.maxId, listType); } else if (beamType.getTypeName().isMapType()) { // MAP - // key and value IDs need to be unique from the NestedField that contains this MapType - int keyId = fieldId + 1; - int valueId = fieldId + 2; - int maxId = valueId; + // key and value IDs need to be unique + int keyId = nestedFieldId; + int valueId = keyId + 1; + // nested field IDs should be unique + nestedFieldId = valueId + 1; Schema.FieldType beamKeyType = Preconditions.checkArgumentNotNull(beamType.getMapKeyType()); - ObjectAndMaxId keyInfo = beamFieldTypeToIcebergFieldType(maxId, beamKeyType); - Type icebergKeyType = keyInfo.object; - maxId = keyInfo.maxId; + TypeAndMaxId keyInfo = beamFieldTypeToIcebergFieldType(beamKeyType, nestedFieldId); + Type icebergKeyType = keyInfo.type; + nestedFieldId = keyInfo.maxId + 1; Schema.FieldType beamValueType = Preconditions.checkArgumentNotNull(beamType.getMapValueType()); - ObjectAndMaxId valueInfo = beamFieldTypeToIcebergFieldType(maxId, beamValueType); - Type icebergValueType = valueInfo.object; - maxId = valueInfo.maxId; + TypeAndMaxId valueInfo = beamFieldTypeToIcebergFieldType(beamValueType, nestedFieldId); + Type icebergValueType = valueInfo.type; Type mapType = beamValueType.getNullable() ? Types.MapType.ofOptional(keyId, valueId, icebergKeyType, icebergValueType) : Types.MapType.ofRequired(keyId, valueId, icebergKeyType, icebergValueType); - return new ObjectAndMaxId<>(maxId, mapType); + return new TypeAndMaxId(valueInfo.maxId, mapType); } else if (beamType.getTypeName().isCompositeType()) { // ROW // Nested field IDs need to be unique from the field that contains this StructType - int maxFieldId = fieldId; - Schema nestedSchema = Preconditions.checkArgumentNotNull(beamType.getRowSchema()); List nestedFields = new ArrayList<>(nestedSchema.getFieldCount()); - for (Schema.Field field : nestedSchema.getFields()) { - ObjectAndMaxId converted = beamFieldToIcebergField(++maxFieldId, field); - Types.NestedField nestedField = converted.object; - nestedFields.add(nestedField); - maxFieldId = converted.maxId; + int icebergFieldId = nestedFieldId; + nestedFieldId = icebergFieldId + nestedSchema.getFieldCount(); + for (Schema.Field beamField : nestedSchema.getFields()) { + TypeAndMaxId typeAndMaxId = + beamFieldTypeToIcebergFieldType(beamField.getType(), nestedFieldId); + Types.NestedField icebergField = + Types.NestedField.of( + icebergFieldId++, + beamField.getType().getNullable(), + beamField.getName(), + typeAndMaxId.type); + + nestedFields.add(icebergField); + nestedFieldId = typeAndMaxId.maxId + 1; } Type structType = Types.StructType.of(nestedFields); - return new ObjectAndMaxId<>(maxFieldId, structType); + return new TypeAndMaxId(nestedFieldId - 1, structType); } - return new ObjectAndMaxId<>(fieldId, Types.StringType.get()); - } - - private static ObjectAndMaxId beamFieldToIcebergField( - int fieldId, final Schema.Field field) { - ObjectAndMaxId typeAndMaxId = beamFieldTypeToIcebergFieldType(fieldId, field.getType()); - Type icebergType = typeAndMaxId.object; - int id = typeAndMaxId.maxId; - - Types.NestedField icebergField = - Types.NestedField.of(fieldId, field.getType().getNullable(), field.getName(), icebergType); - - return new ObjectAndMaxId<>(id, icebergField); + return new TypeAndMaxId(nestedFieldId, Types.StringType.get()); } /** @@ -233,18 +231,23 @@ private static ObjectAndMaxId beamFieldToIcebergField( *

  • {@link Schema.TypeName.LOGICAL_TYPE} */ public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) { - Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()]; - int nextIcebergFieldId = 1; - for (int i = 0; i < schema.getFieldCount(); i++) { - Schema.Field beamField = schema.getField(i); - ObjectAndMaxId fieldAndMaxId = - beamFieldToIcebergField(nextIcebergFieldId, beamField); - Types.NestedField field = fieldAndMaxId.object; - fields[i] = field; - - nextIcebergFieldId = fieldAndMaxId.maxId + 1; + List fields = new ArrayList<>(schema.getFieldCount()); + int nestedFieldId = schema.getFieldCount() + 1; + int icebergFieldId = 1; + for (Schema.Field beamField : schema.getFields()) { + TypeAndMaxId typeAndMaxId = + beamFieldTypeToIcebergFieldType(beamField.getType(), nestedFieldId); + Types.NestedField icebergField = + Types.NestedField.of( + icebergFieldId++, + beamField.getType().getNullable(), + beamField.getName(), + typeAndMaxId.type); + + fields.add(icebergField); + nestedFieldId = typeAndMaxId.maxId + 1; } - return new org.apache.iceberg.Schema(fields); + return new org.apache.iceberg.Schema(fields.toArray(new Types.NestedField[fields.size()])); } /** Converts a Beam {@link Row} to an Iceberg {@link Record}. */ @@ -323,27 +326,21 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row public static Row icebergRecordToBeamRow(Schema schema, Record record) { Row.Builder rowBuilder = Row.withSchema(schema); for (Schema.Field field : schema.getFields()) { + boolean isNullable = field.getType().getNullable(); + @Nullable Object icebergValue = record.getField(field.getName()); + if (icebergValue == null) { + if (isNullable) { + rowBuilder.addValue(null); + continue; + } + throw new RuntimeException( + String.format("Received null value for required field '%s'.", field.getName())); + } switch (field.getType().getTypeName()) { case BYTE: - // I guess allow anything we can cast here - byte byteValue = (byte) record.getField(field.getName()); - rowBuilder.addValue(byteValue); - break; case INT16: - // I guess allow anything we can cast here - short shortValue = (short) record.getField(field.getName()); - rowBuilder.addValue(shortValue); - break; case INT32: - // I guess allow anything we can cast here - int intValue = (int) record.getField(field.getName()); - rowBuilder.addValue(intValue); - break; case INT64: - // I guess allow anything we can cast here - long longValue = (long) record.getField(field.getName()); - rowBuilder.addValue(longValue); - break; case DECIMAL: // Iceberg and Beam both use BigDecimal case FLOAT: // Iceberg and Beam both use float case DOUBLE: // Iceberg and Beam both use double @@ -352,29 +349,31 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) { case ARRAY: case ITERABLE: case MAP: - rowBuilder.addValue(record.getField(field.getName())); + rowBuilder.addValue(icebergValue); break; case DATETIME: // Iceberg uses a long for millis; Beam uses joda time DateTime - long millis = (long) record.getField(field.getName()); + long millis = (long) icebergValue; rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC)); break; case BYTES: // Iceberg uses ByteBuffer; Beam uses byte[] - rowBuilder.addValue(((ByteBuffer) record.getField(field.getName())).array()); + rowBuilder.addValue(((ByteBuffer) icebergValue).array()); break; case ROW: - Record nestedRecord = (Record) record.getField(field.getName()); + Record nestedRecord = (Record) icebergValue; Schema nestedSchema = checkArgumentNotNull( field.getType().getRowSchema(), "Corrupted schema: Row type did not have associated nested schema."); - Row nestedRow = icebergRecordToBeamRow(nestedSchema, nestedRecord); - rowBuilder.addValue(nestedRow); + rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, nestedRecord)); break; case LOGICAL_TYPE: throw new UnsupportedOperationException( "Cannot convert iceberg field to Beam logical type"); + default: + throw new UnsupportedOperationException( + "Unsupported Beam type: " + field.getType().getTypeName()); } } return rowBuilder.build(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index c4da0b22f4d9..a20d5b7c8f59 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.sdk.io.iceberg.IcebergUtils.ObjectAndMaxId; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.TypeAndMaxId; import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -316,11 +316,11 @@ private static class BeamFieldTypeTestCase { private void checkTypes(List testCases) { for (BeamFieldTypeTestCase testCase : testCases) { - ObjectAndMaxId ret = - beamFieldTypeToIcebergFieldType(testCase.icebergFieldId, testCase.beamType); + TypeAndMaxId ret = + beamFieldTypeToIcebergFieldType(testCase.beamType, testCase.icebergFieldId); assertEquals(testCase.expectedMaxId, ret.maxId); - checkEquals(testCase.expectedIcebergType, ret.object); + checkEquals(testCase.expectedIcebergType, ret.type); } } @@ -338,65 +338,65 @@ private void checkEquals(Type expected, Type actual) { @Test public void testPrimitiveBeamFieldTypeToIcebergFieldType() { + // primitive types don't use the nested field ID List primitives = Arrays.asList( - new BeamFieldTypeTestCase(1, Schema.FieldType.BOOLEAN, 1, Types.BooleanType.get()), - new BeamFieldTypeTestCase(3, Schema.FieldType.INT32, 3, Types.IntegerType.get()), - new BeamFieldTypeTestCase(6, Schema.FieldType.INT64, 6, Types.LongType.get()), - new BeamFieldTypeTestCase(10, Schema.FieldType.FLOAT, 10, Types.FloatType.get()), - new BeamFieldTypeTestCase(7, Schema.FieldType.DOUBLE, 7, Types.DoubleType.get()), - new BeamFieldTypeTestCase(11, Schema.FieldType.STRING, 11, Types.StringType.get()), - new BeamFieldTypeTestCase(15, Schema.FieldType.BYTES, 15, Types.BinaryType.get())); + new BeamFieldTypeTestCase(1, Schema.FieldType.BOOLEAN, 0, Types.BooleanType.get()), + new BeamFieldTypeTestCase(3, Schema.FieldType.INT32, 2, Types.IntegerType.get()), + new BeamFieldTypeTestCase(6, Schema.FieldType.INT64, 5, Types.LongType.get()), + new BeamFieldTypeTestCase(10, Schema.FieldType.FLOAT, 9, Types.FloatType.get()), + new BeamFieldTypeTestCase(7, Schema.FieldType.DOUBLE, 6, Types.DoubleType.get()), + new BeamFieldTypeTestCase(11, Schema.FieldType.STRING, 10, Types.StringType.get()), + new BeamFieldTypeTestCase(15, Schema.FieldType.BYTES, 14, Types.BinaryType.get())); checkTypes(primitives); } @Test public void testArrayBeamFieldTypeToIcebergFieldType() { - // Iceberg sets one field ID for the List type itself and another field ID for the collection - // type. + // Iceberg's ListType reserves one nested ID for its element type List listTypes = Arrays.asList( new BeamFieldTypeTestCase( 1, Schema.FieldType.array(Schema.FieldType.BOOLEAN), - 2, + 1, Types.ListType.ofRequired(1, Types.BooleanType.get())), new BeamFieldTypeTestCase( 3, Schema.FieldType.iterable(Schema.FieldType.INT32), - 4, + 3, Types.ListType.ofRequired(3, Types.IntegerType.get())), new BeamFieldTypeTestCase( 6, Schema.FieldType.array(Schema.FieldType.INT64), - 7, + 6, Types.ListType.ofRequired(6, Types.LongType.get())), new BeamFieldTypeTestCase( 10, Schema.FieldType.array(Schema.FieldType.FLOAT), - 11, + 10, Types.ListType.ofRequired(10, Types.FloatType.get())), new BeamFieldTypeTestCase( 7, Schema.FieldType.iterable(Schema.FieldType.DOUBLE), - 8, + 7, Types.ListType.ofRequired(7, Types.DoubleType.get())), new BeamFieldTypeTestCase( 11, Schema.FieldType.array(Schema.FieldType.STRING), - 12, + 11, Types.ListType.ofRequired(11, Types.StringType.get())), new BeamFieldTypeTestCase( 15, Schema.FieldType.iterable(Schema.FieldType.BYTES), - 16, + 15, Types.ListType.ofRequired(15, Types.BinaryType.get())), new BeamFieldTypeTestCase( 23, Schema.FieldType.array( Schema.FieldType.array(Schema.FieldType.iterable(Schema.FieldType.STRING))), - 26, + 25, Types.ListType.ofRequired( 23, Types.ListType.ofRequired( @@ -407,23 +407,23 @@ public void testArrayBeamFieldTypeToIcebergFieldType() { @Test public void testStructBeamFieldTypeToIcebergFieldType() { - // Iceberg sets one field ID for each nested type. + // Iceberg sets one unique field ID for each nested type. List listTypes = Arrays.asList( new BeamFieldTypeTestCase( 1, Schema.FieldType.row(Schema.builder().addStringField("str").build()), - 2, + 1, Types.StructType.of( - Types.NestedField.required(2, "str", Types.StringType.get()))), + Types.NestedField.required(1, "str", Types.StringType.get()))), new BeamFieldTypeTestCase( 3, Schema.FieldType.row(Schema.builder().addInt32Field("int").build()), - 4, + 3, Types.StructType.of( - Types.NestedField.required(4, "int", Types.IntegerType.get()))), + Types.NestedField.required(3, "int", Types.IntegerType.get()))), new BeamFieldTypeTestCase( - 0, + 1, Schema.FieldType.row(BEAM_SCHEMA_PRIMITIVE), 7, Types.StructType.of(ICEBERG_SCHEMA_PRIMITIVE.columns())), @@ -434,11 +434,11 @@ public void testStructBeamFieldTypeToIcebergFieldType() { .addArrayField("arr", Schema.FieldType.STRING) .addNullableStringField("str") .build()), - 18, + 17, Types.StructType.of( Types.NestedField.required( - 16, "arr", Types.ListType.ofRequired(17, Types.StringType.get())), - Types.NestedField.optional(18, "str", Types.StringType.get()))), + 15, "arr", Types.ListType.ofRequired(17, Types.StringType.get())), + Types.NestedField.optional(16, "str", Types.StringType.get()))), new BeamFieldTypeTestCase( 20, Schema.FieldType.row( @@ -452,10 +452,10 @@ public void testStructBeamFieldTypeToIcebergFieldType() { .addNullableRowField( "nullable_row", Schema.builder().addInt64Field("long").build()) .build()), - 25, + 24, Types.StructType.of( Types.NestedField.required( - 21, + 20, "row", Types.StructType.of( Types.NestedField.required( @@ -465,33 +465,34 @@ public void testStructBeamFieldTypeToIcebergFieldType() { Types.NestedField.required( 23, "str", Types.StringType.get()))))), Types.NestedField.optional( - 24, + 21, "nullable_row", Types.StructType.of( - Types.NestedField.required(25, "long", Types.LongType.get())))))); + Types.NestedField.required(24, "long", Types.LongType.get())))))); checkTypes(listTypes); } @Test public void testMapBeamFieldTypeToIcebergFieldType() { + // Iceberg's MapType reserves two nested IDs. one for its key type and one for its value type. List primitives = Arrays.asList( new BeamFieldTypeTestCase( 1, Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32), - 3, - Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.IntegerType.get())), + 2, + Types.MapType.ofRequired(1, 2, Types.StringType.get(), Types.IntegerType.get())), new BeamFieldTypeTestCase( 6, Schema.FieldType.map( Schema.FieldType.FLOAT, Schema.FieldType.array(Schema.FieldType.STRING)), - 9, + 8, Types.MapType.ofRequired( + 6, 7, - 8, Types.FloatType.get(), - Types.ListType.ofRequired(9, Types.StringType.get()))), + Types.ListType.ofRequired(8, Types.StringType.get()))), new BeamFieldTypeTestCase( 10, Schema.FieldType.map( @@ -499,30 +500,30 @@ public void testMapBeamFieldTypeToIcebergFieldType() { Schema.FieldType.map( Schema.FieldType.BOOLEAN, Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32))), - 16, + 15, Types.MapType.ofRequired( + 10, 11, - 12, Types.StringType.get(), Types.MapType.ofRequired( + 12, 13, - 14, Types.BooleanType.get(), Types.MapType.ofRequired( - 15, 16, Types.StringType.get(), Types.IntegerType.get())))), + 14, 15, Types.StringType.get(), Types.IntegerType.get())))), new BeamFieldTypeTestCase( 15, Schema.FieldType.map( Schema.FieldType.row(Schema.builder().addStringField("str").build()), Schema.FieldType.row(Schema.builder().addInt32Field("int").build())), - 19, + 18, Types.MapType.ofRequired( + 15, 16, - 17, Types.StructType.of( - Types.NestedField.required(18, "str", Types.StringType.get())), + Types.NestedField.required(17, "str", Types.StringType.get())), Types.StructType.of( - Types.NestedField.required(19, "int", Types.IntegerType.get()))))); + Types.NestedField.required(18, "int", Types.IntegerType.get()))))); checkTypes(primitives); } @@ -574,9 +575,9 @@ public void testPrimitiveIcebergSchemaToBeamSchema() { .build(); static final org.apache.iceberg.Schema ICEBERG_SCHEMA_LIST = new org.apache.iceberg.Schema( - required(1, "arr_str", Types.ListType.ofRequired(2, Types.StringType.get())), - required(3, "arr_int", Types.ListType.ofRequired(4, Types.IntegerType.get())), - required(5, "arr_bool", Types.ListType.ofRequired(6, Types.BooleanType.get()))); + required(1, "arr_str", Types.ListType.ofRequired(4, Types.StringType.get())), + required(2, "arr_int", Types.ListType.ofRequired(5, Types.IntegerType.get())), + required(3, "arr_bool", Types.ListType.ofRequired(6, Types.BooleanType.get()))); @Test public void testArrayBeamSchemaToIcebergSchema() { @@ -607,9 +608,9 @@ public void testArrayIcebergSchemaToBeamSchema() { required( 1, "str_int", - Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.IntegerType.get())), + Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.IntegerType.get())), optional( - 4, + 2, "long_bool", Types.MapType.ofRequired(5, 6, Types.LongType.get(), Types.BooleanType.get()))); @@ -648,11 +649,11 @@ public void testMapIcebergSchemaToBeamSchema() { 1, "row", Types.StructType.of( - required(2, "str", Types.StringType.get()), - optional(3, "int", Types.IntegerType.get()), - required(4, "long", Types.LongType.get()))), + required(3, "str", Types.StringType.get()), + optional(4, "int", Types.IntegerType.get()), + required(5, "long", Types.LongType.get()))), optional( - 5, + 2, "nullable_row", Types.StructType.of( optional(6, "str", Types.StringType.get()),