Initial Iceberg Sink #30797
Conversation
R: @chamikaramj
@@ -1151,7 +1151,7 @@ class BeamModulePlugin implements Plugin<Project> {
       options.compilerArgs += ([
         '-parameters',
         '-Xlint:all',
-        '-Werror'
+        // '-Werror'
lol I missed this one
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Force-pushed from 201e6cb to a06a187.
Thanks!
public static <ElementT, DestinationT> Write<ElementT, DestinationT> writeToDestinations(
    IcebergCatalog catalog,
    DynamicDestinations<ElementT, DestinationT> dynamicDestinations,
I'm wondering if we can strip out the UDF-based dynamic destinations and think about how to introduce dynamic destinations to this I/O in a portable way, based on https://s.apache.org/portable-dynamic-destinations
I left them in for a bit of abstraction, but it can be an implementation detail, and IcebergIO.writeToDestinations(...) can just take the string pattern. I haven't done that part yet; I was mostly getting the main body of the transform to operate only on Rows.
  return new Write<>(catalog, dynamicDestinations, toRecord);
}
public static TableFactory<String> forCatalog(final IcebergCatalog catalog) { |
Is it possible to easily convert "IcebergCatalog" into a portable representation for SchemaTransforms?
TBD. Leaving all "catalog" questions unresolved for this revision.
  };
}
public static class Write<ElementT, DestinationT> |
I would just limit this to PTransform<PCollection<Row>, IcebergWriteResult<Row>> to make this portability-first and friendly for SchemaTransforms.
Done (and even simpler)
    extends PTransform<
        PCollection<KV<DestinationT, ElementT>>, IcebergWriteResult<DestinationT, ElementT>> {
@VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20; |
Any idea how we got to these defaults? (If so, we should document them.)
I have no idea. The number 20 must just be a guess. Some of the others appear to be BigQuery quota limitations that we can simply ignore. One thing we should do: I've read a lot online about the ideal Iceberg file size being 512 MB (that's what some internal Iceberg code does, I gather), so perhaps we follow that. I'm still learning the Iceberg Java APIs and the best way to apply their best practices.
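A minimal sketch of how such documented defaults might look, assuming we adopt the 512 MB target mentioned above (the class name and the bundle limit are illustrative, not values finalized in this PR):

```java
// Hypothetical defaults for the Iceberg writer (illustrative only).
class IcebergWriteDefaults {
  // Arbitrary guess: caps how many file writers a single bundle keeps open at once.
  static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20;

  // Follows Iceberg's commonly cited 512 MB target data-file size
  // (the write.target-file-size-bytes table property default); writes beyond this
  // roll over to a new file.
  static final long DEFAULT_MAX_BYTES_PER_FILE = 512L * 1024 * 1024;
}
```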
    .get(successfulWritesTag)
    .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), elementCoder));
PCollection<KV<ShardedKey<DestinationT>, ElementT>> failedWrites = |
Can we use the new DLQ framework instead? (It seems like this is following the old DLQ implementation in BQ.)
The new framework also considers portability aspects, for example, so it's more advantageous.
https://docs.google.com/document/d/1NGeCk6tOqF-TiGEAV7ixd_vhIiWz9sHPlCa1P_77Ajs/edit?tab=t.0#heading=h.fppublcudjbt
(This can be a separate PR, but we should remove the DLQ feature from this PR in that case.)
I just left it out for now.
        .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), elementCoder));

    PCollection<Result<DestinationT>> writtenFilesGrouped =
        failedWrites
Not sure what we are doing here. Are we trying to write failed records again and flatten them with the originally written records (in the subsequent step below)?
Possibly we should be writing failed records to a DLQ?
Re-reading the code, it seems failedWrites here is actually due to the previous WriteBundlesToFiles exceeding one of the limits provided to the transform (DEFAULT_MAX_WRITERS_PER_BUNDLE, DEFAULT_MAX_BYTES_PER_FILE). We group the known set of spilled-over records and write them in the subsequent transform, which makes sense. We should probably rename 'failedWrites' to 'spilledOverWrites'.
I have now totally refactored this and renamed everything. Thanks for your description; it helped a lot to understand how to organize it.
  ORC
}
public static class MetadataUpdates<IdentifierT> |
Probably rename to MetadataUpdateDoFn for clarity.
Done, but I still need to refactor this out anyhow.
    }))
    .setCoder(KvCoder.of(StringUtf8Coder.of(), MetadataUpdate.coder()))
    .apply(GroupByKey.create())
    .apply("Write Metadata Updates", ParDo.of(new MetadataUpdates<>(tableFactory)))
Probably this should be followed up by another GBK and a cleanup step that deletes temp files (of this step and any failed work items).
(unresolved)
Oh, and by the way, the files are not temporary; they become part of the table. So it is simpler than the BQ equivalent.
import org.apache.iceberg.Table;
import org.checkerframework.checker.nullness.qual.Nullable;
public abstract class DynamicDestinations<T, DestinationT> implements Serializable { |
Seems like this has a lot of copied-over logic from BQ dynamic destinations, which we can probably simplify/change if we go with the new DLQ framework.
Gotcha. I actually removed all the logic and just do something extremely basic for now. I guess the DLQ could be an update-incompatible change, so I had better get that done really quickly too.
  public abstract long getAuthSessionTimeoutMillis();

  @Pure
  public abstract @Nullable Configuration getConfiguration();
Seems like org.apache.hadoop.conf.Configuration is a set of string key-value pairs: https://hadoop.apache.org/docs/current/api/org/apache/hadoop/conf/Configuration.html. Maybe we should just accept the string key-value pairs and build the Hadoop Configuration from them to make this more portability friendly.
That makes sense. Leaving this unresolved as I did not get to this yet.
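For illustration, a minimal sketch of what that could look like, assuming a plain Map<String, String> of properties is what crosses the portability boundary (the helper name is hypothetical):

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

class ConfigurationHelper {
  // Hypothetical helper: rebuild a Hadoop Configuration from portable string properties.
  static Configuration fromProperties(Map<String, String> properties) {
    // 'false' skips loading the default resources (core-default.xml, etc.), so the
    // result contains only the explicitly supplied key-value pairs.
    Configuration conf = new Configuration(false);
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      conf.set(entry.getKey(), entry.getValue());
    }
    return conf;
  }
}
```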
Codecov Report: All modified and coverable lines are covered by tests ✅
Additional details and impacted files:
@@ Coverage Diff @@
##           master   #30797   +/-   ##
=======================================
  Coverage   71.47%   71.47%
=======================================
  Files         710      710
  Lines      104815   104815
=======================================
  Hits        74915    74915
  Misses      28268    28268
  Partials     1632     1632

☔ View full report in Codecov by Sentry.
OK I did a major revision to clarify things and streamline the main logic around writing rows. Still need another major revision to address the remaining non-portable pieces and DLQ.
Force-pushed from 0ccdf45 to 5af12aa.
Codecov Report: All modified and coverable lines are covered by tests ✅
Additional details and impacted files:
@@ Coverage Diff @@
##             master   #30797       +/-   ##
=============================================
- Coverage     70.95%        0   -70.96%
=============================================
  Files          1257        0     -1257
  Lines        140939        0   -140939
  Branches       4307        0     -4307
=============================================
- Hits         100004        0   -100004
+ Misses        37456        0    -37456
+ Partials       3479        0     -3479

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.
OK, I have done a whole massive revision and tested it a little bit more. The only piece that I have not revised is the …
It looks like this might work: https://github.com/tabular-io/iceberg-kafka-connect/blob/5ab5c538efab9ccf3cde166f36ba34189eed7187/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java#L256
Thanks. Looks great and almost there!
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java
return IcebergDestination.builder()
    .setTableIdentifier(getTableIdentifier())
    .setTableCreateConfig(null)
    .setFileFormat(FileFormat.PARQUET)
If this is not configurable, let's document.
It should be configurable. In testing, I discovered that the ORC codepath doesn't work, so I've changed it to throw.
        return input.getTableIdentifier().toString();
      }
    }))
    // .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder()))
Uncomment or delete.
Done
for (FileWriteResult writtenFile : element.getValue()) {
  update.appendFile(writtenFile.getDataFile());
}
update.commit();
Shouldn't this update be atomic for all files?
If so, we might have to push this to a separate step behind a shuffle.
The key question is what will happen if the step fails after writing some of the elements and gets retried.
All the files per destination are grouped into a single atomic commit. There are two things that could go wrong:
- Failure after the commit but before downstream processing, so a new transaction will try to append the same files. I verified that this is idempotent (and I included it as a unit test just to clarify).
- Some tables successfully commit but then there are enough failures that the pipeline itself fails. We could probably do a multi-table transaction: write the various files to manifests, then merge to a single thread and commit all the manifests at once. We don't do this for other sinks, do we?
Yeah, (2) is fine. It's more about making sure that we don't double-write if a work item fails. But if writing is idempotent, it's simpler.
Sorry to be late on this; I'm just wondering whether we would need a kind of "commit coordinator" to be sure we have one commit at a time: if we have concurrent commits, it could be problematic in Iceberg.
I am not that familiar with the Iceberg libraries. I was under the impression that the optimistic concurrency protocol is handled by them (https://iceberg.apache.org/docs/1.5.2/reliability/#concurrent-write-operations and, for filesystem tables, https://iceberg.apache.org/spec/#file-system-tables).
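For reference, a minimal sketch of the per-table append being discussed, using Iceberg's Table/AppendFiles API (the grouping of written files by destination happens upstream and is assumed here):

```java
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class AppendCommitSketch {
  // Append all data files written for one destination table in a single commit,
  // producing one new snapshot per destination; Iceberg's commit path handles
  // optimistic-concurrency retries against the latest table metadata.
  static void commitFiles(Table table, List<DataFile> dataFiles) {
    AppendFiles append = table.newAppend();
    for (DataFile dataFile : dataFiles) {
      append.appendFile(dataFile);
    }
    append.commit();
  }
}
```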
  public abstract FileWriteResult build();
}
public static class FileWriteResultCoder extends StructuredCoder<FileWriteResult> { |
Let's make sure that this is covered by unit testing.
Done, somewhat. Could use some data generators to thoroughly test.
case DOUBLE:
  Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v));
  break;
case DATE:
Are these types not supported?
If so, we should fail instead of dropping them?
omg yes, haha, I didn't notice this. Fixed: added some more support and testing for some types, and we now throw for the ones that are not yet supported. We will want to fast-follow with support, but some of the date semantics are unclear to me (e.g. an Iceberg DATE is stored as a Long, but I'm not sure exactly what it represents).
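A minimal sketch of the throw-on-unsupported pattern described here, following the switch quoted above (the helper, its signature, and the exact set of handled cases are illustrative, not the PR's final list):

```java
import java.util.Optional;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.data.Record;

class RowToRecordSketch {
  // Hypothetical helper: copy one named field from a Beam Row into an Iceberg Record,
  // throwing for field types that are not handled rather than silently dropping them.
  static void copyField(Row value, Record rec, String name) {
    switch (value.getSchema().getField(name).getType().getTypeName()) {
      case DOUBLE:
        Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v));
        break;
      case STRING:
        Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v));
        break;
      case INT64:
        Optional.ofNullable(value.getInt64(name)).ifPresent(v -> rec.setField(name, v));
        break;
      default:
        throw new UnsupportedOperationException(
            "Unsupported Beam field type: "
                + value.getSchema().getField(name).getType().getTypeName());
    }
  }
}
```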
  return FieldType.DATETIME;
case STRING:
  return FieldType.STRING;
case UUID:
UUID is BYTES, not STRING?
Yea, it is a Java UUID, which contains a byte[].
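For context, a small sketch of how a 128-bit java.util.UUID maps to 16 raw bytes (an illustrative conversion, not necessarily the code in this PR):

```java
import java.nio.ByteBuffer;
import java.util.UUID;

class UuidBytesSketch {
  // Serialize a UUID as its 16 raw bytes (most-significant half first), which is why
  // the Iceberg UUID type maps more naturally to BYTES than to STRING.
  static byte[] toBytes(UUID uuid) {
    return ByteBuffer.allocate(16)
        .putLong(uuid.getMostSignificantBits())
        .putLong(uuid.getLeastSignificantBits())
        .array();
  }
}
```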
Force-pushed from 8e4c12e to 0a0899a.
Thanks. LGTM.
  break;
case ORC:
  throw new UnsupportedOperationException("ORC file format not currently supported.");
  // icebergDataWriter =
Delete?
Done
settings.gradle.kts (outdated)
@@ -355,3 +355,7 @@ include("sdks:java:io:kafka:kafka-01103")
 findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103"
 include("sdks:java:managed")
 findProject(":sdks:java:managed")?.name = "managed"
+include("sdks:java:io:iceberg")
+findProject(":sdks:java:io:iceberg")?.name = "iceberg"
+include("sdks:java:io:catalog")
It doesn't look like we add anything under "sdks:java:io:catalog".
Removed
- remove Read path (will propose separately)
- re-enable checking, fix type errors
- some style adjustments
Thanks for all the review!
Force-pushed from 0a0899a to a7a6515.
Hello, could you please kindly update the doc below with the merged implementation?
This is a basic Iceberg sink. Somewhat in the style of BigQuery file loads:
And how it works, roughly:
I'm a bit of an Iceberg newb. Byron did the first draft and I just refactored and added some stuff to it. This has some small tests but needs integration tests and larger tests. It is a starting point for integrating with @ahmedabu98's work on managed transforms.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.