Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry Wrapper | WIP #19

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import com.sforce.ws.SessionRenewer;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
Expand Down Expand Up @@ -65,8 +66,8 @@ public final class SalesforceBulkUtil {
* @throws AsyncApiException if there is an issue creating the job
*/

public static JobInfo createJob(BulkConnection bulkConnection, String sObject, OperationEnum operationEnum,
@Nullable String externalIdField,
public static JobInfo createJob(BulkConnectionRetryWrapper bulkConnection, String sObject,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename the field to bulkConnectionRetryWrapper

OperationEnum operationEnum, @Nullable String externalIdField,
ConcurrencyMode concurrencyMode, ContentType contentType) throws AsyncApiException {
JobInfo job = new JobInfo();
job.setObject(sObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,7 +57,11 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {
.put(SalesforceSinkConstants.CONFIG_MAX_BYTES_PER_BATCH, config.getMaxBytesPerBatch().toString())
.put(SalesforceSinkConstants.CONFIG_MAX_RECORDS_PER_BATCH, config.getMaxRecordsPerBatch().toString())
.put(SalesforceConstants.CONFIG_CONNECT_TIMEOUT, config.getConnection().getConnectTimeout().toString())
.put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString());
.put(SalesforceConstants.CONFIG_READ_TIMEOUT, config.getConnection().getReadTimeout().toString())
.put(SalesforceSourceConstants.CONFIG_INITIAL_RETRY_DURATION, Long.toString(config.getInitialRetryDuration()))
.put(SalesforceSourceConstants.CONFIG_MAX_RETRY_DURATION, Long.toString(config.getMaxRetryDuration()))
.put(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT, Integer.toString(config.getMaxRetryCount()))
.put(SalesforceSourceConstants.CONFIG_RETRY_REQUIRED, Boolean.toString(config.isRetryRequired()));

if (!Strings.isNullOrEmpty(config.getConnection().getProxyUrl())) {
configBuilder.put(SalesforceConstants.CONFIG_PROXY_URL, config.getConnection().getProxyUrl());
Expand Down Expand Up @@ -83,7 +89,9 @@ public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {

try {
BulkConnection bulkConnection = new BulkConnection(Authenticator.createConnectorConfig(credentials));
JobInfo job = SalesforceBulkUtil.createJob(bulkConnection, config.getSObject(), config.getOperationEnum(),
BulkConnectionRetryWrapper retryWrapper = new BulkConnectionRetryWrapper(bulkConnection, config.isRetryRequired(),
config.getInitialRetryDuration(), config.getMaxRetryDuration(), config.getMaxRetryCount());
JobInfo job = SalesforceBulkUtil.createJob(retryWrapper, config.getSObject(), config.getOperationEnum(),
config.getExternalIdField(), config.getConcurrencyModeEnum(),
ContentType.ZIP_CSV);
configBuilder.put(SalesforceSinkConstants.CONFIG_JOB_ID, job.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.cdap.plugin.salesforce.plugin.SalesforceConnectorBaseConfig;
import io.cdap.plugin.salesforce.plugin.SalesforceConnectorInfo;
import io.cdap.plugin.salesforce.plugin.connector.SalesforceConnectorConfig;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -152,6 +153,26 @@ public class SalesforceSinkConfig extends ReferencePluginConfig {
@Description("Whether to validate the field data types of the input schema as per Salesforce specific data types")
private final Boolean datatypeValidation;

@Name(SalesforceSourceConstants.PROPERTY_INITIAL_RETRY_DURATION)
@Description("Time taken for the first retry. Default is 5 seconds.")
@Nullable
private Long initialRetryDuration;

@Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_DURATION)
@Description("Maximum time in seconds retries can take. Default is 80 seconds.")
@Nullable
private Long maxRetryDuration;

@Name(SalesforceSourceConstants.PROPERTY_MAX_RETRY_COUNT)
@Description("Maximum number of retries allowed. Default is 5.")
@Nullable
private Integer maxRetryCount;

@Name(SalesforceSourceConstants.PROPERTY_RETRY_REQUIRED)
@Description("Retry is required or not for some of the internal call failures")
@Nullable
private Boolean retryOnBackendError;

public SalesforceSinkConfig(String referenceName,
@Nullable String clientId,
@Nullable String clientSecret,
Expand Down Expand Up @@ -277,6 +298,23 @@ public String getOrgId(OAuthInfo oAuthInfo) throws ConnectionException {
return partnerConnection.getUserInfo().getOrganizationId();
}

public boolean isRetryRequired() {
return retryOnBackendError == null || retryOnBackendError;
}

public long getInitialRetryDuration() {
return initialRetryDuration == null ? SalesforceSourceConstants.DEFAULT_INITIAL_RETRY_DURATION_SECONDS :
initialRetryDuration;
}

public long getMaxRetryDuration() {
return maxRetryDuration == null ? SalesforceSourceConstants.DEFULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration;
}

public int getMaxRetryCount() {
return maxRetryCount == null ? SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT : maxRetryCount;
}

public void validate(Schema schema, FailureCollector collector, @Nullable OAuthInfo oAuthInfo) {
if (connection != null) {
getConnection().validate(collector, oAuthInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;

import java.util.ArrayList;
Expand Down Expand Up @@ -103,8 +104,11 @@ public void prepareRun(BatchSourceContext context) throws ConnectionException {
String sObjectNameField = config.getSObjectNameField();
authenticatorCredentials = config.getConnection().getAuthenticatorCredentials();
BulkConnection bulkConnection = SalesforceSplitUtil.getBulkConnection(authenticatorCredentials);
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection,
config.isRetryRequired(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount());
List<SalesforceSplit> querySplits = queries.parallelStream()
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnection, false, config.getOperation(),
.map(query -> SalesforceSplitUtil.getQuerySplits(query, bulkConnectionRetryWrapper, false, config.getOperation(),
config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount(), config.isRetryRequired()))
.flatMap(Collection::stream).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;

Expand Down Expand Up @@ -164,7 +165,10 @@ public static List<SalesforceSplit> getSplits(
bulkConnection.addHeader(SalesforceSourceConstants.HEADER_ENABLE_PK_CHUNK,
String.join(";", chunkHeaderValues));
}
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnection,
BulkConnectionRetryWrapper bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than passing the bulkConnection here, write a method in SalesforceSplitUtil that returns RetryWrapper

config.isRetryRequired(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount());
List<SalesforceSplit> querySplits = SalesforceSplitUtil.getQuerySplits(query, bulkConnectionRetryWrapper,
enablePKChunk, config.getOperation(), config.getInitialRetryDuration(), config.getMaxRetryDuration(),
config.getMaxRetryCount(), config.isRetryRequired());
return querySplits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.salesforce.BulkAPIBatchException;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.source.batch.util.BulkConnectionRetryWrapper;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceQueryExecutionException;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSplitUtil;
Expand Down Expand Up @@ -78,6 +78,7 @@ public class SalesforceBulkRecordReader extends RecordReader<Schema, Map<String,
private String batchId;
private String[] resultIds;
private int resultIdIndex;
private BulkConnectionRetryWrapper bulkConnectionRetryWrapper;

public SalesforceBulkRecordReader(Schema schema) {
this(schema, null, null, null);
Expand Down Expand Up @@ -115,6 +116,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
maxRetryCount = Integer.valueOf(conf.get(SalesforceSourceConstants.CONFIG_MAX_RETRY_COUNT,
String.valueOf(SalesforceSourceConstants.DEFAULT_MAX_RETRY_COUNT)));
isRetryRequired = Boolean.valueOf(conf.get(SalesforceSourceConstants.CONFIG_RETRY_REQUIRED, String.valueOf(true)));
bulkConnectionRetryWrapper = new BulkConnectionRetryWrapper(bulkConnection, isRetryRequired, initialRetryDuration,
maxRetryDuration, maxRetryCount);
AuthenticatorCredentials credentials = SalesforceConnectionUtil.getAuthenticatorCredentials(conf);
initialize(inputSplit, credentials);
}
Expand Down Expand Up @@ -205,15 +208,8 @@ void setupParser() throws IOException, AsyncApiException, InterruptedException {
resultIdIndex, resultIds.length));
}
try {
final InputStream queryResponseStream;
if (isRetryRequired) {
queryResponseStream =
Failsafe.with(SalesforceSplitUtil.getRetryPolicy(initialRetryDuration, maxRetryDuration, maxRetryCount))
.get(() -> getQueryResultStream(bulkConnection));
} else {
queryResponseStream = bulkConnection.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
}

final InputStream queryResponseStream = bulkConnectionRetryWrapper
.getQueryResultStream(jobId, batchId, resultIds[resultIdIndex]);
CSVFormat csvFormat = CSVFormat.DEFAULT
.withHeader()
.withQuoteMode(QuoteMode.ALL)
Expand Down
Loading