[Bug]: IcebergIO opens a writer using the table's schema, which can cause data loss #32050

Closed
2 of 17 tasks
ahmedabu98 opened this issue Aug 1, 2024 · 2 comments · Fixed by #32095

Comments

@ahmedabu98
Contributor

What happened?

I'm experimenting with HiveCatalog and noticed data loss when writing rows that contain nested records. Specifically, the nested records are never committed to the table; I read back null values instead. I can confirm this happens even without our Beam library:

    TableIdentifier tableIdentifier =
        TableIdentifier.parse(String.format("%s.%s", TEST_DB, TEST_TABLE));
    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(ROW_SCHEMA);
    Table table = catalog.createTable(tableIdentifier, icebergSchema);

    // write
    List<Record> records =
        LongStream.range(1, 10)
            .boxed()
            .map(l -> IcebergUtils.beamRowToIcebergRecord(icebergSchema, ROW_FUNC.apply(l)))
            .collect(Collectors.toList());
    String filepath = table.location() + "/" + UUID.randomUUID();
    OutputFile file = table.io().newOutputFile(filepath);
    DataWriter<Record> writer =
            Parquet.writeData(file)
//                    .schema(table.schema())    <---- xxxx this is the problematic line xxxx
                    .schema(icebergSchema)
                    .createWriterFunc(GenericParquetWriter::buildWriter)
                    .overwrite()
                    .withSpec(table.spec())
                    .build();
    for (Record rec: records) {
      System.out.println("xxx writing: " + rec);
      writer.write(rec);
    }
    writer.close();
    AppendFiles appendFiles = table.newAppend();
    String manifestFilename = FileFormat.AVRO.addExtension(filepath + ".manifest");
    OutputFile outputFile = table.io().newOutputFile(manifestFilename);
    ManifestWriter<DataFile> manifestWriter;
    try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(table.spec(), outputFile)) {
      openWriter.add(writer.toDataFile());
      manifestWriter = openWriter;
    }
    appendFiles.appendManifest(manifestWriter.toManifestFile());
    appendFiles.commit();

    
    // read
    table = catalog.loadTable(tableIdentifier);
    TableScan tableScan = table.newScan().project(icebergSchema);
    for (CombinedScanTask task : tableScan.planTasks()) {
      InputFilesDecryptor decryptor = new InputFilesDecryptor(task, table.io(), table.encryption());
      for (FileScanTask fileTask : task.files()) {
        InputFile inputFile = decryptor.getInputFile(fileTask);
        CloseableIterable<Record> iterable =
                Parquet.read(inputFile)
                        .split(fileTask.start(), fileTask.length())
                        .project(icebergSchema)
                        .createReaderFunc(
                                fileSchema -> GenericParquetReaders.buildReader(icebergSchema, fileSchema))
                        .filter(fileTask.residual())
                        .build();

        for (Record rec : iterable) {
          System.out.println("xxx reading: " + rec);
        }
      }
    }

I've tried the same with HadoopCatalog and everything works fine. I'm not sure why I'm only seeing this with HiveCatalog; there may be something peculiar in how that catalog fetches and returns the table schema.
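
For comparison, a minimal HadoopCatalog setup might look like the sketch below (the warehouse path and variable names are placeholders, not the exact test setup); the same write/read code above can be run against it unchanged:

    // uses org.apache.hadoop.conf.Configuration and org.apache.iceberg.hadoop.HadoopCatalog
    Configuration conf = new Configuration();
    // Placeholder warehouse location; any local or HDFS path works for the comparison.
    HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, "/tmp/iceberg-warehouse");
    Table table = hadoopCatalog.createTable(tableIdentifier, icebergSchema);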

I believe in this case we shouldn't rely on the catalog, and should instead create our writers using the schema of the records in our PCollection, i.e. line 70 here:

    case PARQUET:
      icebergDataWriter =
          Parquet.writeData(outputFile)
              .createWriterFunc(GenericParquetWriter::buildWriter)
              .schema(table.schema())
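
A sketch of what that change might look like, reusing the conversion already shown in the repro above (rowSchema here is a stand-in for the Beam schema of the PCollection's elements; names are illustrative, not the final patch):

    // Derive the Iceberg schema from the rows being written rather than from
    // the catalog-reported table schema.
    org.apache.iceberg.Schema dataSchema = IcebergUtils.beamSchemaToIcebergSchema(rowSchema);

    DataWriter<Record> icebergDataWriter =
        Parquet.writeData(outputFile)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .schema(dataSchema) // instead of table.schema()
            .withSpec(table.spec())
            .overwrite()
            .build();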

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@ahmedabu98
Contributor Author

I see similar behavior when using the BigQuery Metastore catalog.

@ahmedabu98
Contributor Author

Update: I realized this is actually due to a bug in our Iceberg utils. Fixing in #32095
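
(For anyone hitting something similar: a quick way to check whether the schema conversion is the culprit is to compare the converted schema against what the catalog reports, e.g. with Iceberg's Schema#sameSchema. A hedged sketch, where beamRowSchema is a placeholder for the nested Beam schema being written:)

    org.apache.iceberg.Schema converted = IcebergUtils.beamSchemaToIcebergSchema(beamRowSchema);
    Table table = catalog.loadTable(tableIdentifier);
    // A mismatch here (especially on nested fields) points at the conversion
    // rather than at the catalog itself.
    System.out.println("same schema:  " + converted.sameSchema(table.schema()));
    System.out.println("converted:    " + converted);
    System.out.println("from catalog: " + table.schema());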
