Skip to content

Commit

Permalink
Merge pull request #1452 from data-integrations/PLUGIN-1807-3
Browse files Browse the repository at this point in the history
[PLUGIN-1807] Implement error details provider to get more information about exceptions from GCP plugins
  • Loading branch information
itsankit-google authored Oct 18, 2024
2 parents a1a4049 + 77bdf7e commit 283824a
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.gcp.common;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpResponseException;
import com.google.common.base.Throwables;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;

import java.util.List;

/**
* A custom ErrorDetailsProvider for GCP plugins.
*/
public class GCPErrorDetailsProvider implements ErrorDetailsProvider {

/**
* Get a ProgramFailureException with the given error
* information from generic exceptions like {@link java.io.IOException}.
*
* @param e The Throwable to get the error information from.
* @return A ProgramFailureException with the given error information, otherwise null.
*/
@Override
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
List<Throwable> causalChain = Throwables.getCausalChain(e);
for (Throwable t : causalChain) {
if (t instanceof ProgramFailureException) {
// if causal chain already has program failure exception, return null to avoid double wrap.
return null;
}
if (t instanceof HttpResponseException) {
return getProgramFailureException((HttpResponseException) t, errorContext);
}
}
return null;
}

/**
* Get a ProgramFailureException with the given error
* information from {@link HttpResponseException}.
*
* @param e The HttpResponseException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(HttpResponseException e,
ErrorContext errorContext) {
Integer statusCode = e.getStatusCode();
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
String errorReason = String.format("%s %s %s", e.getStatusCode(), e.getStatusMessage(),
pair.getCorrectiveAction());
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";

String errorMessage = e.getMessage();
if (e instanceof GoogleJsonResponseException) {
GoogleJsonResponseException exception = (GoogleJsonResponseException) e;
errorMessage = exception.getDetails() != null ? exception.getDetails().getMessage() :
exception.getMessage();
}

return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
errorReason, String.format(errorMessageFormat, errorContext.getPhase(), errorMessage),
pair.getErrorType(), true, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import com.google.bigtable.repackaged.com.google.gson.Gson;
import com.google.cloud.hadoop.util.AccessTokenProvider;
import com.google.cloud.hadoop.util.CredentialFactory;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import org.apache.hadoop.conf.Configuration;

Expand Down Expand Up @@ -50,13 +54,20 @@ public AccessToken getAccessToken() {
}
return new AccessToken(token.getTokenValue(), token.getExpirationTime().getTime());
} catch (IOException e) {
throw new RuntimeException(e);
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
"Unable to get service account access token.", e.getMessage(), ErrorType.UNKNOWN, true, e);
}
}

@Override
public void refresh() throws IOException {
getCredentials().refresh();
try {
getCredentials().refresh();
} catch (IOException e) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
"Unable to refresh service account access token.", e.getMessage(),
ErrorType.UNKNOWN, true, e);
}
}

private GoogleCredentials getCredentials() throws IOException {
Expand Down
70 changes: 52 additions & 18 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
Expand All @@ -51,6 +52,7 @@
import io.cdap.plugin.format.plugin.FileSinkProperties;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPConnectorConfig;
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.Formats;
import io.cdap.plugin.gcp.gcs.GCSPath;
Expand Down Expand Up @@ -121,30 +123,50 @@ public void prepareRun(BatchSinkContext context) throws Exception {
collector.addFailure("Service account type is undefined.",
"Must be `filePath` or `JSON`");
collector.getOrThrowException();
return;
}
Credentials credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);

Credentials credentials = null;
try {
credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
isServiceAccountFilePath);
} catch (Exception e) {
String errorReason = "Unable to load service account credentials.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
Bucket bucket;
String location;
String location = null;
try {
bucket = storage.get(config.getBucket());
bucket = storage.get(bucketName);
if (bucket != null) {
location = bucket.getLocation();
} else {
location = config.getLocation();
GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
}
} catch (StorageException e) {
throw new RuntimeException(
String.format("Unable to access or create bucket %s. ", config.getBucket())
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
}
if (bucket != null) {
location = bucket.getLocation();
} else {
GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
location = config.getLocation();
String errorReason = String.format(errorReasonFormat, e.getCode());
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

this.outputPath = getOutputDir(context);
// create asset for lineage
asset = Asset.builder(config.getReferenceName())
.setFqn(GCSPath.getFQN(config.getPath())).setLocation(location).build();

// set error details provider
context.setErrorDetailsProvider(
new ErrorDetailsProviderSpec(GCPErrorDetailsProvider.class.getName()));

// super is called down here to avoid instantiating the lineage recorder with a null asset
super.prepareRun(context);
Expand Down Expand Up @@ -532,8 +554,20 @@ public void validateContentType(FailureCollector failureCollector) {
}
}

public String getBucket() {
return GCSPath.from(path).getBucket();
/**
* Get the bucket name from the path.
* @param collector failure collector
* @return bucket name as {@link String} if found, otherwise null.
*/
public String getBucket(FailureCollector collector) {
try {
return GCSPath.from(path).getBucket();
} catch (IllegalArgumentException e) {
collector.addFailure(e.getMessage(), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}
return null;
}

@Override
Expand Down Expand Up @@ -718,8 +752,8 @@ public Builder setCustomContentType(@Nullable String customContentType) {
return this;
}

public GCSBatchSink.GCSBatchSinkConfig build() {
return new GCSBatchSink.GCSBatchSinkConfig(
public GCSBatchSinkConfig build() {
return new GCSBatchSinkConfig(
referenceName,
project,
fileSystemProperties,
Expand Down
47 changes: 34 additions & 13 deletions src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.format.FileFormat;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPErrorDetailsProvider;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.connector.GCSConnector;
import org.apache.hadoop.io.NullWritable;
Expand Down Expand Up @@ -129,33 +131,52 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
config.validate(collector, context.getArguments().asMap());
collector.getOrThrowException();

Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
config.getPath(), new HashMap<>());
Map<String, String> argumentCopy = new HashMap<>(context.getArguments().asMap());

CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
collector.getOrThrowException();

Boolean isServiceAccountFilePath = config.connection.isServiceAccountFilePath();
if (isServiceAccountFilePath == null) {
context.getFailureCollector().addFailure("Service account type is undefined.",
collector.addFailure("Service account type is undefined.",
"Must be `filePath` or `JSON`");
context.getFailureCollector().getOrThrowException();
return;
collector.getOrThrowException();
}
Credentials credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(), isServiceAccountFilePath);

Credentials credentials = null;
try {
credentials = config.connection.getServiceAccount() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.connection.getServiceAccount(),
isServiceAccountFilePath);
} catch (Exception e) {
String errorReason = "Unable to load service account credentials.";
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

String bucketName = config.getBucket(collector);
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
String correctiveAction = "Ensure you entered the correct bucket path and "
+ "have permissions for it.";
try {
if (storage.get(config.getBucket()) == null) {
GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKeyName);
if (storage.get(bucketName) == null) {
GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName);
}
} catch (StorageException e) {
// Add more descriptive error message
throw new RuntimeException(
String.format("Unable to access or create bucket %s. ", config.getBucket())
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
String errorReason = String.format(errorReasonFormat, e.getCode());
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
.withStacktrace(e.getStackTrace());
collector.getOrThrowException();
}

// set error details provider
context.setErrorDetailsProvider(
new ErrorDetailsProviderSpec(GCPErrorDetailsProvider.class.getName()));

Map<String, String> baseProperties = GCPUtils.getFileSystemProperties(config.connection,
config.getPath(), new HashMap<>());
if (config.getAllowFlexibleSchema()) {
//Configure MultiSink with support for flexible schemas.
configureSchemalessMultiSink(context, baseProperties, argumentCopy);
Expand Down
Loading

0 comments on commit 283824a

Please sign in to comment.