diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 18eec8e65..83833bf5a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -22,5 +22,5 @@
files="(DataConverter|GenericDatabaseDialect|JdbcSourceTask).java"/>
+ files="(ColumnDefinition|GenericDatabaseDialect|SqlServerDatabaseDialect|PostgreSqlDatabaseDialect|TimestampIncrementingTableQuerier|TimestampTableQuerier).java"/>
diff --git a/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java
index 2f2d0613d..57c53f1f3 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/BulkTableQuerier.java
@@ -44,9 +44,10 @@ public BulkTableQuerier(
QueryMode mode,
String name,
String topicPrefix,
- String suffix
+ String suffix,
+ int queryTimeout
) {
- super(dialect, mode, name, topicPrefix, suffix);
+ super(dialect, mode, name, topicPrefix, suffix, queryTimeout);
}
@Override
diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
index 17bebf0ba..6e43d9b26 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java
@@ -366,6 +366,13 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
+ " * SERIALIZABLE\n"
+ " * SQL_SERVER_SNAPSHOT\n";
private static final String TRANSACTION_ISOLATION_MODE_DISPLAY = "Transaction Isolation Mode";
+
+ public static final String QUERY_TIMEOUT_S_CONFIG = "query.timeout";
+ public static final int QUERY_TIMEOUT_S_DEFAULT = 0;
+ public static final String QUERY_TIMEOUT_S_DOC =
+ "Query timeout in seconds."
+ + "For backward compatibility, the default is 0. 0 means there is no timeout";
+ private static final String QUERY_TIMEOUT_S_DISPLAY = "Query timeout (s)";
private static final EnumRecommender TRANSACTION_ISOLATION_MODE_RECOMMENDER =
EnumRecommender.in(TransactionIsolationMode.values());
@@ -669,6 +676,16 @@ private static final void addModeOptions(ConfigDef config) {
++orderInGroup,
Width.MEDIUM,
QUERY_RETRIES_DISPLAY
+ ).define(
+ QUERY_TIMEOUT_S_CONFIG,
+ Type.INT,
+ QUERY_TIMEOUT_S_DEFAULT,
+ Importance.LOW,
+ QUERY_TIMEOUT_S_DOC,
+ MODE_GROUP,
+ ++orderInGroup,
+ Width.MEDIUM,
+ QUERY_TIMEOUT_S_DISPLAY
);
}
diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java
index 5cd8a6ab0..2d99c375b 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java
@@ -230,6 +230,8 @@ public void start(Map properties) {
String topicPrefix = config.topicPrefix();
JdbcSourceConnectorConfig.TimestampGranularity timestampGranularity
= JdbcSourceConnectorConfig.TimestampGranularity.get(config);
+
+ int queryTimeout = config.getInt(JdbcSourceTaskConfig.QUERY_TIMEOUT_S_CONFIG);
if (mode.equals(JdbcSourceTaskConfig.MODE_BULK)) {
tableQueue.add(
@@ -238,7 +240,8 @@ public void start(Map properties) {
queryMode,
tableOrQuery,
topicPrefix,
- suffix
+ suffix,
+ queryTimeout
)
);
} else if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)) {
@@ -254,7 +257,8 @@ public void start(Map properties) {
timestampDelayInterval,
timeZone,
suffix,
- timestampGranularity
+ timestampGranularity,
+ queryTimeout
)
);
} else if (mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)) {
@@ -269,7 +273,8 @@ public void start(Map properties) {
timestampDelayInterval,
timeZone,
suffix,
- timestampGranularity
+ timestampGranularity,
+ queryTimeout
)
);
} else if (mode.endsWith(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) {
@@ -285,7 +290,8 @@ public void start(Map properties) {
timestampDelayInterval,
timeZone,
suffix,
- timestampGranularity
+ timestampGranularity,
+ queryTimeout
)
);
}
diff --git a/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java
index f926ccced..af180b676 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/TableQuerier.java
@@ -47,6 +47,7 @@ public enum QueryMode {
protected final String topicPrefix;
protected final TableId tableId;
protected final String suffix;
+ protected final int queryTimeout;
// Mutable state
@@ -64,7 +65,8 @@ public TableQuerier(
QueryMode mode,
String nameOrQuery,
String topicPrefix,
- String suffix
+ String suffix,
+ int queryTimeout
) {
this.dialect = dialect;
this.mode = mode;
@@ -74,6 +76,7 @@ public TableQuerier(
this.lastUpdate = 0;
this.suffix = suffix;
this.attemptedRetries = 0;
+ this.queryTimeout = queryTimeout;
}
public long getLastUpdate() {
@@ -85,6 +88,9 @@ public PreparedStatement getOrCreatePreparedStatement(Connection db) throws SQLE
return stmt;
}
createPreparedStatement(db);
+ if (queryTimeout > 0) {
+ stmt.setQueryTimeout(queryTimeout);
+ }
return stmt;
}
diff --git a/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java
index 6be4f68be..8c5d42102 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerier.java
@@ -83,8 +83,9 @@ public TimestampIncrementingTableQuerier(DatabaseDialect dialect, QueryMode mode
String incrementingColumnName,
Map offsetMap, Long timestampDelay,
TimeZone timeZone, String suffix,
- TimestampGranularity timestampGranularity) {
- super(dialect, mode, name, topicPrefix, suffix);
+ TimestampGranularity timestampGranularity,
+ int queryTimeout) {
+ super(dialect, mode, name, topicPrefix, suffix, queryTimeout);
this.incrementingColumnName = incrementingColumnName;
this.timestampColumnNames = timestampColumnNames != null
? timestampColumnNames : Collections.emptyList();
diff --git a/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java b/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java
index 625660629..aee8b7f9b 100644
--- a/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java
+++ b/src/main/java/io/confluent/connect/jdbc/source/TimestampTableQuerier.java
@@ -64,7 +64,8 @@ public TimestampTableQuerier(
Long timestampDelay,
TimeZone timeZone,
String suffix,
- TimestampGranularity timestampGranularity
+ TimestampGranularity timestampGranularity,
+ int queryTimeout
) {
super(
dialect,
@@ -77,7 +78,8 @@ public TimestampTableQuerier(
timestampDelay,
timeZone,
suffix,
- timestampGranularity
+ timestampGranularity,
+ queryTimeout
);
this.latestCommittableTimestamp = this.offset.getTimestampOffset();
diff --git a/src/test/java/io/confluent/connect/jdbc/source/TableQuerierTest.java b/src/test/java/io/confluent/connect/jdbc/source/TableQuerierTest.java
index dba07c082..fdfd9f06e 100644
--- a/src/test/java/io/confluent/connect/jdbc/source/TableQuerierTest.java
+++ b/src/test/java/io/confluent/connect/jdbc/source/TableQuerierTest.java
@@ -22,6 +22,7 @@
import io.confluent.connect.jdbc.util.TableId;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.junit.Before;
@@ -32,6 +33,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.never;
public class TableQuerierTest {
private static final String TABLE_NAME = "name";
@@ -39,6 +41,7 @@ public class TableQuerierTest {
private static final String SUFFIX = "/* SUFFIX */";
private static final Long TIMESTAMP_DELAY = 0l;
private static final String QUERY = "SELECT * FROM name";
+ private static final int QUERY_TIMEOUT = 10;
DatabaseDialect databaseDialectMock;
@@ -71,7 +74,8 @@ public void testTimestampIncrementingTableQuerierInTableModeWithSuffix() throws
TIMESTAMP_DELAY,
null,
SUFFIX,
- JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL
+ JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
+ JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);
querier.createPreparedStatement(connectionMock);
@@ -92,7 +96,8 @@ public void testTimestampIncrementingTableQuerierInQueryModeWithSuffix() throws
TIMESTAMP_DELAY,
null,
SUFFIX,
- JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL
+ JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
+ JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);
querier.createPreparedStatement(connectionMock);
@@ -107,7 +112,8 @@ public void testBulkTableQuerierInTableModeWithSuffix() throws SQLException {
QueryMode.TABLE,
TABLE_NAME,
null,
- SUFFIX
+ SUFFIX,
+ JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);
querier.createPreparedStatement(connectionMock);
@@ -122,7 +128,8 @@ public void testBulkTableQuerierInQueryModeWithSuffix() throws SQLException {
QueryMode.QUERY,
QUERY,
null,
- SUFFIX
+ SUFFIX,
+ JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);
querier.createPreparedStatement(connectionMock);
@@ -137,11 +144,104 @@ public void testBulkTableQuerierInQueryModeWithoutSuffix() throws SQLException {
QueryMode.QUERY,
QUERY,
null,
- "" /* default value */
+ "" /* default value */,
+ JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);
querier.createPreparedStatement(connectionMock);
verify(databaseDialectMock, times(1)).createPreparedStatement(Matchers.any(),Matchers.eq("SELECT * FROM name"));
- }
+ }
+
+ @Test
+ public void testTimestampIncrementingTableQuerierWithQueryTimeout() throws SQLException {
+ PreparedStatement stmtMock = mock(PreparedStatement.class);
+ when(databaseDialectMock.createPreparedStatement(Matchers.eq(connectionMock), Matchers.any(String.class)))
+ .thenReturn(stmtMock);
+
+ TimestampIncrementingTableQuerier querier = new TimestampIncrementingTableQuerier(
+ databaseDialectMock,
+ QueryMode.TABLE,
+ TABLE_NAME,
+ null,
+ null,
+ INCREMENTING_COLUMN_NAME,
+ null,
+ TIMESTAMP_DELAY,
+ null,
+ SUFFIX,
+ JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
+ QUERY_TIMEOUT
+ );
+
+ querier.getOrCreatePreparedStatement(connectionMock);
+
+ verify(stmtMock, times(1)).setQueryTimeout(Matchers.eq(QUERY_TIMEOUT));
+ }
+
+ @Test
+ public void testTimestampIncrementingTableQuerierWithoutQueryTimeout() throws SQLException {
+ PreparedStatement stmtMock = mock(PreparedStatement.class);
+ when(databaseDialectMock.createPreparedStatement(Matchers.eq(connectionMock), Matchers.any(String.class)))
+ .thenReturn(stmtMock);
+
+ TimestampIncrementingTableQuerier querier = new TimestampIncrementingTableQuerier(
+ databaseDialectMock,
+ QueryMode.QUERY,
+ QUERY,
+ null,
+ null,
+ INCREMENTING_COLUMN_NAME,
+ null,
+ TIMESTAMP_DELAY,
+ null,
+ SUFFIX,
+ JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
+ JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
+ );
+
+ querier.getOrCreatePreparedStatement(connectionMock);
+
+ verify(stmtMock, never()).setQueryTimeout(Matchers.anyInt());
+ }
+
+ @Test
+ public void testBulkTableQuerierWithQueryTimeout() throws SQLException {
+ PreparedStatement stmtMock = mock(PreparedStatement.class);
+ when(databaseDialectMock.createPreparedStatement(Matchers.eq(connectionMock), Matchers.any(String.class)))
+ .thenReturn(stmtMock);
+
+ BulkTableQuerier querier = new BulkTableQuerier(
+ databaseDialectMock,
+ QueryMode.QUERY,
+ QUERY,
+ null,
+ SUFFIX,
+ QUERY_TIMEOUT
+ );
+
+ querier.getOrCreatePreparedStatement(connectionMock);
+
+ verify(stmtMock, times(1)).setQueryTimeout(Matchers.eq(QUERY_TIMEOUT));
+ }
+
+ @Test
+ public void testBulkTableQuerierWithoutQueryTimeout() throws SQLException {
+ PreparedStatement stmtMock = mock(PreparedStatement.class);
+ when(databaseDialectMock.createPreparedStatement(Matchers.eq(connectionMock), Matchers.any(String.class)))
+ .thenReturn(stmtMock);
+
+ BulkTableQuerier querier = new BulkTableQuerier(
+ databaseDialectMock,
+ QueryMode.TABLE,
+ TABLE_NAME,
+ null,
+ SUFFIX,
+ JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
+ );
+
+ querier.getOrCreatePreparedStatement(connectionMock);
+
+ verify(stmtMock, never()).setQueryTimeout(Matchers.anyInt());
+ }
}
diff --git a/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerierTest.java b/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerierTest.java
index 4e68f96b5..ccd0fc8e2 100644
--- a/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerierTest.java
+++ b/src/test/java/io/confluent/connect/jdbc/source/TimestampIncrementingTableQuerierTest.java
@@ -100,7 +100,8 @@ private TimestampIncrementingTableQuerier querier(
10211197100L, // Timestamp delay
TimeZone.getTimeZone("UTC"),
"",
- JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL
+ JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
+ JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);
}
diff --git a/src/test/java/io/confluent/connect/jdbc/source/TimestampTableQuerierTest.java b/src/test/java/io/confluent/connect/jdbc/source/TimestampTableQuerierTest.java
index f7fefbbca..f7bb03163 100644
--- a/src/test/java/io/confluent/connect/jdbc/source/TimestampTableQuerierTest.java
+++ b/src/test/java/io/confluent/connect/jdbc/source/TimestampTableQuerierTest.java
@@ -97,7 +97,8 @@ private TimestampIncrementingTableQuerier querier(Timestamp initialTimestampOffs
10211197100L, // Timestamp delay
TimeZone.getTimeZone("UTC"),
"",
- JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL
+ JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
+ JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);
}