Skip to content

Commit

Permalink
Add additional logging to job executions
Browse files Browse the repository at this point in the history
  • Loading branch information
mseaton committed Aug 1, 2023
1 parent 372a72c commit c3bc88e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
11 changes: 10 additions & 1 deletion src/main/java/org/pih/petl/api/JobExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,22 @@ public JobExecution executeJob(JobExecution execution) {
public void executeInParallel(List<JobExecutionTask> tasks) throws InterruptedException, ExecutionException {
List<JobExecutionResult> finalResults = new ArrayList<>();
List<JobExecutionTask> tasksToSchedule = new ArrayList<>(tasks);
log.debug("Executing " + tasksToSchedule.size() + " tasks in parallel");
while (tasksToSchedule.size() > 0) {
List<Future<JobExecutionResult>> futures = new ArrayList<>();
for (JobExecutionTask task : tasksToSchedule) {
JobExecution execution = task.getJobExecution();
etlService.saveJobExecution(execution);
if (task.getAttemptNum() == 1) {
log.debug("First attempt at " + task + " submitting to executor service for execution");
futures.add(executorService.submit(task));
execution.setStatus(JobExecutionStatus.QUEUED);
etlService.saveJobExecution(execution);
log.info(execution);
}
else {
ErrorHandling errorHandling = task.getJobExecution().getJobConfig().getErrorHandling();
log.debug("Retry attempt at " + task + ", scheduling for re-execution: " + errorHandling);
futures.add(executorService.schedule(task, errorHandling.getRetryInterval(), errorHandling.getRetryIntervalUnit()));
execution.setStatus(JobExecutionStatus.RETRY_QUEUED);
etlService.saveJobExecution(execution);
Expand All @@ -102,7 +105,8 @@ public void executeInParallel(List<JobExecutionTask> tasks) throws InterruptedEx
JobExecutionResult result = future.get();
JobExecutionTask task = result.getJobExecutionTask();
JobExecution execution = task.getJobExecution();
if (result.isSuccessful() || task.getAttemptNum() >= task.getJobExecution().getJobConfig().getErrorHandling().getMaxAttempts()) {
int maxAttempts = task.getJobExecution().getJobConfig().getErrorHandling().getMaxAttempts();
if (result.isSuccessful() || task.getAttemptNum() >= maxAttempts) {
finalResults.add(result);
tasksToSchedule.remove(task);
execution.setCompleted(new Date());
Expand All @@ -114,6 +118,7 @@ public void executeInParallel(List<JobExecutionTask> tasks) throws InterruptedEx
}
}
else {
log.info("Task failed, but will retry. Attempt: " + task.getAttemptNum() + "; max attempts: " + maxAttempts);
task.incrementAttemptNum();
execution.setStatus(JobExecutionStatus.FAILED_WILL_RETRY);
execution.setErrorMessageFromException(result.getException());
Expand All @@ -137,6 +142,9 @@ public void executeInParallel(List<JobExecutionTask> tasks) throws InterruptedEx
* Execute a List of jobs in series. A failure will terminate immediately and subsequent jobs will not run
*/
public void executeInSeries(List<JobExecutionTask> tasks) throws InterruptedException, ExecutionException {

log.debug("Executing " + tasks.size() + " tasks in series");

// First, ensure all job executions are saved so that they can be tracked and re-initiated as needed
for (JobExecutionTask task : tasks) {
JobExecution execution = task.getJobExecution();
Expand All @@ -157,6 +165,7 @@ public void executeInSeries(List<JobExecutionTask> tasks) throws InterruptedExce
JobExecutionResult result = futureResult.get(); // This blocks until result is available
ErrorHandling errorHandling = task.getJobExecution().getJobConfig().getErrorHandling();
while (!result.isSuccessful() && task.getAttemptNum() < errorHandling.getMaxAttempts()) {
log.info("Task failed, but will retry. Attempt: " + task.getAttemptNum() + "; max attempts: " + errorHandling.getMaxAttempts());
task.incrementAttemptNum();
execution.setStatus(JobExecutionStatus.RETRY_QUEUED);
etlService.saveJobExecution(execution);
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/org/pih/petl/job/SqlServerImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ else if (StringUtils.isEmpty(partitionColumn)) {
try {
resultSet = ((PreparedStatement) statement).executeQuery();
if (resultSet != null) {
log.trace("Setting up bulk copy connection");
Connection sqlServerConnection = getAsSqlServerConnection(targetConnection);
SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(sqlServerConnection);
SQLServerBulkCopyOptions bco = new SQLServerBulkCopyOptions();
Expand All @@ -306,10 +307,14 @@ else if (StringUtils.isEmpty(partitionColumn)) {
bco.setBulkCopyTimeout(timeout);
bulkCopy.setBulkCopyOptions(bco);
bulkCopy.setDestinationTableName(tableToBulkInsertInto);
log.info("Performing up bulk copy operation");
bulkCopy.writeToServer(resultSet);
log.trace("Bulk copy operation completed successfully");
} else {
throw new PetlException("Invalid SQL extraction, no result set found");
}
} catch (Exception e) {
log.error("An error occurred during bulk copy operation", e);
} finally {
DbUtils.closeQuietly(resultSet);
}
Expand All @@ -320,9 +325,14 @@ else if (StringUtils.isEmpty(partitionColumn)) {
DbUtils.closeQuietly(statement);
}
}
log.debug("Import Completed Sucessfully");
log.debug("Import Completed Successfully");
} finally {
sourceConnection.rollback();
try {
sourceConnection.rollback();
}
catch (Exception e) {
log.debug("An error occurred during source connection rollback", e);
}
sourceConnection.setAutoCommit(originalSourceAutoCommit);
targetConnection.setAutoCommit(originalTargetAutocommit);
}
Expand Down

0 comments on commit c3bc88e

Please sign in to comment.