Skip to content

Commit

Permalink
JdbcToBigQuery: Support loading the source query from Cloud Storage. (#1773)
Browse files Browse the repository at this point in the history

* JdbcToBigQuery: Support loading the source query from Cloud Storage.

This was already implemented in the legacy JdbcToBigQuery templates, but not in the Flex (v2) template.

This can be used in two use cases: providing very large queries (larger than 100 KB — queries of that size make the job graph too large when provided as a literal template parameter) and providing queries that contain Unicode characters.

* JdbcToBigQuery: Update documentation.

* Allow unicode characters in BigQuery Table IDs in tests.

* JdbcToBigQueryIT: Allow different test names and target table names.

* JdbcToBigQueryIT: Bugfix.

* JdbcToBigQueryIT: Bugfix.
  • Loading branch information
an2x authored Aug 12, 2024
1 parent e8949e2 commit ffdb13c
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public final class BigQueryResourceManagerUtils {
private static final String REPLACE_CHAR = "_";
private static final int MIN_TABLE_ID_LENGTH = 1;
private static final int MAX_TABLE_ID_LENGTH = 1024;
private static final Pattern ILLEGAL_TABLE_CHARS = Pattern.compile("[^a-zA-Z0-9-_]");
private static final Pattern ILLEGAL_TABLE_CHARS = Pattern.compile("[^a-zA-Z0-9-_\\p{L}]");
private static final String TIME_FORMAT = "yyyyMMdd_HHmmss";

private BigQueryResourceManagerUtils() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ public void testCheckValidTableIdShouldWorkWhenGivenCorrectId() {
checkValidTableId("a");
checkValidTableId("this-is_a_valid-id-1");
}

@Test
public void testCheckValidTableIdWhenIdContainsUnicodeChars() {
// Regression test: table IDs containing Unicode letters (e.g. "ó") must be accepted.
// NOTE(review): relies on the validation pattern permitting \p{L} — confirm against
// BigQueryResourceManagerUtils.ILLEGAL_TABLE_CHARS.
checkValidTableId("unicóde_table");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.utils;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.options.ValueProvider;

/**
 * {@link ValueProvider} for a String parameter that is aware of Cloud Storage references: when the
 * configured value starts with {@code gs://}, {@link #get()} returns the UTF-8 decoded contents of
 * that file; otherwise the configured value is returned unchanged.
 *
 * <p>The resolved value is cached after the first call to {@link #get()}. The cache is
 * {@code transient}, so after deserialization (e.g. on a worker) the value is resolved once more.
 */
public class GCSAwareValueProvider implements ValueProvider<String>, Serializable {

  // Both guarded by "this" in get(). An explicit flag is used (rather than a null check on
  // cachedValue) so that a legitimately-null resolved value is cached too, instead of being
  // re-resolved on every call.
  private transient boolean resolved;
  private transient String cachedValue;

  private final String originalValue;

  /**
   * @param originalValue either the literal parameter value, or a {@code gs://} path to a UTF-8
   *     encoded file whose contents are the value. May be null.
   */
  public GCSAwareValueProvider(String originalValue) {
    this.originalValue = originalValue;
  }

  @Override
  public synchronized String get() {
    if (!resolved) {
      cachedValue = resolve();
      resolved = true;
    }
    return cachedValue;
  }

  @Override
  public boolean isAccessible() {
    // The underlying value is fixed at construction time, so it is always accessible.
    return true;
  }

  /**
   * Resolves the original value: downloads and decodes the file contents when the value is a
   * {@code gs://} path, otherwise returns the value as-is (including null).
   *
   * @throws RuntimeException if the Cloud Storage object cannot be read.
   */
  protected String resolve() {
    if (originalValue != null && originalValue.startsWith("gs://")) {
      try {
        return new String(GCSUtils.getGcsFileAsBytes(originalValue), StandardCharsets.UTF_8);
      } catch (Exception e) {
        throw new RuntimeException(
            "Error resolving a parameter from Cloud Storage path: " + originalValue, e);
      }
    }

    return originalValue;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ public interface JdbcToBigQueryOptions
"The query to run on the source to extract the data. Note that some JDBC SQL and BigQuery types, although sharing the same name, have some differences. "
+ "Some important SQL -> BigQuery type mappings to keep in mind are:\n"
+ "DATETIME --> TIMESTAMP\n"
+ "\nType casting may be required if your schemas do not match.",
+ "\nType casting may be required if your schemas do not match. "
+ "This parameter can be set to a gs:// path pointing to a file in Cloud Storage to load the query from. "
+ "The file encoding should be UTF-8.",
example = "select * from sampledb.sample_table")
String getQuery();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.JdbcToBigQueryOptions;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSAwareValueProvider;
import com.google.cloud.teleport.v2.utils.JdbcConverters;
import com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdk.Pipeline;
Expand Down Expand Up @@ -155,7 +156,7 @@ static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeTo
JdbcIO.Read<TableRow> readIO =
JdbcIO.<TableRow>read()
.withDataSourceConfiguration(dataSourceConfiguration)
.withQuery(options.getQuery())
.withQuery(new GCSAwareValueProvider(options.getQuery()))
.withCoder(TableRowJsonCoder.of())
.withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,36 @@ public void testPostgresToBigQueryFlex() throws IOException {
+ testName));
}

@Test
public void testPostgresWithUnicodeCharactersInQuery() throws IOException {
  // The source table name deliberately contains a non-ASCII character, and the query is supplied
  // through a Cloud Storage file rather than inline, exercising the gs:// query-loading path.
  String tableName = "unicóde_table";

  postgresResourceManager = PostgresResourceManager.builder(testName).build();

  // Upload the source query to GCS; the template resolves a "query" parameter that is a gs:// path.
  String query =
      "SELECT ROW_ID, NAME AS FULL_NAME, AGE, MEMBER AS IS_MEMBER, ENTRY_ADDED FROM " + tableName;
  gcsClient.createArtifact("input/query.sql", query);

  // Schema of the Postgres source table.
  HashMap<String, String> columnTypes = new HashMap<>();
  columnTypes.put(ROW_ID, "INTEGER NOT NULL");
  columnTypes.put(NAME, "VARCHAR(200)");
  columnTypes.put(AGE, "INTEGER");
  columnTypes.put(MEMBER, "VARCHAR(200)");
  columnTypes.put(ENTRY_ADDED, "VARCHAR(200)");
  JDBCResourceManager.JDBCSchema sourceSchema =
      new JDBCResourceManager.JDBCSchema(columnTypes, ROW_ID);

  simpleJdbcToBigQueryTest(
      testName,
      tableName,
      testName,
      sourceSchema,
      POSTGRES_DRIVER,
      postgresDriverGCSPath(),
      postgresResourceManager,
      true,
      builder -> builder.addParameter("query", getGcsPath("input/query.sql")));
}

@Test
public void testOracleToBigQueryFlex() throws IOException {
// Oracle image does not work on M1
Expand Down Expand Up @@ -279,7 +309,30 @@ public void testReadWithPartitions() throws IOException {
}

/**
 * Convenience overload that runs the JDBC-to-BigQuery test using {@code testName} as the launch
 * name, the source (JDBC) table name, and the target (BigQuery) table name.
 *
 * <p>Note: the rendered diff left a stray removed parameter line here ({@code String tableName,});
 * the post-change signature takes only {@code testName}.
 */
private void simpleJdbcToBigQueryTest(
    String testName,
    JDBCResourceManager.JDBCSchema schema,
    String driverClassName,
    String driverJars,
    JDBCResourceManager jdbcResourceManager,
    boolean useColumnAlias,
    Function<LaunchConfig.Builder, LaunchConfig.Builder> paramsAdder)
    throws IOException {
  // Delegate to the full overload, reusing the test name for both table names.
  simpleJdbcToBigQueryTest(
      testName,
      testName,
      testName,
      schema,
      driverClassName,
      driverJars,
      jdbcResourceManager,
      useColumnAlias,
      paramsAdder);
}

private void simpleJdbcToBigQueryTest(
String testName,
String sourceTableName,
String targetTableName,
JDBCResourceManager.JDBCSchema schema,
String driverClassName,
String driverJars,
Expand All @@ -291,8 +344,8 @@ private void simpleJdbcToBigQueryTest(
// Arrange
List<Map<String, Object>> jdbcData =
getJdbcData(List.of(ROW_ID, NAME, AGE, MEMBER, ENTRY_ADDED));
jdbcResourceManager.createTable(tableName, schema);
jdbcResourceManager.write(tableName, jdbcData);
jdbcResourceManager.createTable(sourceTableName, schema);
jdbcResourceManager.write(sourceTableName, jdbcData);

List<Field> bqSchemaFields =
Arrays.asList(
Expand All @@ -304,15 +357,15 @@ private void simpleJdbcToBigQueryTest(
Schema bqSchema = Schema.of(bqSchemaFields);

bigQueryResourceManager.createDataset(REGION);
TableId table = bigQueryResourceManager.createTable(tableName, bqSchema);
TableId table = bigQueryResourceManager.createTable(targetTableName, bqSchema);

Function<String, String> encrypt =
message -> kmsResourceManager.encrypt(KEYRING_ID, CRYPTO_KEY_NAME, message);
CryptoKey cryptoKey = kmsResourceManager.getOrCreateCryptoKey(KEYRING_ID, CRYPTO_KEY_NAME);

PipelineLauncher.LaunchConfig.Builder options =
paramsAdder.apply(
PipelineLauncher.LaunchConfig.builder(tableName, specPath)
PipelineLauncher.LaunchConfig.builder(testName, specPath)
.addParameter("connectionURL", encrypt.apply(jdbcResourceManager.getUri()))
.addParameter("driverClassName", driverClassName)
.addParameter("outputTable", toTableSpecLegacy(table))
Expand Down Expand Up @@ -342,7 +395,7 @@ private void simpleJdbcToBigQueryTest(
row.put("is_member", row.remove("member"));
});
}
assertThatBigQueryRecords(bigQueryResourceManager.readTable(tableName))
assertThatBigQueryRecords(bigQueryResourceManager.readTable(targetTableName))
.hasRecordsUnorderedCaseInsensitiveColumns(jdbcData);
}

Expand Down

0 comments on commit ffdb13c

Please sign in to comment.