Add Java documentation to IcebergIO (#32621)
* add documentation to IcebergIO's java class

* add example; trigger ITs

* nit
ahmedabu98 authored Oct 2, 2024
1 parent d5b910c commit bb09de7
Showing 5 changed files with 243 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
  "comment": "Modify this file in a trivial way to cause this test suite to run",
- "modification": 1
+ "modification": 2
}
IcebergIO.java
@@ -25,6 +25,8 @@
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -37,14 +39,243 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
- * The underlying Iceberg connector used by {@link org.apache.beam.sdk.managed.Managed#ICEBERG}. Not
- * intended to be used directly.
+ * A connector that reads and writes to <a href="https://iceberg.apache.org/">Apache Iceberg</a>
+ * tables.
*
* <p>{@link IcebergIO} is offered as a {@link Managed} transform. This class is subject to change
* and should not be used directly. Instead, use it via {@link Managed#ICEBERG} like so:
*
* <pre>{@code
* Map<String, Object> config = Map.of(
* "table", table,
* "triggering_frequency_seconds", 5,
* "catalog_name", name,
* "catalog_properties", Map.of(
* "warehouse", warehouse_path,
* "catalog-impl", "org.apache.iceberg.hive.HiveCatalog"),
* "config_properties", Map.of(
* "hive.metastore.uris", metastore_uri));
*
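* // ====== WRITE ======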
* pipeline
* .apply(Create.of(BEAM_ROWS))
* .apply(Managed.write(ICEBERG).withConfig(config));
*
*
* // ====== READ ======
* pipeline
* .apply(Managed.read(ICEBERG).withConfig(config))
* .getSinglePCollection()
* .apply(ParDo.of(...));
* }</pre>
*
* <h3>Configuration Options</h3>
*
* <table border="1" cellspacing="1">
* <tr>
* <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
* </tr>
* <tr>
* <td> {@code table} </td> <td> {@code str} </td> <td> Required. A fully-qualified table identifier. You may also provide a
* template to use dynamic destinations (see the `Dynamic Destinations` section below for details). </td>
* </tr>
* <tr>
* <td> {@code triggering_frequency_seconds} </td> <td> {@code int} </td> <td> Required for streaming writes. Roughly every
* {@code triggering_frequency_seconds} duration, the sink will write records to data files and produce a table snapshot.
* Generally, a higher value will produce fewer, larger data files.
* </td>
* </tr>
* <tr>
* <td> {@code catalog_name} </td> <td> {@code str} </td> <td> The name of the catalog. Defaults to {@code apache-beam-<VERSION>}. </td>
* </tr>
* <tr>
* <td> {@code catalog_properties} </td> <td> {@code map<str, str>} </td> <td> A map of properties to be used when
* constructing the Iceberg catalog. Required properties will depend on what catalog you are using, but
* <a href="https://iceberg.apache.org/docs/latest/configuration/#catalog-properties">this list</a>
* is a good starting point. </td>
* </tr>
* <tr>
* <td> {@code config_properties} </td> <td> {@code map<str, str>} </td> <td> A map of properties
* to instantiate the catalog's Hadoop {@link Configuration}. Required properties will depend on your catalog
* implementation, but <a href="https://iceberg.apache.org/docs/latest/configuration/#hadoop-configuration">this list</a>
* is a good starting point. </td>
* </tr>
* </table>
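*
* <p>For illustration, a minimal batch read configuration might look like the sketch below. The
* table name, catalog name, and warehouse path are hypothetical, and the exact catalog properties
* depend on the catalog implementation you use:
*
* <pre>{@code
* Map<String, Object> config = Map.of(
*         "table", "db.users",
*         "catalog_name", "my_catalog",
*         "catalog_properties", Map.of(
*                 "type", "hadoop",
*                 "warehouse", "gs://my-bucket/warehouse"));
*
* PCollection<Row> rows =
*     pipeline
*         .apply(Managed.read(Managed.ICEBERG).withConfig(config))
*         .getSinglePCollection();
* }</pre>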
*
* <p><b>Additional configuration options for Iceberg writes are provided in the `Pre-filtering
* Options` section below.</b>
*
* <h3>Beam Rows</h3>
*
* <p>Being a Managed transform, this IO exclusively writes and reads using Beam {@link Row}s.
* Conversion takes place between Beam {@link Row}s and Iceberg {@link Record}s using helper methods
* in {@link IcebergUtils}. Below is a type conversion table mapping Beam and Iceberg types:
*
* <table border="1" cellspacing="1">
* <tr>
* <td> <b>Beam {@link Schema.FieldType}</b> </td> <td> <b>Iceberg {@link Type}</b> </td>
* </tr>
* <tr>
* <td> BYTES </td> <td> BINARY </td>
* </tr>
* <tr>
* <td> BOOLEAN </td> <td> BOOLEAN </td>
* </tr>
* <tr>
* <td> STRING </td> <td> STRING </td>
* </tr>
* <tr>
* <td> INT32 </td> <td> INTEGER </td>
* </tr>
* <tr>
* <td> INT64 </td> <td> LONG </td>
* </tr>
* <tr>
* <td> DECIMAL </td> <td> STRING </td>
* </tr>
* <tr>
* <td> FLOAT </td> <td> FLOAT </td>
* </tr>
* <tr>
* <td> DOUBLE </td> <td> DOUBLE </td>
* </tr>
* <tr>
* <td> DATETIME </td> <td> STRING </td>
* </tr>
* <tr>
* <td> ITERABLE </td> <td> LIST </td>
* </tr>
* <tr>
* <td> ARRAY </td> <td> LIST </td>
* </tr>
* <tr>
* <td> MAP </td> <td> MAP </td>
* </tr>
* <tr>
* <td> ROW </td> <td> STRUCT </td>
* </tr>
* </table>
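*
* <p>These conversions happen automatically when using {@link Managed}. As an illustrative sketch
* of the mapping above, the {@link IcebergUtils} helpers can also be called directly; {@code
* icebergSchemaToBeamSchema} appears in this commit's tests, while the other helper names shown
* here are assumptions about the current API:
*
* <pre>{@code
* // A Beam schema and row...
* Schema beamSchema = Schema.builder().addStringField("name").addInt64Field("id").build();
* Row beamRow = Row.withSchema(beamSchema).addValues("scooby", 123L).build();
*
* // ...and their Iceberg counterparts (STRING -> STRING, INT64 -> LONG).
* org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
* Record icebergRecord = IcebergUtils.beamRowToIcebergRecord(icebergSchema, beamRow);
* }</pre>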
*
* <h3>Dynamic Destinations</h3>
*
* <p>Managed Iceberg supports writing to dynamic destinations. To do so, please provide an
* identifier template for the <b>{@code table}</b> parameter. A template should have placeholders
* represented as curly braces containing a record field name, e.g.: {@code
* "my_namespace.my_{foo}_table"}.
*
* <p>The sink uses simple String interpolation to determine a record's table destination. The
* placeholder is replaced with the record's field value. Nested fields can be specified using
* dot-notation (e.g. {@code "{top.middle.nested}"}).
*
* <h4>Pre-filtering Options</h4>
*
* <p>Some use cases may benefit from filtering record fields right before the write operation. For
* example, you may want to include metadata that routes records to the right destination, without
* necessarily writing that metadata to your table. Some lightweight filtering options are provided
* to accommodate such cases, allowing you to control what actually gets written:
*
* <table border="1" cellspacing="1">
* <tr>
* <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
* </tr>
* <tr>
* <td>{@code drop}</td> <td>{@code list<str>}</td> <td>Drops the specified fields.</td>
* </tr>
* <tr>
* <td>{@code keep}</td> <td>{@code list<str>}</td> <td>Keeps the specified fields and drops the rest.</td>
* </tr>
* <tr>
* <td>{@code only}</td> <td>{@code str}</td> <td>Use this to specify a nested record you intend to write.
* That record will be written and the rest will be dropped.</td>
* </tr>
* </table>
*
* <p>Example write to dynamic destinations (pseudocode):
*
* <pre>{@code
* Map<String, Object> config = Map.of(
* "table", "flights.{country}.{airport}",
* "catalog_properties", Map.of(...),
* "drop", ["country", "airport"]);
*
* List<String> JSON_ROWS = List.of(
* // first record is written to table "flights.usa.RDU"
* "{\"country\": \"usa\"," +
* "\"airport\": \"RDU\"," +
* "\"flight_id\": \"AA356\"," +
* "\"destination\": \"SFO\"," +
* "\"meal\": \"chicken alfredo\"}",
* // second record is written to table "flights.qatar.HIA"
* "{\"country\": \"qatar\"," +
* "\"airport\": \"HIA\"," +
* "\"flight_id\": \"QR 875\"," +
* "\"destination\": \"DEL\"," +
* "\"meal\": \"shawarma\"}",
* ...);
*
* // fields "country" and "airport" are dropped before
* // records are written to tables
* pipeline
* .apply(Create.of(JSON_ROWS))
* .apply(JsonToRow.withSchema(...))
* .apply(Managed.write(ICEBERG).withConfig(config));
*
* }</pre>
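*
* <p>A similar sketch using {@code only} (the nested field name {@code payload} is hypothetical):
* only the contents of that nested record are written, and every other top-level field is dropped.
*
* <pre>{@code
* Map<String, Object> config = Map.of(
*         "table", "my_namespace.my_table",
*         "catalog_properties", Map.of(...),
*         "only", "payload");
* }</pre>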
*
* <h3>Output Snapshots</h3>
*
* <p>When records are written and committed to a table, a snapshot is produced. A batch pipeline
* will perform a single commit and create a single snapshot per table. A streaming pipeline will
* produce a snapshot roughly according to the configured {@code triggering_frequency_seconds}.
*
* <p>You can access these snapshots and perform downstream processing by fetching the {@code
* "snapshots"} output PCollection:
*
* <pre>{@code
* pipeline
* .apply(Create.of(BEAM_ROWS))
* .apply(Managed.write(ICEBERG).withConfig(config))
* .get("snapshots")
* .apply(ParDo.of(new DoFn<Row, T>() {...}));
* }</pre>
*
* Each Snapshot is represented as a Beam Row, with the following Schema:
*
* <table border="1" cellspacing="1">
* <tr>
* <td> <b>Field</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
* </tr>
* <tr>
* <td> {@code table} </td> <td> {@code str} </td> <td> Table identifier. </td>
* </tr>
* <tr>
* <td> {@code manifest_list_location} </td> <td> {@code str} </td> <td> Location of the snapshot's manifest list. </td>
* </tr>
* <tr>
* <td> {@code operation} </td> <td> {@code str} </td> <td> Name of the operation that produced the snapshot. </td>
* </tr>
* <tr>
* <td> {@code parent_id} </td> <td> {@code long} </td> <td> The snapshot's parent ID. </td>
* </tr>
* <tr>
* <td> {@code schema_id} </td> <td> {@code int} </td> <td> The id of the schema used when the snapshot was created. </td>
* </tr>
* <tr>
* <td> {@code summary} </td> <td> {@code map<str, str>} </td> <td> A string map of summary data. </td>
* </tr>
* <tr>
* <td> {@code timestamp_millis} </td> <td> {@code long} </td> <td> The snapshot's timestamp in milliseconds. </td>
* </tr>
* </table>
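*
* <p>For instance, a {@code DoFn} could log each committed snapshot (a sketch; the accessors
* simply read the fields listed above):
*
* <pre>{@code
* pipeline
*     .apply(Create.of(BEAM_ROWS))
*     .apply(Managed.write(ICEBERG).withConfig(config))
*     .get("snapshots")
*     .apply(ParDo.of(
*         new DoFn<Row, Void>() {
*           @ProcessElement
*           public void process(@Element Row snapshot) {
*             // Field names follow the snapshot schema documented above.
*             String table = snapshot.getString("table");
*             String operation = snapshot.getString("operation");
*             Long timestampMillis = snapshot.getInt64("timestamp_millis");
*             System.out.printf("%s snapshot on %s at %d%n", operation, table, timestampMillis);
*           }
*         }));
* }</pre>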
*
* <p>For internal use only; no backwards-compatibility guarantees.
*/
IcebergWriteSchemaTransformProvider.java
@@ -54,7 +54,7 @@ public class IcebergWriteSchemaTransformProvider
extends TypedSchemaTransformProvider<Configuration> {

static final String INPUT_TAG = "input";
-  static final String OUTPUT_TAG = "output";
+  static final String SNAPSHOTS_TAG = "snapshots";

static final Schema OUTPUT_SCHEMA =
Schema.builder().addStringField("table").addFields(SnapshotInfo.SCHEMA.getFields()).build();
@@ -146,7 +146,7 @@ public List<String> inputCollectionNames() {

@Override
public List<String> outputCollectionNames() {
-    return Collections.singletonList(OUTPUT_TAG);
+    return Collections.singletonList(SNAPSHOTS_TAG);
}

@Override
@@ -204,7 +204,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
.apply(MapElements.via(new SnapshotToRow()))
.setRowSchema(OUTPUT_SCHEMA);

-    return PCollectionRowTuple.of(OUTPUT_TAG, snapshots);
+    return PCollectionRowTuple.of(SNAPSHOTS_TAG, snapshots);
}

@VisibleForTesting
SnapshotInfo.java
@@ -56,7 +56,8 @@ public Row toRow() {
return SchemaRegistry.createDefault()
.getToRowFunction(SnapshotInfo.class)
.apply(this)
-          .sorted();
+          .sorted()
+          .toSnakeCase();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(e);
}
@@ -69,7 +70,7 @@ public Row toRow() {
try {
SchemaRegistry registry = SchemaRegistry.createDefault();
CODER = registry.getSchemaCoder(SnapshotInfo.class);
-      SCHEMA = registry.getSchema(SnapshotInfo.class).sorted();
+      SCHEMA = registry.getSchema(SnapshotInfo.class).sorted().toSnakeCase();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(e);
}
IcebergWriteSchemaTransformProviderTest.java
@@ -19,7 +19,7 @@

import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Configuration;
import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.INPUT_TAG;
-import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.OUTPUT_TAG;
+import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.SNAPSHOTS_TAG;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;

@@ -122,7 +122,7 @@ public void testSimpleAppend() {
PCollection<Row> result =
input
.apply("Append To Table", new IcebergWriteSchemaTransformProvider().from(config))
-            .get(OUTPUT_TAG);
+            .get(SNAPSHOTS_TAG);

PAssert.that(result)
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
@@ -154,7 +154,7 @@ public void testWriteUsingManagedTransform() {
.apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA));
PCollection<Row> result =
-        inputRows.apply(Managed.write(Managed.ICEBERG).withConfig(configMap)).get(OUTPUT_TAG);
+        inputRows.apply(Managed.write(Managed.ICEBERG).withConfig(configMap)).get(SNAPSHOTS_TAG);

PAssert.that(result)
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
