From e85df7942df028969c8d43c64ecb0261e4de65d3 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Date: Mon, 4 Nov 2024 12:41:52 +0300
Subject: [PATCH] Add Managed Iceberg example (#32678)

* add iceberg example

* runtime dependency

* update to let the sink create tables

* remove dependencies
---
 examples/java/build.gradle            |   4 +
 .../cookbook/IcebergTaxiExamples.java | 119 ++++++++++++++++++
 2 files changed, 123 insertions(+)
 create mode 100644 examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java

diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index af91fa83fe91..4f1902cf1679 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -66,6 +66,8 @@ dependencies {
   implementation project(":sdks:java:extensions:python")
   implementation project(":sdks:java:io:google-cloud-platform")
   implementation project(":sdks:java:io:kafka")
+  runtimeOnly project(":sdks:java:io:iceberg")
+  implementation project(":sdks:java:managed")
   implementation project(":sdks:java:extensions:ml")
   implementation library.java.avro
   implementation library.java.bigdataoss_util
@@ -100,6 +102,8 @@ dependencies {
   implementation "org.apache.httpcomponents:httpcore:4.4.13"
   implementation "com.fasterxml.jackson.core:jackson-annotations:2.14.1"
   implementation "com.fasterxml.jackson.core:jackson-core:2.14.1"
+  runtimeOnly library.java.hadoop_client
+  runtimeOnly library.java.bigdataoss_gcs_connector
   testImplementation project(path: ":runners:direct-java", configuration: "shadow")
   testImplementation project(":sdks:java:io:google-cloud-platform")
   testImplementation project(":sdks:java:extensions:ml")
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java
new file mode 100644
index 000000000000..446d11d03be4
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/IcebergTaxiExamples.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.cookbook;
+
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.Filter;
+import org.apache.beam.sdk.transforms.JsonToRow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Reads real-time NYC taxi ride information from {@code
+ * projects/pubsub-public-data/topics/taxirides-realtime} and writes to Iceberg tables using Beam's
+ * {@link Managed} IcebergIO sink.
+ *
+ * <p>This is a streaming pipeline that writes records to Iceberg tables dynamically, depending on
+ * each record's passenger count. New tables are created as needed. We set a triggering frequency of
+ * 10s; at around this interval, the sink will accumulate records and write them to the appropriate
+ * table, creating a new snapshot each time.
+ */
+public class IcebergTaxiExamples {
+  private static final String TAXI_RIDES_TOPIC =
+      "projects/pubsub-public-data/topics/taxirides-realtime";
+  private static final Schema TAXI_RIDE_INFO_SCHEMA =
+      Schema.builder()
+          .addStringField("ride_id")
+          .addInt32Field("point_idx")
+          .addDoubleField("latitude")
+          .addDoubleField("longitude")
+          .addStringField("timestamp")
+          .addDoubleField("meter_reading")
+          .addDoubleField("meter_increment")
+          .addStringField("ride_status")
+          .addInt32Field("passenger_count")
+          .build();
+
+  public static void main(String[] args) {
+    IcebergPipelineOptions options =
+        PipelineOptionsFactory.fromArgs(args).as(IcebergPipelineOptions.class);
+    options.setProject("apache-beam-testing");
+
+    // each record's 'passenger_count' value will be substituted in to determine
+    // its final table destination
+    // e.g. an event with 3 passengers will be written to 'iceberg_taxi.3_passengers'
+    String tableIdentifierTemplate = "iceberg_taxi.{passenger_count}_passengers";
+
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("catalog-impl", options.getCatalogImpl())
+            .put("warehouse", options.getWarehouse())
+            .build();
+    Map<String, Object> icebergWriteConfig =
+        ImmutableMap.<String, Object>builder()
+            .put("table", tableIdentifierTemplate)
+            .put("catalog_name", options.getCatalogName())
+            .put("catalog_properties", catalogProps)
+            .put("triggering_frequency_seconds", 10)
+            // perform a final filter to only write these two columns
+            .put("keep", Arrays.asList("ride_id", "meter_reading"))
+            .build();
+
+    Pipeline p = Pipeline.create(options);
+    p
+        // Read taxi ride data
+        .apply(PubsubIO.readStrings().fromTopic(TAXI_RIDES_TOPIC))
+        // Convert JSON strings to Beam Rows
+        .apply(JsonToRow.withSchema(TAXI_RIDE_INFO_SCHEMA))
+        // Filter to only include drop-offs
+        .apply(Filter.<Row>create().whereFieldName("ride_status", "dropoff"::equals))
+        // Write to Iceberg tables
+        .apply(Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));
+    p.run();
+  }
+
+  public interface IcebergPipelineOptions extends GcpOptions {
+    @Description("Warehouse location where the table's data will be written to.")
+    @Default.String("gs://apache-beam-samples/iceberg-examples")
+    String getWarehouse();
+
+    void setWarehouse(String warehouse);
+
+    @Description("Fully-qualified name of the catalog class to use.")
+    @Default.String("org.apache.iceberg.hadoop.HadoopCatalog")
+    String getCatalogImpl();
+
+    void setCatalogImpl(String catalogName);
+
+    @Validation.Required
+    @Default.String("example-catalog")
+    String getCatalogName();
+
+    void setCatalogName(String catalogName);
+  }
+}
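
Note (not part of the patch): to sanity-check what the streaming job above writes, one option is to read a destination table back with the Managed Iceberg source. The snippet below is a minimal, hypothetical companion sketch, not code from this PR. It assumes the Managed Iceberg source accepts the same "table", "catalog_name", and "catalog_properties" keys used by the sink, that the Managed transform can be applied directly to the pipeline, and that a table such as iceberg_taxi.3_passengers has already been created by the streaming job; the class name and output path are made up for illustration.

package org.apache.beam.examples.cookbook;

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

/** Hypothetical batch job that reads back one of the tables written by IcebergTaxiExamples. */
public class IcebergTaxiReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Same catalog settings as the write pipeline's defaults.
    Map<String, String> catalogProps =
        ImmutableMap.<String, String>builder()
            .put("catalog-impl", "org.apache.iceberg.hadoop.HadoopCatalog")
            .put("warehouse", "gs://apache-beam-samples/iceberg-examples")
            .build();

    // Read one concrete table produced by the dynamic-destination sink.
    Map<String, Object> icebergReadConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", "iceberg_taxi.3_passengers")
            .put("catalog_name", "example-catalog")
            .put("catalog_properties", catalogProps)
            .build();

    p.apply(Managed.read(Managed.ICEBERG).withConfig(icebergReadConfig))
        .getSinglePCollection()
        // The sink keeps only 'ride_id' and 'meter_reading', so format those two columns.
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via((Row row) -> row.getString("ride_id") + "," + row.getDouble("meter_reading")))
        .apply(TextIO.write().to("/tmp/iceberg-taxi-rides"));

    p.run().waitUntilFinish();
  }
}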