diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
index 703f8b4849..8b415432df 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java
@@ -37,6 +37,7 @@
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
+import com.google.auth.Credentials;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.BigQueryOptions;
@@ -62,6 +63,7 @@
 import com.google.common.base.Strings;
 import io.cdap.cdap.api.data.format.StructuredRecord;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
+import io.cdap.plugin.gcp.common.GCPUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -216,7 +218,7 @@ public void commitJob(JobContext jobContext) throws IOException {
           writeDisposition, sourceUris, partitionType, range, partitionByField,
           requirePartitionFilter, clusteringOrderList, tableExists, getJobIdForImportGCS(conf));
         if (temporaryTableReference != null) {
-          operationAction(destTable, kmsKeyName, getJobIdForUpdateUpsert(conf));
+          operationAction(destTable, kmsKeyName, getJobIdForUpdateUpsert(conf), conf);
         }
       } catch (Exception e) {
         throw new IOException("Failed to import GCS into BigQuery. ", e);
@@ -531,14 +533,16 @@ private static Optional getTableSchema(Configuration conf) throws I
       return Optional.empty();
     }
 
-    private void operationAction(TableReference tableRef, @Nullable String cmekKey, JobId jobId) throws Exception {
+    private void operationAction(TableReference tableRef, @Nullable String cmekKey, JobId jobId, Configuration config)
+      throws Exception {
       if (allowSchemaRelaxation) {
        updateTableSchema(tableRef);
       }
       String query = generateQuery(tableRef);
       LOG.info("Update/Upsert query: " + query);
 
-      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
+      BigQuery bigquery = getBigQuery(config);
+
       QueryJobConfiguration queryConfig =
         QueryJobConfiguration.newBuilder(query)
           .setUseLegacySql(false)
@@ -696,4 +700,20 @@ private RangePartitioning createRangePartitioning(@Nullable String partitionByFi
       return rangePartitioning;
     }
   }
+
+  private static BigQuery getBigQuery(Configuration config) throws IOException {
+    String projectId = ConfigurationUtil.getMandatoryConfig(config, BigQueryConfiguration.PROJECT_ID_KEY);
+    String serviceAccount;
+    boolean isServiceAccountFile = GCPUtils.SERVICE_ACCOUNT_TYPE_FILE_PATH
+      .equals(config.get(GCPUtils.SERVICE_ACCOUNT_TYPE));
+    if (isServiceAccountFile) {
+      serviceAccount = config.get(GCPUtils.CLOUD_JSON_KEYFILE, null);
+    } else {
+      serviceAccount = config.get(String.format("%s.%s", GCPUtils.CLOUD_JSON_KEYFILE_PREFIX,
+                                                GCPUtils.CLOUD_ACCOUNT_JSON_SUFFIX));
+    }
+    Credentials credentials = serviceAccount == null ? null :
+      GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFile);
+    return GCPUtils.getBigQuery(projectId, credentials);
+  }
 }
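
The patch stops the update/upsert path from calling BigQueryOptions.getDefaultInstance().getService(), which always resolves application-default credentials, and instead builds the client from the service account carried in the job's Hadoop Configuration. Below is a minimal, self-contained sketch of the resolution order the new getBigQuery(Configuration) helper applies; the class name CredentialResolutionSketch, the method resolveServiceAccount, and the constant values are illustrative stand-ins rather than part of this patch (the real keys live in io.cdap.plugin.gcp.common.GCPUtils).

import org.apache.hadoop.conf.Configuration;

/**
 * Illustrative sketch of the credential-resolution logic used by the new
 * getBigQuery(Configuration) helper above. All names and constant values
 * here are assumed stand-ins for the GCPUtils constants referenced in the diff.
 */
public class CredentialResolutionSketch {

  // Stand-in keys; assumed values, not the actual GCPUtils constants.
  private static final String SERVICE_ACCOUNT_TYPE = "cdap.auth.service.account.type";
  private static final String SERVICE_ACCOUNT_TYPE_FILE_PATH = "filePath";
  private static final String CLOUD_JSON_KEYFILE = "google.cloud.auth.service.account.json.keyfile";
  private static final String CLOUD_JSON_KEYFILE_PREFIX = "google.cloud.auth";
  private static final String CLOUD_ACCOUNT_JSON_SUFFIX = "service.account.json";

  /**
   * Mirrors the branch in getBigQuery: when the configured type is a file
   * path, read the key-file location; otherwise read the inline JSON stored
   * under the prefixed key. A null result means "no service account
   * configured", so the caller passes null credentials onward.
   */
  static String resolveServiceAccount(Configuration config) {
    boolean isServiceAccountFile =
      SERVICE_ACCOUNT_TYPE_FILE_PATH.equals(config.get(SERVICE_ACCOUNT_TYPE));
    if (isServiceAccountFile) {
      return config.get(CLOUD_JSON_KEYFILE, null);
    }
    return config.get(String.format("%s.%s", CLOUD_JSON_KEYFILE_PREFIX, CLOUD_ACCOUNT_JSON_SUFFIX));
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set(SERVICE_ACCOUNT_TYPE, SERVICE_ACCOUNT_TYPE_FILE_PATH);
    conf.set(CLOUD_JSON_KEYFILE, "/tmp/sa.json");
    // File-path branch: prints "/tmp/sa.json".
    System.out.println(resolveServiceAccount(conf));
  }
}

When neither key is set the helper passes null credentials to GCPUtils.getBigQuery, which presumably falls back to application-default credentials, preserving the previous behaviour for unconfigured pipelines.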