From 689af5ba16ea5cba07783bc25bd21bfa2ab7537d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Tue, 5 Nov 2024 13:08:40 -0500 Subject: [PATCH] [Managed Iceberg] Allow updating partition specs during pipeline runtime (#32879) * allowed updating partition specs at runtime * add to changes md * add to changes md * trigger iceberg integration tests * refresh cached tables; split multiple partition specs into separate manifest files * add test * address comment * clarify changes comment --- .../IO_Iceberg_Integration_Tests.json | 2 +- CHANGES.md | 1 + .../sdk/io/iceberg/AppendFilesToTables.java | 115 ++++++++++++-- .../beam/sdk/io/iceberg/FileWriteResult.java | 5 +- .../sdk/io/iceberg/RecordWriterManager.java | 45 +++--- .../sdk/io/iceberg/SerializableDataFile.java | 17 ++- .../sdk/io/iceberg/WriteToDestinations.java | 2 +- .../io/iceberg/RecordWriterManagerTest.java | 140 +++++++++++++++--- 8 files changed, 260 insertions(+), 67 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 62ae7886c573..bbdc3a3910ef 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 4 + "modification": 3 } diff --git a/CHANGES.md b/CHANGES.md index c98504df4d1b..cdedce22e975 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -69,6 +69,7 @@ * [Managed Iceberg] Now available in Python SDK ([#31495](https://github.com/apache/beam/pull/31495)) * [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688)) * BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527)) +* [Managed Iceberg] Allow updating table partition specs during pipeline runtime ([#32879](https://github.com/apache/beam/pull/32879)) ## New Features / Improvements diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index defe4f2a603d..d9768114e7c6 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -17,6 +17,12 @@ */ package org.apache.beam.sdk.io.iceberg; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.metrics.Counter; @@ -29,14 +35,21 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import 
org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,9 +58,11 @@ class AppendFilesToTables extends PTransform, PCollection>> { private static final Logger LOG = LoggerFactory.getLogger(AppendFilesToTables.class); private final IcebergCatalogConfig catalogConfig; + private final String manifestFilePrefix; - AppendFilesToTables(IcebergCatalogConfig catalogConfig) { + AppendFilesToTables(IcebergCatalogConfig catalogConfig, String manifestFilePrefix) { this.catalogConfig = catalogConfig; + this.manifestFilePrefix = manifestFilePrefix; } @Override @@ -67,7 +82,7 @@ public String apply(FileWriteResult input) { .apply("Group metadata updates by table", GroupByKey.create()) .apply( "Append metadata updates to tables", - ParDo.of(new AppendFilesToTablesDoFn(catalogConfig))) + ParDo.of(new AppendFilesToTablesDoFn(catalogConfig, manifestFilePrefix))) .setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.CODER)); } @@ -75,19 +90,19 @@ private static class AppendFilesToTablesDoFn extends DoFn>, KV> { private final Counter snapshotsCreated = Metrics.counter(AppendFilesToTables.class, "snapshotsCreated"); - private final Counter dataFilesCommitted = - Metrics.counter(AppendFilesToTables.class, "dataFilesCommitted"); private final Distribution committedDataFileByteSize = Metrics.distribution(RecordWriter.class, "committedDataFileByteSize"); private final Distribution committedDataFileRecordCount = Metrics.distribution(RecordWriter.class, "committedDataFileRecordCount"); private final IcebergCatalogConfig catalogConfig; + private final String manifestFilePrefix; private transient @MonotonicNonNull Catalog catalog; - private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) { + private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig, String manifestFilePrefix) { this.catalogConfig = catalogConfig; + this.manifestFilePrefix = manifestFilePrefix; } private Catalog getCatalog() { @@ -97,11 +112,22 @@ private Catalog getCatalog() { return catalog; } + private boolean containsMultiplePartitionSpecs(Iterable fileWriteResults) { + int id = fileWriteResults.iterator().next().getSerializableDataFile().getPartitionSpecId(); + for (FileWriteResult result : fileWriteResults) { + if (id != result.getSerializableDataFile().getPartitionSpecId()) { + return true; + } + } + return false; + } + @ProcessElement public void processElement( @Element KV> element, OutputReceiver> out, - BoundedWindow window) { + BoundedWindow window) + throws IOException { String tableStringIdentifier = element.getKey(); Iterable fileWriteResults = element.getValue(); if (!fileWriteResults.iterator().hasNext()) { @@ -109,24 +135,81 @@ public void processElement( } Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); + + // vast majority of the time, we will simply append data files. + // in the rare case we get a batch that contains multiple partition specs, we will group + // data into manifest files and append. + // note: either way, we must use a single commit operation for atomicity. 
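+          // (a batch can mix spec ids when the user commits a table.updateSpec() change while the
+          // pipeline is running; files written before the update keep the spec id they were
+          // created with)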
+ if (containsMultiplePartitionSpecs(fileWriteResults)) { + appendManifestFiles(table, fileWriteResults); + } else { + appendDataFiles(table, fileWriteResults); + } + + Snapshot snapshot = table.currentSnapshot(); + LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot); + snapshotsCreated.inc(); + out.outputWithTimestamp( + KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp()); + } + + // This works only when all files are using the same partition spec. + private void appendDataFiles(Table table, Iterable fileWriteResults) { AppendFiles update = table.newAppend(); - long numFiles = 0; for (FileWriteResult result : fileWriteResults) { - DataFile dataFile = result.getDataFile(table.spec()); + DataFile dataFile = result.getDataFile(table.specs()); update.appendFile(dataFile); committedDataFileByteSize.update(dataFile.fileSizeInBytes()); committedDataFileRecordCount.update(dataFile.recordCount()); - numFiles++; } - // this commit will create a ManifestFile. we don't need to manually create one. update.commit(); - dataFilesCommitted.inc(numFiles); + } - Snapshot snapshot = table.currentSnapshot(); - LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot); - snapshotsCreated.inc(); - out.outputWithTimestamp( - KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp()); + // When a user updates their table partition spec during runtime, we can end up with + // a batch of files where some are written with the old spec and some are written with the new + // spec. + // A table commit is limited to a single partition spec. + // To handle this, we create a manifest file for each partition spec, and group data files + // accordingly. + // Afterward, we append all manifests using a single commit operation. 
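+  // In Iceberg terms (all of these calls appear below): one ManifestWriter per spec id via
+  // ManifestFiles.write(spec, outputFile), writer.add(dataFile) for that spec's files, then
+  // AppendFiles.appendManifest(writer.toManifestFile()) for each manifest, and a single
+  // update.commit() to publish one snapshot covering every spec.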
+ private void appendManifestFiles(Table table, Iterable fileWriteResults) + throws IOException { + String uuid = UUID.randomUUID().toString(); + Map specs = table.specs(); + + Map> dataFilesBySpec = new HashMap<>(); + for (FileWriteResult result : fileWriteResults) { + DataFile dataFile = result.getDataFile(specs); + dataFilesBySpec.computeIfAbsent(dataFile.specId(), i -> new ArrayList<>()).add(dataFile); + } + + AppendFiles update = table.newAppend(); + for (Map.Entry> entry : dataFilesBySpec.entrySet()) { + int specId = entry.getKey(); + List files = entry.getValue(); + PartitionSpec spec = Preconditions.checkStateNotNull(specs.get(specId)); + ManifestWriter writer = + createManifestWriter(table.location(), uuid, spec, table.io()); + for (DataFile file : files) { + writer.add(file); + committedDataFileByteSize.update(file.fileSizeInBytes()); + committedDataFileRecordCount.update(file.recordCount()); + } + writer.close(); + update.appendManifest(writer.toManifestFile()); + } + update.commit(); + } + + private ManifestWriter createManifestWriter( + String tableLocation, String uuid, PartitionSpec spec, FileIO io) { + String location = + FileFormat.AVRO.addExtension( + String.format( + "%s/metadata/%s-%s-%s.manifest", + tableLocation, manifestFilePrefix, uuid, spec.specId())); + OutputFile outputFile = io.newOutputFile(location); + return ManifestFiles.write(spec, outputFile); } } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java index c4090d9e7e53..bf00bf8519fc 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.iceberg; import com.google.auto.value.AutoValue; +import java.util.Map; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; @@ -46,9 +47,9 @@ public TableIdentifier getTableIdentifier() { } @SchemaIgnore - public DataFile getDataFile(PartitionSpec spec) { + public DataFile getDataFile(Map specs) { if (cachedDataFile == null) { - cachedDataFile = getSerializableDataFile().createDataFile(spec); + cachedDataFile = getSerializableDataFile().createDataFile(specs); } return cachedDataFile; } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 396db7c20f36..12c425993826 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.util.Preconditions; @@ -195,7 +194,9 @@ private RecordWriter createWriter(PartitionKey partitionKey) { private final Map, List> totalSerializableDataFiles = Maps.newHashMap(); - private static final Cache TABLE_CACHE = + + @VisibleForTesting + static final Cache TABLE_CACHE = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); private boolean isClosed = false; @@ -221,22 
+222,28 @@ private RecordWriter createWriter(PartitionKey partitionKey) { private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) { @Nullable Table table = TABLE_CACHE.getIfPresent(identifier); if (table == null) { - try { - table = catalog.loadTable(identifier); - } catch (NoSuchTableException e) { + synchronized (TABLE_CACHE) { try { - org.apache.iceberg.Schema tableSchema = - IcebergUtils.beamSchemaToIcebergSchema(dataSchema); - // TODO(ahmedabu98): support creating a table with a specified partition spec - table = catalog.createTable(identifier, tableSchema); - LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema); - } catch (AlreadyExistsException alreadyExistsException) { - // handle race condition where workers are concurrently creating the same table. - // if running into already exists exception, we perform one last load table = catalog.loadTable(identifier); + } catch (NoSuchTableException e) { + try { + org.apache.iceberg.Schema tableSchema = + IcebergUtils.beamSchemaToIcebergSchema(dataSchema); + // TODO(ahmedabu98): support creating a table with a specified partition spec + table = catalog.createTable(identifier, tableSchema); + LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema); + } catch (AlreadyExistsException alreadyExistsException) { + // handle race condition where workers are concurrently creating the same table. + // if running into already exists exception, we perform one last load + table = catalog.loadTable(identifier); + } } + TABLE_CACHE.put(identifier, table); } - TABLE_CACHE.put(identifier, table); + } else { + // If fetching from cache, refresh the table to avoid working with stale metadata + // (e.g. partition spec) + table.refresh(); } return table; } @@ -254,15 +261,7 @@ public boolean write(WindowedValue icebergDestination, Row r icebergDestination, destination -> { TableIdentifier identifier = destination.getValue().getTableIdentifier(); - Table table; - try { - table = - TABLE_CACHE.get( - identifier, () -> getOrCreateTable(identifier, row.getSchema())); - } catch (ExecutionException e) { - throw new RuntimeException( - "Error while fetching or creating table: " + identifier, e); - } + Table table = getOrCreateTable(identifier, row.getSchema()); return new DestinationState(destination.getValue(), table); }); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java index 699d4fa4dfd0..59b456162008 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import com.google.auto.value.AutoValue; import java.nio.ByteBuffer; import java.util.HashMap; @@ -24,7 +26,6 @@ import java.util.Map; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; @@ -141,12 +142,14 @@ static SerializableDataFile from(DataFile f, PartitionKey key) { * it from Beam-compatible types. 
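+   * <p>The spec is looked up by {@link #getPartitionSpecId()}; callers pass the table's current
+   * {@code specs()} map, so a file written under an older partition spec is still rebuilt with
+   * the exact spec it was created with.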
*/ @SuppressWarnings("nullness") - DataFile createDataFile(PartitionSpec partitionSpec) { - Preconditions.checkState( - partitionSpec.specId() == getPartitionSpecId(), - "Invalid partition spec id '%s'. This DataFile was originally created with spec id '%s'.", - partitionSpec.specId(), - getPartitionSpecId()); + DataFile createDataFile(Map partitionSpecs) { + PartitionSpec partitionSpec = + checkStateNotNull( + partitionSpecs.get(getPartitionSpecId()), + "This DataFile was originally created with spec id '%s'. Could not find " + + "this among table's partition specs: %s.", + getPartitionSpecId(), + partitionSpecs.keySet()); Metrics dataFileMetrics = new Metrics( diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index a2d0c320f58f..fb3bf43f3515 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -74,7 +74,7 @@ public IcebergWriteResult expand(PCollection> input) { // Commit files to tables PCollection> snapshots = - writtenFiles.apply(new AppendFilesToTables(catalogConfig)); + writtenFiles.apply(new AppendFilesToTables(catalogConfig, filePrefix)); return new IcebergWriteResult(input.getPipeline(), snapshots); } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 7adf6defe520..8ced06bc944f 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -19,24 +19,29 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; import org.checkerframework.checker.nullness.qual.Nullable; @@ -73,6 +78,7 @@ public void setUp() { windowedDestination = getWindowedDestination("table_" + testName.getMethodName(), PARTITION_SPEC); catalog = new HadoopCatalog(new Configuration(), warehouse.location); + RecordWriterManager.TABLE_CACHE.invalidateAll(); } private WindowedValue getWindowedDestination( @@ -269,6 +275,25 @@ public void testRequireClosingBeforeFetchingDataFiles() { 
assertThrows(IllegalStateException.class, writerManager::getSerializableDataFiles); } + /** DataFile doesn't implement a .equals() method. Check equality manually. */ + private static void checkDataFileEquality(DataFile d1, DataFile d2) { + assertEquals(d1.path(), d2.path()); + assertEquals(d1.format(), d2.format()); + assertEquals(d1.recordCount(), d2.recordCount()); + assertEquals(d1.partition(), d2.partition()); + assertEquals(d1.specId(), d2.specId()); + assertEquals(d1.keyMetadata(), d2.keyMetadata()); + assertEquals(d1.splitOffsets(), d2.splitOffsets()); + assertEquals(d1.columnSizes(), d2.columnSizes()); + assertEquals(d1.valueCounts(), d2.valueCounts()); + assertEquals(d1.nullValueCounts(), d2.nullValueCounts()); + assertEquals(d1.nanValueCounts(), d2.nanValueCounts()); + assertEquals(d1.equalityFieldIds(), d2.equalityFieldIds()); + assertEquals(d1.fileSequenceNumber(), d2.fileSequenceNumber()); + assertEquals(d1.dataSequenceNumber(), d2.dataSequenceNumber()); + assertEquals(d1.pos(), d2.pos()); + } + @Test public void testSerializableDataFileRoundTripEquality() throws IOException { PartitionKey partitionKey = new PartitionKey(PARTITION_SPEC, ICEBERG_SCHEMA); @@ -288,22 +313,103 @@ public void testSerializableDataFileRoundTripEquality() throws IOException { assertEquals(2L, datafile.recordCount()); DataFile roundTripDataFile = - SerializableDataFile.from(datafile, partitionKey).createDataFile(PARTITION_SPEC); - // DataFile doesn't implement a .equals() method. Check equality manually - assertEquals(datafile.path(), roundTripDataFile.path()); - assertEquals(datafile.format(), roundTripDataFile.format()); - assertEquals(datafile.recordCount(), roundTripDataFile.recordCount()); - assertEquals(datafile.partition(), roundTripDataFile.partition()); - assertEquals(datafile.specId(), roundTripDataFile.specId()); - assertEquals(datafile.keyMetadata(), roundTripDataFile.keyMetadata()); - assertEquals(datafile.splitOffsets(), roundTripDataFile.splitOffsets()); - assertEquals(datafile.columnSizes(), roundTripDataFile.columnSizes()); - assertEquals(datafile.valueCounts(), roundTripDataFile.valueCounts()); - assertEquals(datafile.nullValueCounts(), roundTripDataFile.nullValueCounts()); - assertEquals(datafile.nanValueCounts(), roundTripDataFile.nanValueCounts()); - assertEquals(datafile.equalityFieldIds(), roundTripDataFile.equalityFieldIds()); - assertEquals(datafile.fileSequenceNumber(), roundTripDataFile.fileSequenceNumber()); - assertEquals(datafile.dataSequenceNumber(), roundTripDataFile.dataSequenceNumber()); - assertEquals(datafile.pos(), roundTripDataFile.pos()); + SerializableDataFile.from(datafile, partitionKey) + .createDataFile(ImmutableMap.of(PARTITION_SPEC.specId(), PARTITION_SPEC)); + + checkDataFileEquality(datafile, roundTripDataFile); + } + + /** + * Users may update the table's spec while a write pipeline is running. Sometimes, this can happen + * after converting {@link DataFile} to {@link SerializableDataFile}s. When converting back to + * {@link DataFile} to commit in the {@link AppendFilesToTables} step, we need to make sure to use + * the same {@link PartitionSpec} it was originally created with. + * + *
<p>
This test checks that we're preserving the right {@link PartitionSpec} when such an update + * happens. + */ + @Test + public void testRecreateSerializableDataAfterUpdatingPartitionSpec() throws IOException { + PartitionKey partitionKey = new PartitionKey(PARTITION_SPEC, ICEBERG_SCHEMA); + + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "abcdef", true).build(); + Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "abcxyz", true).build(); + // same partition for both records (name_trunc=abc, bool=true) + partitionKey.partition(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row)); + + // write some rows + RecordWriter writer = + new RecordWriter(catalog, windowedDestination.getValue(), "test_file_name", partitionKey); + writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row)); + writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row2)); + writer.close(); + + // fetch data file and its serializable version + DataFile datafile = writer.getDataFile(); + SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionKey); + + assertEquals(2L, datafile.recordCount()); + assertEquals(serializableDataFile.getPartitionSpecId(), datafile.specId()); + + // update spec + Table table = catalog.loadTable(windowedDestination.getValue().getTableIdentifier()); + table.updateSpec().addField("id").removeField("bool").commit(); + + Map updatedSpecs = table.specs(); + DataFile roundTripDataFile = serializableDataFile.createDataFile(updatedSpecs); + + checkDataFileEquality(datafile, roundTripDataFile); + } + + @Test + public void testWriterKeepsUpWithUpdatingPartitionSpec() throws IOException { + Table table = catalog.loadTable(windowedDestination.getValue().getTableIdentifier()); + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "abcdef", true).build(); + Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "abcxyz", true).build(); + + // write some rows + RecordWriterManager writer = + new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE); + writer.write(windowedDestination, row); + writer.write(windowedDestination, row2); + writer.close(); + DataFile dataFile = + writer + .getSerializableDataFiles() + .get(windowedDestination) + .get(0) + .createDataFile(table.specs()); + + // check data file path contains the correct partition components + assertEquals(2L, dataFile.recordCount()); + assertEquals(dataFile.specId(), PARTITION_SPEC.specId()); + assertThat(dataFile.path().toString(), containsString("name_trunc=abc")); + assertThat(dataFile.path().toString(), containsString("bool=true")); + + // table is cached + assertEquals(1, RecordWriterManager.TABLE_CACHE.size()); + + // update spec + table.updateSpec().addField("id").removeField("bool").commit(); + + // write a second data file + // should refresh the table and use the new partition spec + RecordWriterManager writer2 = + new RecordWriterManager(catalog, "test_prefix_2", Long.MAX_VALUE, Integer.MAX_VALUE); + writer2.write(windowedDestination, row); + writer2.write(windowedDestination, row2); + writer2.close(); + + List serializableDataFiles = + writer2.getSerializableDataFiles().get(windowedDestination); + assertEquals(2, serializableDataFiles.size()); + for (SerializableDataFile serializableDataFile : serializableDataFiles) { + assertEquals(table.spec().specId(), serializableDataFile.getPartitionSpecId()); + dataFile = serializableDataFile.createDataFile(table.specs()); + assertEquals(1L, dataFile.recordCount()); + assertThat(dataFile.path().toString(), 
containsString("name_trunc=abc")); + assertThat( + dataFile.path().toString(), either(containsString("id=1")).or(containsString("id=2"))); + } } }
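
The scenario the new tests exercise can be reproduced outside Beam with plain Iceberg APIs. The sketch below is illustrative only and is not part of this patch: the Hadoop catalog rooted at file:///tmp/warehouse, the table name, and the class name are made-up assumptions. It shows the invariant the commit path now relies on: every data file stays pinned to the partition spec id it was written under, and after a runtime updateSpec() the table exposes all historical specs through table.specs(), which is where AppendFilesToTables and SerializableDataFile.createDataFile resolve each file's spec.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

public class PartitionSpecEvolutionSketch {
  public static void main(String[] args) {
    // Same shape as the test schema: id / name / bool.
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.required(2, "name", Types.StringType.get()),
            Types.NestedField.required(3, "bool", Types.BooleanType.get()));
    // Initial spec, matching the test's PARTITION_SPEC: truncate(name, 3) + identity(bool).
    PartitionSpec initialSpec =
        PartitionSpec.builderFor(schema).truncate("name", 3).identity("bool").build();

    // Hypothetical local warehouse and table name, for illustration only.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "file:///tmp/warehouse");
    Table table =
        catalog.createTable(TableIdentifier.parse("db.spec_evolution_demo"), schema, initialSpec);

    // Data files written from this point on record this spec id in their SerializableDataFile.
    int originalSpecId = table.spec().specId();

    // A user evolves the partition spec while the write pipeline is still running
    // (the same call the new RecordWriterManagerTest cases make).
    table.updateSpec().addField("id").removeField("bool").commit();

    // The table keeps every historical spec. The commit step added in this patch resolves each
    // file against table.specs().get(file.specId()) instead of assuming the latest table.spec(),
    // and groups files into one manifest per spec before a single atomic append.
    System.out.println("original spec: " + table.specs().get(originalSpecId));
    System.out.println("current spec:  " + table.spec());
  }
}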