bug(bigtable): Handle default values (#1940)
* bug(bigtable): Handle default values

* bug(bigtable): Handle default values

* small fix

* Make output directory required

* Make output directory required

* Update docs

* Update test

* Update test
ron-gal authored Oct 16, 2024
1 parent 583a695 commit 0b145d7
Showing 6 changed files with 55 additions and 47 deletions.
16 changes: 8 additions & 8 deletions v1/README_Cloud_Bigtable_to_GCS_Json.md
@@ -20,11 +20,11 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **bigtableProjectId** : The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.
* **bigtableInstanceId** : The ID of the Bigtable instance that contains the table.
* **bigtableTableId** : The ID of the Bigtable table to read from.
-* **filenamePrefix** : The prefix of the JSON file name. For example, "table1-". If no value is provided, defaults to `part`.
+* **outputDirectory** : The Cloud Storage path where the output JSON files are stored. (Example: gs://your-bucket/your-path/).

### Optional parameters

-* **outputDirectory** : The Cloud Storage path where the output JSON files are stored. (Example: gs://your-bucket/your-path/).
+* **filenamePrefix** : The prefix of the JSON file name. For example, "table1-". If no value is provided, defaults to `part`.
* **userOption** : Possible values are `FLATTEN` or `NONE`. `FLATTEN` flattens the row to a single level. `NONE` stores the whole row as a JSON string. Defaults to `NONE`. (A hypothetical sketch of the two modes follows this list.)
* **columnsAliases** : A comma-separated list of columns that are required for the Vertex AI Vector Search index. The columns `id` and `embedding` are required for Vertex AI Vector Search. You can use the notation `fromfamily:fromcolumn;to`. For example, if the columns are `rowkey` and `cf:my_embedding`, where `rowkey` has a different name than the embedding column, specify `cf:my_embedding;embedding` and `rowkey;id`. Only use this option when the value for `userOption` is `FLATTEN`.
* **bigtableAppProfileId** : The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.
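
To make the two `userOption` modes concrete, here is a small, purely illustrative sketch. The row content and the emitted field names are hypothetical (the template's actual serialization may differ); only the nesting behavior is the point.

```java
// Hypothetical illustration of userOption for a row "r1" whose family "cf"
// holds col1=v1 and col2=v2. Field names are assumed, not taken from the
// template's actual output.
public class UserOptionSketch {
  public static void main(String[] args) {
    // userOption=NONE: the whole row is stored as one nested JSON string.
    String none = "{\"r1\": {\"cf\": {\"col1\": \"v1\", \"col2\": \"v2\"}}}";

    // userOption=FLATTEN: column values are lifted to a single level,
    // which is what makes the columnsAliases renaming meaningful.
    String flatten = "{\"rowkey\": \"r1\", \"col1\": \"v1\", \"col2\": \"v2\"}";

    System.out.println(none);
    System.out.println(flatten);
  }
}
```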
@@ -111,10 +111,10 @@ export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/Cloud_Bigtable_to_GCS_
export BIGTABLE_PROJECT_ID=<bigtableProjectId>
export BIGTABLE_INSTANCE_ID=<bigtableInstanceId>
export BIGTABLE_TABLE_ID=<bigtableTableId>
-export FILENAME_PREFIX=part
+export OUTPUT_DIRECTORY=<outputDirectory>

### Optional
-export OUTPUT_DIRECTORY=<outputDirectory>
+export FILENAME_PREFIX=part
export USER_OPTION=NONE
export COLUMNS_ALIASES=<columnsAliases>
export BIGTABLE_APP_PROFILE_ID=default
@@ -152,10 +152,10 @@ export REGION=us-central1
export BIGTABLE_PROJECT_ID=<bigtableProjectId>
export BIGTABLE_INSTANCE_ID=<bigtableInstanceId>
export BIGTABLE_TABLE_ID=<bigtableTableId>
-export FILENAME_PREFIX=part
+export OUTPUT_DIRECTORY=<outputDirectory>

### Optional
-export OUTPUT_DIRECTORY=<outputDirectory>
+export FILENAME_PREFIX=part
export USER_OPTION=NONE
export COLUMNS_ALIASES=<columnsAliases>
export BIGTABLE_APP_PROFILE_ID=default
@@ -216,8 +216,8 @@ resource "google_dataflow_job" "cloud_bigtable_to_gcs_json" {
bigtableProjectId = "<bigtableProjectId>"
bigtableInstanceId = "<bigtableInstanceId>"
bigtableTableId = "<bigtableTableId>"
-filenamePrefix = "part"
-# outputDirectory = "gs://your-bucket/your-path/"
+outputDirectory = "gs://your-bucket/your-path/"
+# filenamePrefix = "part"
# userOption = "NONE"
# columnsAliases = "<columnsAliases>"
# bigtableAppProfileId = "default"
16 changes: 8 additions & 8 deletions v1/README_Cloud_Bigtable_to_Vector_Embeddings.md
@@ -21,13 +21,13 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **bigtableProjectId** : The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.
* **bigtableInstanceId** : The ID of the Bigtable instance that contains the table.
* **bigtableTableId** : The ID of the Bigtable table to read from.
-* **filenamePrefix** : The prefix of the JSON filename. For example: "table1-". If no value is provided, defaults to "part".
+* **outputDirectory** : The Cloud Storage path where the output JSON files are stored. (Example: gs://your-bucket/your-path/).
* **idColumn** : The fully qualified column name where the ID is stored, in the format cf:col or _key.
* **embeddingColumn** : The fully qualified column name where the embeddings are stored, in the format cf:col or _key.

### Optional parameters

-* **outputDirectory** : The Cloud Storage path where the output JSON files are stored. (Example: gs://your-bucket/your-path/).
+* **filenamePrefix** : The prefix of the JSON filename. For example: "table1-". If no value is provided, defaults to "part".
* **crowdingTagColumn** : The fully qualified column name where the crowding tag is stored, in the format cf:col or _key.
* **embeddingByteSize** : The byte size of each entry in the embeddings array. For float, use the value 4. For double, use the value 8. Defaults to 4.
* **allowRestrictsMappings** : The comma-separated, fully qualified column names for the columns to use as the allow restricts, with their aliases, in the format cf:col->alias.
@@ -119,12 +119,12 @@ export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/Cloud_Bigtable_to_Vect
export BIGTABLE_PROJECT_ID=<bigtableProjectId>
export BIGTABLE_INSTANCE_ID=<bigtableInstanceId>
export BIGTABLE_TABLE_ID=<bigtableTableId>
-export FILENAME_PREFIX=part
+export OUTPUT_DIRECTORY=<outputDirectory>
export ID_COLUMN=<idColumn>
export EMBEDDING_COLUMN=<embeddingColumn>

### Optional
-export OUTPUT_DIRECTORY=<outputDirectory>
+export FILENAME_PREFIX=part
export CROWDING_TAG_COLUMN=<crowdingTagColumn>
export EMBEDDING_BYTE_SIZE=4
export ALLOW_RESTRICTS_MAPPINGS=<allowRestrictsMappings>
@@ -174,12 +174,12 @@ export REGION=us-central1
export BIGTABLE_PROJECT_ID=<bigtableProjectId>
export BIGTABLE_INSTANCE_ID=<bigtableInstanceId>
export BIGTABLE_TABLE_ID=<bigtableTableId>
-export FILENAME_PREFIX=part
+export OUTPUT_DIRECTORY=<outputDirectory>
export ID_COLUMN=<idColumn>
export EMBEDDING_COLUMN=<embeddingColumn>

### Optional
-export OUTPUT_DIRECTORY=<outputDirectory>
+export FILENAME_PREFIX=part
export CROWDING_TAG_COLUMN=<crowdingTagColumn>
export EMBEDDING_BYTE_SIZE=4
export ALLOW_RESTRICTS_MAPPINGS=<allowRestrictsMappings>
@@ -245,10 +245,10 @@ resource "google_dataflow_job" "cloud_bigtable_to_vector_embeddings" {
bigtableProjectId = "<bigtableProjectId>"
bigtableInstanceId = "<bigtableInstanceId>"
bigtableTableId = "<bigtableTableId>"
-filenamePrefix = "part"
+outputDirectory = "gs://your-bucket/your-path/"
idColumn = "<idColumn>"
embeddingColumn = "<embeddingColumn>"
-# outputDirectory = "gs://your-bucket/your-path/"
+# filenamePrefix = "part"
# crowdingTagColumn = "<crowdingTagColumn>"
# embeddingByteSize = "4"
# allowRestrictsMappings = "<allowRestrictsMappings>"
BigtableToJson.java
@@ -24,6 +24,8 @@
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
+import com.google.cloud.teleport.util.DualInputNestedValueProvider;
+import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.io.StringWriter;
@@ -32,11 +34,14 @@
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
Expand Down Expand Up @@ -108,10 +113,10 @@ public interface Options extends PipelineOptions {
@TemplateParameter.GcsWriteFolder(
order = 4,
groupName = "Target",
-        optional = true,
description = "Cloud Storage directory for storing JSON files",
helpText = "The Cloud Storage path where the output JSON files are stored.",
example = "gs://your-bucket/your-path/")
+    @Required
ValueProvider<String> getOutputDirectory();

@SuppressWarnings("unused")
Expand All @@ -120,6 +125,7 @@ public interface Options extends PipelineOptions {
@TemplateParameter.Text(
order = 5,
groupName = "Target",
+        optional = true,
description = "JSON file prefix",
helpText =
"The prefix of the JSON file name. For example, \"table1-\". If no value is provided, defaults to `part`.")
@@ -208,28 +214,27 @@ public static PipelineResult run(Options options) {
read = read.withoutValidation();
}

-    // Concatenating cloud storage folder with file prefix to get complete path
-    ValueProvider<String> outputFilePrefix = options.getFilenamePrefix();
-
-    ValueProvider<String> outputFilePathWithPrefix =
-        ValueProvider.NestedValueProvider.of(
+    ValueProvider<String> filePathPrefix =
+        DualInputNestedValueProvider.of(
options.getOutputDirectory(),
-            (SerializableFunction<String, String>)
-                folder -> {
-                  if (!folder.endsWith("/")) {
-                    // Appending the slash if not provided by user
-                    folder = folder + "/";
-                  }
-                  return folder + outputFilePrefix.get();
-                });
+            options.getFilenamePrefix(),
+            new SerializableFunction<TranslatorInput<String, String>, String>() {
+              @Override
+              public String apply(TranslatorInput<String, String> input) {
+                return FileSystems.matchNewResource(input.getX(), true)
+                    .resolve(input.getY(), StandardResolveOptions.RESOLVE_FILE)
+                    .toString();
+              }
+            });

String userOption = options.getUserOption();
pipeline
.apply("Read from Bigtable", read)
.apply(
"Transform to JSON",
MapElements.via(
new BigtableToJsonFn(userOption.equals("FLATTEN"), options.getColumnsAliases())))
.apply("Write to storage", TextIO.write().to(outputFilePathWithPrefix).withSuffix(".json"));
.apply("Write to storage", TextIO.write().to(filePathPrefix).withSuffix(".json"));

return pipeline.run();
}
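
The replacement logic above drops the hand-rolled "append a slash if missing" branch: `FileSystems.matchNewResource(dir, true)` treats the spec as a directory, and `resolve(prefix, RESOLVE_FILE)` joins it with the filename prefix, normalizing the separator for any supported scheme. A minimal standalone sketch of the same resolution, assuming a Beam GCS filesystem registrar is on the classpath:

```java
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ResolveSketch {
  public static void main(String[] args) {
    // Register the available filesystems; resolving gs:// paths needs a
    // Beam GCS filesystem registrar on the classpath.
    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

    // Both spellings of the directory resolve to the same file prefix, so
    // the old "append a slash if missing" branch is no longer needed.
    for (String dir :
        new String[] {"gs://your-bucket/your-path", "gs://your-bucket/your-path/"}) {
      String prefix =
          FileSystems.matchNewResource(dir, /* isDirectory= */ true)
              .resolve("part", StandardResolveOptions.RESOLVE_FILE)
              .toString();
      System.out.println(prefix); // gs://your-bucket/your-path/part
    }
  }
}
```

From there, `TextIO.write().to(filePathPrefix).withSuffix(".json")` applies Beam's default shard naming, so output objects come out as, e.g., `gs://your-bucket/your-path/part-00000-of-00003.json`.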
BigtableToVectorEmbeddings.java
@@ -24,6 +24,8 @@
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
+import com.google.cloud.teleport.util.DualInputNestedValueProvider;
+import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.gson.stream.JsonWriter;
import com.google.protobuf.ByteString;
import java.io.IOException;
@@ -36,11 +38,14 @@
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -114,10 +119,10 @@ public interface Options extends PipelineOptions {
@TemplateParameter.GcsWriteFolder(
order = 4,
groupName = "Target",
-        optional = true,
description = "Cloud Storage directory for storing JSON files",
helpText = "The Cloud Storage path where the output JSON files are stored.",
example = "gs://your-bucket/your-path/")
+    @Required
ValueProvider<String> getOutputDirectory();

@SuppressWarnings("unused")
@@ -126,6 +131,7 @@ public interface Options extends PipelineOptions {
@TemplateParameter.Text(
order = 5,
groupName = "Target",
+        optional = true,
description = "JSON file prefix",
helpText =
"The prefix of the JSON filename. For example: \"table1-\". If no value is provided, defaults to \"part\".")
@@ -281,19 +287,18 @@ public static PipelineResult run(Options options) {
}

-    // Concatenating cloud storage folder with file prefix to get complete path
-    ValueProvider<String> outputFilePrefix = options.getFilenamePrefix();
-
-    ValueProvider<String> outputFilePathWithPrefix =
-        ValueProvider.NestedValueProvider.of(
+    ValueProvider<String> filePathPrefix =
+        DualInputNestedValueProvider.of(
options.getOutputDirectory(),
-            (SerializableFunction<String, String>)
-                folder -> {
-                  if (!folder.endsWith("/")) {
-                    // Appending the slash if not provided by user
-                    folder = folder + "/";
-                  }
-                  return folder + outputFilePrefix.get();
-                });
+            options.getFilenamePrefix(),
+            new SerializableFunction<TranslatorInput<String, String>, String>() {
+              @Override
+              public String apply(TranslatorInput<String, String> input) {
+                return FileSystems.matchNewResource(input.getX(), true)
+                    .resolve(input.getY(), StandardResolveOptions.RESOLVE_FILE)
+                    .toString();
+              }
+            });
pipeline
.apply("Read from Bigtable", read)
.apply(
@@ -309,7 +314,7 @@ public static PipelineResult run(Options options) {
options.getIntNumericRestrictsMappings(),
options.getFloatNumericRestrictsMappings(),
options.getDoubleNumericRestrictsMappings())))
.apply("Write to storage", TextIO.write().to(outputFilePathWithPrefix).withSuffix(".json"));
.apply("Write to storage", TextIO.write().to(filePathPrefix).withSuffix(".json"));

return pipeline.run();
}
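
`DualInputNestedValueProvider` comes from `com.google.cloud.teleport.util` rather than the Beam SDK; it generalizes `ValueProvider.NestedValueProvider` from one input to two, which is what lets `outputDirectory` and `filenamePrefix` remain independent runtime parameters. A rough sketch of the idea, inferred from its usage in this diff (the real utility may differ in details, such as caching the translated value):

```java
import java.io.Serializable;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;

// Sketch only: a two-input analogue of ValueProvider.NestedValueProvider.
class TwoInputValueProvider<X, Y, T> implements ValueProvider<T>, Serializable {

  // Simple pair mirroring DualInputNestedValueProvider.TranslatorInput.
  static class TranslatorInput<X, Y> {
    private final X x;
    private final Y y;

    TranslatorInput(X x, Y y) {
      this.x = x;
      this.y = y;
    }

    X getX() { return x; }
    Y getY() { return y; }
  }

  private final ValueProvider<X> x;
  private final ValueProvider<Y> y;
  private final SerializableFunction<TranslatorInput<X, Y>, T> translator;

  TwoInputValueProvider(
      ValueProvider<X> x,
      ValueProvider<Y> y,
      SerializableFunction<TranslatorInput<X, Y>, T> translator) {
    this.x = x;
    this.y = y;
    this.translator = translator;
  }

  @Override
  public T get() {
    // Combine both runtime values only when the pipeline asks for them.
    return translator.apply(new TranslatorInput<>(x.get(), y.get()));
  }

  @Override
  public boolean isAccessible() {
    return x.isAccessible() && y.isAccessible();
  }
}
```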
BigtableToJsonIT.java
@@ -86,7 +86,6 @@ public void testUnflattenedBigtableToJson() throws IOException {
.addParameter("bigtableInstanceId", bigtableResourceManager.getInstanceId())
.addParameter("bigtableTableId", tableId)
.addParameter("outputDirectory", getGcsPath("output/"))
.addParameter("filenamePrefix", "bigtable-to-json-output-")
.addParameter("bigtableAppProfileId", appProfileId);

// Act
BigtableToVectorEmbeddingsIT.java
@@ -158,7 +158,6 @@ public void testBigtableToVectorEmbeddings() throws IOException {
.addParameter("bigtableInstanceId", bigtableResourceManager.getInstanceId())
.addParameter("bigtableTableId", tableId)
.addParameter("outputDirectory", getGcsPath("output/"))
.addParameter("filenamePrefix", "bigtable-to-json-output-")
.addParameter("idColumn", "_key")
.addParameter("embeddingColumn", "cf1:embedding")
.addParameter("crowdingTagColumn", "cf2:crowding")
