Skip to content

Commit

Permalink
[Managed Iceberg] Allow updating partition specs during pipeline runt…
Browse files Browse the repository at this point in the history
…ime (#32879)

* allowed updating partition specs at runtime

* add to changes md

* add to changes md

* trigger iceberg integration tests

* refresh cached tables; split multiple partition specs into separate manifest files

* add test

* address comment

* clarify changes comment
  • Loading branch information
ahmedabu98 authored Nov 5, 2024
1 parent 9baa7ba commit 689af5b
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 67 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
* [Managed Iceberg] Now available in Python SDK ([#31495](https://github.com/apache/beam/pull/31495))
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
* BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
* [Managed Iceberg] Allow updating table partition specs during pipeline runtime ([#32879](https://github.com/apache/beam/pull/32879))

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
*/
package org.apache.beam.sdk.io.iceberg;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.metrics.Counter;
Expand All @@ -29,14 +35,21 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,9 +58,11 @@ class AppendFilesToTables
extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, SnapshotInfo>>> {
private static final Logger LOG = LoggerFactory.getLogger(AppendFilesToTables.class);
private final IcebergCatalogConfig catalogConfig;
private final String manifestFilePrefix;

AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
AppendFilesToTables(IcebergCatalogConfig catalogConfig, String manifestFilePrefix) {
this.catalogConfig = catalogConfig;
this.manifestFilePrefix = manifestFilePrefix;
}

@Override
Expand All @@ -67,27 +82,27 @@ public String apply(FileWriteResult input) {
.apply("Group metadata updates by table", GroupByKey.create())
.apply(
"Append metadata updates to tables",
ParDo.of(new AppendFilesToTablesDoFn(catalogConfig)))
ParDo.of(new AppendFilesToTablesDoFn(catalogConfig, manifestFilePrefix)))
.setCoder(KvCoder.of(StringUtf8Coder.of(), SnapshotInfo.CODER));
}

private static class AppendFilesToTablesDoFn
extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, SnapshotInfo>> {
private final Counter snapshotsCreated =
Metrics.counter(AppendFilesToTables.class, "snapshotsCreated");
private final Counter dataFilesCommitted =
Metrics.counter(AppendFilesToTables.class, "dataFilesCommitted");
private final Distribution committedDataFileByteSize =
Metrics.distribution(RecordWriter.class, "committedDataFileByteSize");
private final Distribution committedDataFileRecordCount =
Metrics.distribution(RecordWriter.class, "committedDataFileRecordCount");

private final IcebergCatalogConfig catalogConfig;
private final String manifestFilePrefix;

private transient @MonotonicNonNull Catalog catalog;

private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) {
private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig, String manifestFilePrefix) {
this.catalogConfig = catalogConfig;
this.manifestFilePrefix = manifestFilePrefix;
}

private Catalog getCatalog() {
Expand All @@ -97,36 +112,104 @@ private Catalog getCatalog() {
return catalog;
}

private boolean containsMultiplePartitionSpecs(Iterable<FileWriteResult> fileWriteResults) {
int id = fileWriteResults.iterator().next().getSerializableDataFile().getPartitionSpecId();
for (FileWriteResult result : fileWriteResults) {
if (id != result.getSerializableDataFile().getPartitionSpecId()) {
return true;
}
}
return false;
}

@ProcessElement
public void processElement(
@Element KV<String, Iterable<FileWriteResult>> element,
OutputReceiver<KV<String, SnapshotInfo>> out,
BoundedWindow window) {
BoundedWindow window)
throws IOException {
String tableStringIdentifier = element.getKey();
Iterable<FileWriteResult> fileWriteResults = element.getValue();
if (!fileWriteResults.iterator().hasNext()) {
return;
}

Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));

// vast majority of the time, we will simply append data files.
// in the rare case we get a batch that contains multiple partition specs, we will group
// data into manifest files and append.
// note: either way, we must use a single commit operation for atomicity.
if (containsMultiplePartitionSpecs(fileWriteResults)) {
appendManifestFiles(table, fileWriteResults);
} else {
appendDataFiles(table, fileWriteResults);
}

Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
}

// This works only when all files are using the same partition spec.
private void appendDataFiles(Table table, Iterable<FileWriteResult> fileWriteResults) {
AppendFiles update = table.newAppend();
long numFiles = 0;
for (FileWriteResult result : fileWriteResults) {
DataFile dataFile = result.getDataFile(table.spec());
DataFile dataFile = result.getDataFile(table.specs());
update.appendFile(dataFile);
committedDataFileByteSize.update(dataFile.fileSizeInBytes());
committedDataFileRecordCount.update(dataFile.recordCount());
numFiles++;
}
// this commit will create a ManifestFile. we don't need to manually create one.
update.commit();
dataFilesCommitted.inc(numFiles);
}

Snapshot snapshot = table.currentSnapshot();
LOG.info("Created new snapshot for table '{}': {}", tableStringIdentifier, snapshot);
snapshotsCreated.inc();
out.outputWithTimestamp(
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());
// When a user updates their table partition spec during runtime, we can end up with
// a batch of files where some are written with the old spec and some are written with the new
// spec.
// A table commit is limited to a single partition spec.
// To handle this, we create a manifest file for each partition spec, and group data files
// accordingly.
// Afterward, we append all manifests using a single commit operation.
private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
throws IOException {
String uuid = UUID.randomUUID().toString();
Map<Integer, PartitionSpec> specs = table.specs();

Map<Integer, List<DataFile>> dataFilesBySpec = new HashMap<>();
for (FileWriteResult result : fileWriteResults) {
DataFile dataFile = result.getDataFile(specs);
dataFilesBySpec.computeIfAbsent(dataFile.specId(), i -> new ArrayList<>()).add(dataFile);
}

AppendFiles update = table.newAppend();
for (Map.Entry<Integer, List<DataFile>> entry : dataFilesBySpec.entrySet()) {
int specId = entry.getKey();
List<DataFile> files = entry.getValue();
PartitionSpec spec = Preconditions.checkStateNotNull(specs.get(specId));
ManifestWriter<DataFile> writer =
createManifestWriter(table.location(), uuid, spec, table.io());
for (DataFile file : files) {
writer.add(file);
committedDataFileByteSize.update(file.fileSizeInBytes());
committedDataFileRecordCount.update(file.recordCount());
}
writer.close();
update.appendManifest(writer.toManifestFile());
}
update.commit();
}

private ManifestWriter<DataFile> createManifestWriter(
String tableLocation, String uuid, PartitionSpec spec, FileIO io) {
String location =
FileFormat.AVRO.addExtension(
String.format(
"%s/metadata/%s-%s-%s.manifest",
tableLocation, manifestFilePrefix, uuid, spec.specId()));
OutputFile outputFile = io.newOutputFile(location);
return ManifestFiles.write(spec, outputFile);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.iceberg;

import com.google.auto.value.AutoValue;
import java.util.Map;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
Expand Down Expand Up @@ -46,9 +47,9 @@ public TableIdentifier getTableIdentifier() {
}

@SchemaIgnore
public DataFile getDataFile(PartitionSpec spec) {
public DataFile getDataFile(Map<Integer, PartitionSpec> specs) {
if (cachedDataFile == null) {
cachedDataFile = getSerializableDataFile().createDataFile(spec);
cachedDataFile = getSerializableDataFile().createDataFile(specs);
}
return cachedDataFile;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.Preconditions;
Expand Down Expand Up @@ -195,7 +194,9 @@ private RecordWriter createWriter(PartitionKey partitionKey) {

private final Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
totalSerializableDataFiles = Maps.newHashMap();
private static final Cache<TableIdentifier, Table> TABLE_CACHE =

@VisibleForTesting
static final Cache<TableIdentifier, Table> TABLE_CACHE =
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();

private boolean isClosed = false;
Expand All @@ -221,22 +222,28 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) {
@Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
if (table == null) {
try {
table = catalog.loadTable(identifier);
} catch (NoSuchTableException e) {
synchronized (TABLE_CACHE) {
try {
org.apache.iceberg.Schema tableSchema =
IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
// TODO(ahmedabu98): support creating a table with a specified partition spec
table = catalog.createTable(identifier, tableSchema);
LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema);
} catch (AlreadyExistsException alreadyExistsException) {
// handle race condition where workers are concurrently creating the same table.
// if running into already exists exception, we perform one last load
table = catalog.loadTable(identifier);
} catch (NoSuchTableException e) {
try {
org.apache.iceberg.Schema tableSchema =
IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
// TODO(ahmedabu98): support creating a table with a specified partition spec
table = catalog.createTable(identifier, tableSchema);
LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema);
} catch (AlreadyExistsException alreadyExistsException) {
// handle race condition where workers are concurrently creating the same table.
// if running into already exists exception, we perform one last load
table = catalog.loadTable(identifier);
}
}
TABLE_CACHE.put(identifier, table);
}
TABLE_CACHE.put(identifier, table);
} else {
// If fetching from cache, refresh the table to avoid working with stale metadata
// (e.g. partition spec)
table.refresh();
}
return table;
}
Expand All @@ -254,15 +261,7 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
icebergDestination,
destination -> {
TableIdentifier identifier = destination.getValue().getTableIdentifier();
Table table;
try {
table =
TABLE_CACHE.get(
identifier, () -> getOrCreateTable(identifier, row.getSchema()));
} catch (ExecutionException e) {
throw new RuntimeException(
"Error while fetching or creating table: " + identifier, e);
}
Table table = getOrCreateTable(identifier, row.getSchema());
return new DestinationState(destination.getValue(), table);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -141,12 +142,14 @@ static SerializableDataFile from(DataFile f, PartitionKey key) {
* it from Beam-compatible types.
*/
@SuppressWarnings("nullness")
DataFile createDataFile(PartitionSpec partitionSpec) {
Preconditions.checkState(
partitionSpec.specId() == getPartitionSpecId(),
"Invalid partition spec id '%s'. This DataFile was originally created with spec id '%s'.",
partitionSpec.specId(),
getPartitionSpecId());
DataFile createDataFile(Map<Integer, PartitionSpec> partitionSpecs) {
PartitionSpec partitionSpec =
checkStateNotNull(
partitionSpecs.get(getPartitionSpecId()),
"This DataFile was originally created with spec id '%s'. Could not find "
+ "this among table's partition specs: %s.",
getPartitionSpecId(),
partitionSpecs.keySet());

Metrics dataFileMetrics =
new Metrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {

// Commit files to tables
PCollection<KV<String, SnapshotInfo>> snapshots =
writtenFiles.apply(new AppendFilesToTables(catalogConfig));
writtenFiles.apply(new AppendFilesToTables(catalogConfig, filePrefix));

return new IcebergWriteResult(input.getPipeline(), snapshots);
}
Expand Down
Loading

0 comments on commit 689af5b

Please sign in to comment.