Skip to content

Commit

Permalink
Add ability to execute SqlServerImportJob with a test mode that check…
Browse files Browse the repository at this point in the history
…s the output sql for invalid data rather than sending to sql server.
  • Loading branch information
mseaton committed Aug 4, 2023
1 parent 208aeaa commit 6a2e21c
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 14 deletions.
42 changes: 40 additions & 2 deletions src/main/java/org/pih/petl/SqlUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
import org.apache.commons.logging.LogFactory;
import org.pih.petl.job.config.TableColumn;

import java.time.Instant;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -200,6 +201,43 @@ public static String mysqlDate(LocalDateTime instant) {
return instant == null ? "null" : ("cast('" + instant + "' as datetime)");
}

/**
 * Tests a result set for validity in JDBC by reading every column of every remaining row.
 * This is useful if there are errors with the bulk load to sql server job in the data being passed across.
 * <p>
 * Rows in which at least one column cannot be read are logged at error level, with the unreadable
 * columns rendered as {@code ***ERROR***} and the first exception raised for that row attached to the
 * log entry. Rows that read cleanly are logged only when debug logging is enabled. A summary of the
 * total number of rows and the number of rows with errors is logged once iteration completes.
 *
 * @param resultSet the ResultSet to test; it is iterated until {@code next()} returns false and is not closed here
 * @throws SQLException if the result set metadata cannot be read or the cursor cannot be advanced
 */
public static void testResultSet(ResultSet resultSet) throws SQLException {
    int numRowsTotal = 0;
    int numRowsWithError = 0;
    ResultSetMetaData rsmd = resultSet.getMetaData();
    // The column count is fixed for the result set, so read it once rather than per column of every row
    int numColumns = rsmd.getColumnCount();
    while (resultSet.next()) {
        numRowsTotal++;
        // Keep the first failure so that the cause is reported alongside the row, not just the marker
        Exception firstError = null;
        StringBuilder row = new StringBuilder();
        row.append("rowNum = ").append(numRowsTotal);
        for (int i = 1; i <= numColumns; i++) {
            row.append(", ").append(rsmd.getColumnName(i)).append(" = ");
            try {
                row.append(resultSet.getObject(i));
            }
            catch (Exception e) {
                // Deliberately broad: any failure to read a value marks the row as bad and testing continues
                if (firstError == null) {
                    firstError = e;
                }
                row.append("***ERROR***");
            }
        }
        if (firstError != null) {
            numRowsWithError++;
            log.error(row.toString(), firstError);
        }
        else if (log.isDebugEnabled()) {
            log.debug(row.toString());
        }
    }
    log.warn("Number of rows total in extraction: " + numRowsTotal);
    String errorSummary = "Number of rows with error in extraction: " + numRowsWithError;
    // Only report at error level when there is actually something wrong; otherwise match the total line
    if (numRowsWithError > 0) {
        log.error(errorSummary);
    }
    else {
        log.warn(errorSummary);
    }
}

//********** CONVENIENCE METHODS **************

protected static boolean isEmptyLine(String line) {
Expand Down
31 changes: 19 additions & 12 deletions src/main/java/org/pih/petl/job/SqlServerImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ else if (StringUtils.isEmpty(partitionColumn)) {
// Get bulk load configuration
int batchSize = configReader.getInt(100, "load", "bulkCopy", "batchSize");
int timeout = configReader.getInt(7200, "load", "bulkCopy", "timeout"); // 2h default
boolean testOnly = configReader.getBoolean(false, "load", "bulkCopy", "testOnly");

try (Connection sourceConnection = sourceDatasource.openConnection()) {
try (Connection targetConnection = targetDatasource.openConnection()) {
Expand Down Expand Up @@ -298,18 +299,24 @@ else if (StringUtils.isEmpty(partitionColumn)) {
try {
resultSet = ((PreparedStatement) statement).executeQuery();
if (resultSet != null) {
log.trace("Setting up bulk copy connection");
Connection sqlServerConnection = getAsSqlServerConnection(targetConnection);
SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(sqlServerConnection);
SQLServerBulkCopyOptions bco = new SQLServerBulkCopyOptions();
bco.setKeepIdentity(true);
bco.setBatchSize(batchSize);
bco.setBulkCopyTimeout(timeout);
bulkCopy.setBulkCopyOptions(bco);
bulkCopy.setDestinationTableName(tableToBulkInsertInto);
log.info("Performing up bulk copy operation");
bulkCopy.writeToServer(resultSet);
log.trace("Bulk copy operation completed successfully");
if (testOnly) {
SqlUtils.testResultSet(resultSet);
throw new PetlException("Failed to load to SQL server due to testOnly mode");
}
else {
log.trace("Setting up bulk copy connection");
Connection sqlServerConnection = getAsSqlServerConnection(targetConnection);
SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(sqlServerConnection);
SQLServerBulkCopyOptions bco = new SQLServerBulkCopyOptions();
bco.setKeepIdentity(true);
bco.setBatchSize(batchSize);
bco.setBulkCopyTimeout(timeout);
bulkCopy.setBulkCopyOptions(bco);
bulkCopy.setDestinationTableName(tableToBulkInsertInto);
log.info("Performing up bulk copy operation");
bulkCopy.writeToServer(resultSet);
log.trace("Bulk copy operation completed successfully");
}
} else {
throw new PetlException("Invalid SQL extraction, no result set found");
}
Expand Down

0 comments on commit 6a2e21c

Please sign in to comment.