[Managed Iceberg] bubble up exceptions due to writer close (#32940)
* throw suppressed cache exceptions

* add test
ahmedabu98 authored Nov 5, 2024
1 parent 689af5b commit 738a76d
Showing 3 changed files with 84 additions and 5 deletions.
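
For context: the writers cache closes each RecordWriter from its removal listener, and Guava catches and logs exceptions thrown inside removal listeners instead of propagating them, so a failed close during invalidateAll() was previously silent. Below is a minimal standalone sketch (not part of this commit; class and variable names are invented) of that swallowing behavior and of the collect-then-rethrow pattern the change adopts:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import java.util.ArrayList;
import java.util.List;

public class SwallowedListenerExceptionSketch {
  public static void main(String[] args) {
    List<Exception> exceptions = new ArrayList<>();

    // Guava catches and logs exceptions thrown by removal listeners, so this
    // failure never reaches the caller of invalidateAll().
    RemovalListener<String, String> closingListener =
        notification -> {
          RuntimeException failure =
              new RuntimeException("failed to close writer for " + notification.getKey());
          exceptions.add(failure); // record it so it can be rethrown later
          throw failure;           // swallowed by the cache
        };

    Cache<String, String> writers =
        CacheBuilder.newBuilder().removalListener(closingListener).build();
    writers.put("partition-1", "writer");

    writers.invalidateAll(); // triggers the listener; no exception propagates here

    // Mirror of the new close() logic: surface everything the cache swallowed.
    if (!exceptions.isEmpty()) {
      IllegalStateException rethrow =
          new IllegalStateException(
              String.format("Encountered %s failed writer(s).", exceptions.size()));
      exceptions.forEach(rethrow::addSuppressed);
      throw rethrow;
    }
  }
}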
@@ -136,4 +136,8 @@ public long bytesWritten() {
public DataFile getDataFile() {
return icebergDataWriter.toDataFile();
}

public String path() {
return absoluteFilename;
}
}
@@ -90,6 +90,7 @@ class DestinationState {
final Cache<PartitionKey, RecordWriter> writers;
private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
private final List<Exception> exceptions = Lists.newArrayList();

DestinationState(IcebergDestination icebergDestination, Table table) {
this.icebergDestination = icebergDestination;
@@ -112,11 +113,14 @@ class DestinationState {
try {
recordWriter.close();
} catch (IOException e) {
-        throw new RuntimeException(
-            String.format(
-                "Encountered an error when closing data writer for table '%s', partition %s",
-                icebergDestination.getTableIdentifier(), pk),
-            e);
+        RuntimeException rethrow =
+            new RuntimeException(
+                String.format(
+                    "Encountered an error when closing data writer for table '%s', path: %s",
+                    icebergDestination.getTableIdentifier(), recordWriter.path()),
+                e);
+        exceptions.add(rethrow);
+        throw rethrow;
}
openWriters--;
dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk));
@@ -282,6 +286,17 @@ public void close() throws IOException {
// removing writers from the state's cache will trigger the logic to collect each writer's
// data file.
state.writers.invalidateAll();
// first check for any exceptions swallowed by the cache
if (!state.exceptions.isEmpty()) {
IllegalStateException exception =
new IllegalStateException(
String.format("Encountered %s failed writer(s).", state.exceptions.size()));
for (Exception e : state.exceptions) {
exception.addSuppressed(e);
}
throw exception;
}

if (state.dataFiles.isEmpty()) {
continue;
}
@@ -42,13 +42,15 @@
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
@@ -412,4 +414,62 @@ public void testWriterKeepsUpWithUpdatingPartitionSpec() throws IOException {
dataFile.path().toString(), either(containsString("id=1")).or(containsString("id=2")));
}
}

@Rule public ExpectedException thrown = ExpectedException.none();

@Test
public void testWriterExceptionGetsCaught() throws IOException {
RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 100, 2);
Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "abcdef", true).build();
PartitionKey partitionKey = new PartitionKey(PARTITION_SPEC, ICEBERG_SCHEMA);
partitionKey.partition(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));

writerManager.write(windowedDestination, row);

RecordWriterManager.DestinationState state =
writerManager.destinations.get(windowedDestination);
// replace with a failing record writer
FailingRecordWriter failingWriter =
new FailingRecordWriter(
catalog, windowedDestination.getValue(), "test_failing_writer", partitionKey);
state.writers.put(partitionKey, failingWriter);
writerManager.write(windowedDestination, row);

// this tests that we indeed enter the catch block
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Encountered 1 failed writer(s)");
try {
writerManager.close();
} catch (IllegalStateException e) {
// fetch underlying exceptions and validate
Throwable[] underlyingExceptions = e.getSuppressed();
assertEquals(1, underlyingExceptions.length);
for (Throwable t : underlyingExceptions) {
assertThat(
t.getMessage(),
containsString("Encountered an error when closing data writer for table"));
assertThat(
t.getMessage(),
containsString(windowedDestination.getValue().getTableIdentifier().toString()));
assertThat(t.getMessage(), containsString(failingWriter.path()));
Throwable realCause = t.getCause();
assertEquals("I am failing!", realCause.getMessage());
}

throw e;
}
}

static class FailingRecordWriter extends RecordWriter {
FailingRecordWriter(
Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
throws IOException {
super(catalog, destination, filename, partitionKey);
}

@Override
public void close() throws IOException {
throw new IOException("I am failing!");
}
}
}
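
A side note on the test style (not part of this commit): ExpectedException is deprecated as of JUnit 4.13, and the closing assertion above could also be written with org.junit.Assert.assertThrows, which returns the caught exception so the suppressed failures can be inspected without the try/catch-and-rethrow. A rough snippet, assuming JUnit 4.13+ and the same writerManager setup as in testWriterExceptionGetsCaught:

    IllegalStateException e =
        assertThrows(IllegalStateException.class, () -> writerManager.close());
    assertThat(e.getMessage(), containsString("Encountered 1 failed writer(s)"));
    assertEquals(1, e.getSuppressed().length);
    assertThat(
        e.getSuppressed()[0].getMessage(),
        containsString("Encountered an error when closing data writer for table"));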
