diff --git a/src/main/java/org/pih/petl/SqlUtils.java b/src/main/java/org/pih/petl/SqlUtils.java
index d29ba62..6d75804 100644
--- a/src/main/java/org/pih/petl/SqlUtils.java
+++ b/src/main/java/org/pih/petl/SqlUtils.java
@@ -14,9 +14,10 @@
 import org.apache.commons.logging.LogFactory;
 import org.pih.petl.job.config.TableColumn;
 
-import java.time.Instant;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
 import java.time.LocalDateTime;
-import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Matcher;
@@ -200,6 +201,43 @@ public static String mysqlDate(LocalDateTime instant) {
         return instant == null ? "null" : ("cast('" + instant + "' as datetime)");
     }
 
+    /**
+     * Tests a result set for validity in JDBC by iterating every row and reading every column value.
+     * This is useful for diagnosing errors in the data passed across by the bulk load to SQL Server job.
+     * @param resultSet the ResultSet to test; rows with unreadable values are logged at error level
+     * @throws SQLException if a database access error occurs while iterating the result set
+     */
+    public static void testResultSet(ResultSet resultSet) throws SQLException {
+        int numRowsTotal = 0;
+        int numRowsWithError = 0;
+        ResultSetMetaData rsmd = resultSet.getMetaData();
+        while (resultSet.next()) {
+            numRowsTotal++;
+            boolean rowHasError = false;
+            StringBuilder row = new StringBuilder();
+            row.append("rowNum = ").append(numRowsTotal);
+            for (int i=1; i<=rsmd.getColumnCount(); i++) {
+                row.append(", ").append(rsmd.getColumnName(i)).append(" = ");
+                try {
+                    row.append(resultSet.getObject(i));
+                }
+                catch (Exception e) {
+                    rowHasError = true;
+                    row.append("***ERROR***");
+                }
+            }
+            if (rowHasError) {
+                numRowsWithError++;
+                log.error(row.toString());
+            }
+            else if (log.isDebugEnabled()) {
+                log.debug(row.toString());
+            }
+        }
+        log.warn("Number of rows total in extraction: " + numRowsTotal);
+        log.error("Number of rows with error in extraction: " + numRowsWithError);
+    }
+
     //********** CONVENIENCE METHODS **************
 
     protected static boolean isEmptyLine(String line) {
diff --git a/src/main/java/org/pih/petl/job/SqlServerImportJob.java b/src/main/java/org/pih/petl/job/SqlServerImportJob.java
index b13885a..56195dc 100644
--- a/src/main/java/org/pih/petl/job/SqlServerImportJob.java
+++ b/src/main/java/org/pih/petl/job/SqlServerImportJob.java
@@ -254,6 +254,7 @@ else if (StringUtils.isEmpty(partitionColumn)) {
             // Get bulk load configuration
             int batchSize = configReader.getInt(100, "load", "bulkCopy", "batchSize");
             int timeout = configReader.getInt(7200, "load", "bulkCopy", "timeout"); // 2h default
+            boolean testOnly = configReader.getBoolean(false, "load", "bulkCopy", "testOnly");
 
             try (Connection sourceConnection = sourceDatasource.openConnection()) {
                 try (Connection targetConnection = targetDatasource.openConnection()) {
@@ -298,18 +299,24 @@ else if (StringUtils.isEmpty(partitionColumn)) {
                     try {
                         resultSet = ((PreparedStatement) statement).executeQuery();
                         if (resultSet != null) {
-                            log.trace("Setting up bulk copy connection");
-                            Connection sqlServerConnection = getAsSqlServerConnection(targetConnection);
-                            SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(sqlServerConnection);
-                            SQLServerBulkCopyOptions bco = new SQLServerBulkCopyOptions();
-                            bco.setKeepIdentity(true);
-                            bco.setBatchSize(batchSize);
-                            bco.setBulkCopyTimeout(timeout);
-                            bulkCopy.setBulkCopyOptions(bco);
-                            bulkCopy.setDestinationTableName(tableToBulkInsertInto);
-                            log.info("Performing up bulk copy operation");
-                            bulkCopy.writeToServer(resultSet);
-                            log.trace("Bulk copy operation completed successfully");
+                            if (testOnly) {
+                                SqlUtils.testResultSet(resultSet);
+                                throw new PetlException("Failed to load to SQL server due to testOnly mode");
+                            }
+                            else {
+                                log.trace("Setting up bulk copy connection");
+                                Connection sqlServerConnection = getAsSqlServerConnection(targetConnection);
+                                SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(sqlServerConnection);
+                                SQLServerBulkCopyOptions bco = new SQLServerBulkCopyOptions();
+                                bco.setKeepIdentity(true);
+                                bco.setBatchSize(batchSize);
+                                bco.setBulkCopyTimeout(timeout);
+                                bulkCopy.setBulkCopyOptions(bco);
+                                bulkCopy.setDestinationTableName(tableToBulkInsertInto);
+                                log.info("Performing bulk copy operation");
+                                bulkCopy.writeToServer(resultSet);
+                                log.trace("Bulk copy operation completed successfully");
+                            }
                         }
                         else {
                             throw new PetlException("Invalid SQL extraction, no result set found");