Skip to content

Commit

Permalink
[GOBBLIN-1917] Logging updates for Salesforce classes (#3786)
Browse files Browse the repository at this point in the history
* Logging updates for Salesforce classes

- Updating the log messages in the Salesforce-related classes.
- In some cases, the exception was not included in the log message; it is now added to make debugging easier.
- For some cases, added more details in the logs to print the current progress of the tasks.
- Minor coding style updates as well to improve code readability.
- There are no functional changes in this PR

* Fixed logging of jobTaskMetrics

* Revert logging on cancel in GobblinHelixJobTask class
  • Loading branch information
gautamguptabasant authored Sep 22, 2023
1 parent 1ca3192 commit 2a1fc46
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public TaskResult run() {
try {
HelixUtils.deleteWorkflow(previousActualJobId, this.jobHelixManager, timeOut);
} catch (HelixException e) {
log.error("Helix cannot delete previous actual job id {} within {} seconds.", previousActualJobId, timeOut / 1000);
log.error("Helix cannot delete previous actual job id {} within {} seconds.", previousActualJobId, timeOut / 1000, e);
return new TaskResult(TaskResult.Status.FAILED, ExceptionUtils.getFullStackTrace(e));
}
}
Expand Down Expand Up @@ -202,7 +202,7 @@ public TaskResult run() {
log.info("Completing planning job {}", this.planningJobId);
return new TaskResult(TaskResult.Status.COMPLETED, "");
} catch (Exception e) {
log.info("Failing planning job {}", this.planningJobId);
log.warn("Failing planning job {}", this.planningJobId, e);
return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils
.getFullStackTrace(e));
} finally {
Expand All @@ -211,6 +211,7 @@ public TaskResult run() {
try {
this.jobsMapping.deleteMapping(jobUri);
} catch (Exception e) {
log.warn("Failed to delete jobs mapping for job: {}", jobUri, e);
return new TaskResult(TaskResult.Status.FAILED,"Cannot delete jobs mapping for job : " + jobUri);
}
}
Expand All @@ -230,7 +231,7 @@ public void cancel() {
try {
this.jobsMapping.deleteMapping(jobUri);
} catch (Exception e) {
throw new RuntimeException("Cannot delete jobs mapping for job : " + jobUri);
throw new RuntimeException("Cannot delete jobs mapping for job : " + jobUri, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public Void call()
this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));
}
} catch (ReflectiveOperationException roe) {
log.error("Failed to instantiate data publisher for dataset %s of job %s.", this.datasetUrn,
log.error("Failed to instantiate data publisher for dataset {} of job {}.", this.datasetUrn,
this.jobContext.getJobId(), roe);
throw new RuntimeException(roe);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@
*/
@Slf4j
public class BulkResultIterator implements Iterator<JsonElement> {
private FileIdVO fileIdVO;
private int retryLimit;
private BulkConnection conn;
private final FileIdVO fileIdVO;
private final int retryLimit;
private final BulkConnection conn;
private InputStreamCSVReader csvReader;
private List<String> header;
private int columnSize;
private int lineCount = 0; // this is different from currentFileRowCount: the CSV file has a header row
private long retryInterval;
private long retryExceedQuotaInterval;
private final long retryInterval;
private final long retryExceedQuotaInterval;
private List<String> preLoadedLine = null;

public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit, long retryInterval, long retryExceedQuotaInterval) {
log.info("create BulkResultIterator: " + fileIdVO);
log.info("create BulkResultIterator: {} with retry limit as {} and retryInterval as {}", fileIdVO, retryLimit, retryInterval);
this.retryInterval = retryInterval;
this.retryExceedQuotaInterval = retryExceedQuotaInterval;
this.conn = conn;
Expand Down Expand Up @@ -87,17 +87,17 @@ private List<String> nextLineWithRetry() {
// Each organization is allowed 10 concurrent long-running requests. If the limit is reached,
// any new synchronous Apex request results in a runtime exception.
if (e.isCurrentExceptionExceedQuota()) {
log.warn("--Caught ExceededQuota: " + e.getMessage());
log.warn("--Caught ExceededQuota: ", e);
threadSleep(retryExceedQuotaInterval);
executeCount--; // if the current exception is Quota Exceeded, keep trying forever
}
log.info("***Retrying***1: {} - {}", fileIdVO, e.getMessage());
log.info("***Retrying***1: {} - Attempt {}/{}", fileIdVO, executeCount + 1, retryLimit, e);
this.csvReader = null; // in next loop, call openAndSeekCsvReader
} catch (Exception e) {
// Retry may resolve other exceptions.
rootCause = e;
threadSleep(retryInterval);
log.info("***Retrying***2: {} - {}", fileIdVO, e.getMessage());
log.info("***Retrying***2: {} - Attempt {}/{}", fileIdVO, executeCount + 1, retryLimit, e);
this.csvReader = null; // in next loop, call openAndSeekCsvReader
}
}
Expand Down
Loading

0 comments on commit 2a1fc46

Please sign in to comment.