Merge pull request #591 from AdaptiveScale/bugfix/PLUGIN-604
Fixed BigQuerySink fails to read ProjectID
rmstar authored Mar 19, 2021
2 parents 40243ef + 4f29216 commit f054621
Showing 1 changed file with 23 additions and 3 deletions.
@@ -37,6 +37,7 @@
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
+import com.google.auth.Credentials;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.BigQueryOptions;
@@ -62,6 +63,7 @@
 import com.google.common.base.Strings;
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
+import io.cdap.plugin.gcp.common.GCPUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -216,7 +218,7 @@ public void commitJob(JobContext jobContext) throws IOException {
                        writeDisposition, sourceUris, partitionType, range, partitionByField,
                        requirePartitionFilter, clusteringOrderList, tableExists, getJobIdForImportGCS(conf));
         if (temporaryTableReference != null) {
-          operationAction(destTable, kmsKeyName, getJobIdForUpdateUpsert(conf));
+          operationAction(destTable, kmsKeyName, getJobIdForUpdateUpsert(conf), conf);
         }
       } catch (Exception e) {
         throw new IOException("Failed to import GCS into BigQuery. ", e);
@@ -531,14 +533,16 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I
       return Optional.empty();
     }
 
-    private void operationAction(TableReference tableRef, @Nullable String cmekKey, JobId jobId) throws Exception {
+    private void operationAction(TableReference tableRef, @Nullable String cmekKey, JobId jobId, Configuration config)
+      throws Exception {
       if (allowSchemaRelaxation) {
         updateTableSchema(tableRef);
       }
       String query = generateQuery(tableRef);
       LOG.info("Update/Upsert query: " + query);
 
-      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
+      BigQuery bigquery = getBigQuery(config);
+
       QueryJobConfiguration queryConfig =
         QueryJobConfiguration.newBuilder(query)
           .setUseLegacySql(false)
@@ -696,4 +700,20 @@ private RangePartitioning createRangePartitioning(@Nullable String partitionByFi
       return rangePartitioning;
     }
   }
+
+  private static BigQuery getBigQuery(Configuration config) throws IOException {
+    String projectId = ConfigurationUtil.getMandatoryConfig(config, BigQueryConfiguration.PROJECT_ID_KEY);
+    String serviceAccount;
+    boolean isServiceAccountFile = GCPUtils.SERVICE_ACCOUNT_TYPE_FILE_PATH
+      .equals(config.get(GCPUtils.SERVICE_ACCOUNT_TYPE));
+    if (isServiceAccountFile) {
+      serviceAccount = config.get(GCPUtils.CLOUD_JSON_KEYFILE, null);
+    } else {
+      serviceAccount = config.get(String.format("%s.%s", GCPUtils.CLOUD_JSON_KEYFILE_PREFIX,
+                                                GCPUtils.CLOUD_ACCOUNT_JSON_SUFFIX));
+    }
+    Credentials credentials = serviceAccount == null ? null :
+      GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFile);
+    return GCPUtils.getBigQuery(projectId, credentials);
+  }
 }
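Why this fixes the bug: the replaced call, BigQueryOptions.getDefaultInstance().getService(), builds a client whose project ID is inferred from the runtime environment's application-default credentials, so the ProjectID configured on the BigQuerySink was ignored during the update/upsert step. The new getBigQuery(Configuration) helper instead reads the project ID and service account out of the job's Hadoop Configuration and hands them to GCPUtils.getBigQuery. As a minimal sketch of what that wrapper plausibly does, assuming it is a thin layer over the standard BigQueryOptions builder (only its signature appears in this diff, so the body below is an assumption):

import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import javax.annotation.Nullable;

public final class BigQueryClientSketch {

  // Hypothetical stand-in for GCPUtils.getBigQuery(projectId, credentials).
  // The essence of the fix: pin the client to the project ID taken from the
  // job configuration instead of letting getDefaultInstance() infer it from
  // the environment.
  public static BigQuery getBigQuery(String projectId, @Nullable Credentials credentials) {
    BigQueryOptions.Builder builder = BigQueryOptions.newBuilder().setProjectId(projectId);
    if (credentials != null) {
      // Use the service account configured on the pipeline when one is set.
      builder.setCredentials(credentials);
    }
    // With null credentials this falls back to application-default credentials,
    // but the project ID above still comes from the sink configuration.
    return builder.build().getService();
  }

  private BigQueryClientSketch() {
  }
}

This mirrors the null-credentials branch in the diff: even when no service account is provided and default credentials are used, the client still targets the configured project.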
