Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query timeout #1270

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
files="(DataConverter|GenericDatabaseDialect|JdbcSourceTask).java"/>

<suppress checks="ParameterNumber"
files="(ColumnDefinition|GenericDatabaseDialect|SqlServerDatabaseDialect|PostgreSqlDatabaseDialect|TimestampIncrementingTableQuerier).java"/>
files="(ColumnDefinition|GenericDatabaseDialect|SqlServerDatabaseDialect|PostgreSqlDatabaseDialect|TimestampIncrementingTableQuerier|TimestampTableQuerier).java"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ public BulkTableQuerier(
QueryMode mode,
String name,
String topicPrefix,
String suffix
String suffix,
int queryTimeout
) {
super(dialect, mode, name, topicPrefix, suffix);
super(dialect, mode, name, topicPrefix, suffix, queryTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,13 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
+ " * SERIALIZABLE\n"
+ " * SQL_SERVER_SNAPSHOT\n";
private static final String TRANSACTION_ISOLATION_MODE_DISPLAY = "Transaction Isolation Mode";

public static final String QUERY_TIMEOUT_S_CONFIG = "query.timeout";
public static final int QUERY_TIMEOUT_S_DEFAULT = 0;
public static final String QUERY_TIMEOUT_S_DOC =
"Query timeout in seconds."
+ "For backward compatibility, the default is 0. 0 means there is no timeout";
private static final String QUERY_TIMEOUT_S_DISPLAY = "Query timeout (s)";

private static final EnumRecommender TRANSACTION_ISOLATION_MODE_RECOMMENDER =
EnumRecommender.in(TransactionIsolationMode.values());
Expand Down Expand Up @@ -669,6 +676,16 @@ private static final void addModeOptions(ConfigDef config) {
++orderInGroup,
Width.MEDIUM,
QUERY_RETRIES_DISPLAY
).define(
QUERY_TIMEOUT_S_CONFIG,
Type.INT,
QUERY_TIMEOUT_S_DEFAULT,
Importance.LOW,
QUERY_TIMEOUT_S_DOC,
MODE_GROUP,
++orderInGroup,
Width.MEDIUM,
QUERY_TIMEOUT_S_DISPLAY
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ public void start(Map<String, String> properties) {
String topicPrefix = config.topicPrefix();
JdbcSourceConnectorConfig.TimestampGranularity timestampGranularity
= JdbcSourceConnectorConfig.TimestampGranularity.get(config);

int queryTimeout = config.getInt(JdbcSourceTaskConfig.QUERY_TIMEOUT_S_CONFIG);

if (mode.equals(JdbcSourceTaskConfig.MODE_BULK)) {
tableQueue.add(
Expand All @@ -238,7 +240,8 @@ public void start(Map<String, String> properties) {
queryMode,
tableOrQuery,
topicPrefix,
suffix
suffix,
queryTimeout
)
);
} else if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)) {
Expand All @@ -254,7 +257,8 @@ public void start(Map<String, String> properties) {
timestampDelayInterval,
timeZone,
suffix,
timestampGranularity
timestampGranularity,
queryTimeout
)
);
} else if (mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)) {
Expand All @@ -269,7 +273,8 @@ public void start(Map<String, String> properties) {
timestampDelayInterval,
timeZone,
suffix,
timestampGranularity
timestampGranularity,
queryTimeout
)
);
} else if (mode.endsWith(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) {
Expand All @@ -285,7 +290,8 @@ public void start(Map<String, String> properties) {
timestampDelayInterval,
timeZone,
suffix,
timestampGranularity
timestampGranularity,
queryTimeout
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public enum QueryMode {
protected final String topicPrefix;
protected final TableId tableId;
protected final String suffix;
protected final int queryTimeout;

// Mutable state

Expand All @@ -64,7 +65,8 @@ public TableQuerier(
QueryMode mode,
String nameOrQuery,
String topicPrefix,
String suffix
String suffix,
int queryTimeout
) {
this.dialect = dialect;
this.mode = mode;
Expand All @@ -74,6 +76,7 @@ public TableQuerier(
this.lastUpdate = 0;
this.suffix = suffix;
this.attemptedRetries = 0;
this.queryTimeout = queryTimeout;
}

public long getLastUpdate() {
Expand All @@ -85,6 +88,9 @@ public PreparedStatement getOrCreatePreparedStatement(Connection db) throws SQLE
return stmt;
}
createPreparedStatement(db);
if (queryTimeout > 0) {
stmt.setQueryTimeout(queryTimeout);
}
return stmt;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ public TimestampIncrementingTableQuerier(DatabaseDialect dialect, QueryMode mode
String incrementingColumnName,
Map<String, Object> offsetMap, Long timestampDelay,
TimeZone timeZone, String suffix,
TimestampGranularity timestampGranularity) {
super(dialect, mode, name, topicPrefix, suffix);
TimestampGranularity timestampGranularity,
int queryTimeout) {
super(dialect, mode, name, topicPrefix, suffix, queryTimeout);
this.incrementingColumnName = incrementingColumnName;
this.timestampColumnNames = timestampColumnNames != null
? timestampColumnNames : Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public TimestampTableQuerier(
Long timestampDelay,
TimeZone timeZone,
String suffix,
TimestampGranularity timestampGranularity
TimestampGranularity timestampGranularity,
int queryTimeout
) {
super(
dialect,
Expand All @@ -77,7 +78,8 @@ public TimestampTableQuerier(
timestampDelay,
timeZone,
suffix,
timestampGranularity
timestampGranularity,
queryTimeout
);

this.latestCommittableTimestamp = this.offset.getTimestampOffset();
Expand Down
112 changes: 106 additions & 6 deletions src/test/java/io/confluent/connect/jdbc/source/TableQuerierTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.connect.jdbc.util.TableId;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.junit.Before;
Expand All @@ -32,13 +33,15 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.never;

public class TableQuerierTest {
private static final String TABLE_NAME = "name";
private static final String INCREMENTING_COLUMN_NAME = "column";
private static final String SUFFIX = "/* SUFFIX */";
private static final Long TIMESTAMP_DELAY = 0l;
private static final String QUERY = "SELECT * FROM name";
private static final int QUERY_TIMEOUT = 10;

DatabaseDialect databaseDialectMock;

Expand Down Expand Up @@ -71,7 +74,8 @@ public void testTimestampIncrementingTableQuerierInTableModeWithSuffix() throws
TIMESTAMP_DELAY,
null,
SUFFIX,
JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL
JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);

querier.createPreparedStatement(connectionMock);
Expand All @@ -92,7 +96,8 @@ public void testTimestampIncrementingTableQuerierInQueryModeWithSuffix() throws
TIMESTAMP_DELAY,
null,
SUFFIX,
JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL
JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);

querier.createPreparedStatement(connectionMock);
Expand All @@ -107,7 +112,8 @@ public void testBulkTableQuerierInTableModeWithSuffix() throws SQLException {
QueryMode.TABLE,
TABLE_NAME,
null,
SUFFIX
SUFFIX,
JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);

querier.createPreparedStatement(connectionMock);
Expand All @@ -122,7 +128,8 @@ public void testBulkTableQuerierInQueryModeWithSuffix() throws SQLException {
QueryMode.QUERY,
QUERY,
null,
SUFFIX
SUFFIX,
JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);

querier.createPreparedStatement(connectionMock);
Expand All @@ -137,11 +144,104 @@ public void testBulkTableQuerierInQueryModeWithoutSuffix() throws SQLException {
QueryMode.QUERY,
QUERY,
null,
"" /* default value */
"" /* default value */,
JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);

querier.createPreparedStatement(connectionMock);

verify(databaseDialectMock, times(1)).createPreparedStatement(Matchers.any(),Matchers.eq("SELECT * FROM name"));
}
}

@Test
public void testTimestampIncrementingTableQuerierWithQueryTimeout() throws SQLException {
PreparedStatement stmtMock = mock(PreparedStatement.class);
when(databaseDialectMock.createPreparedStatement(Matchers.eq(connectionMock), Matchers.any(String.class)))
.thenReturn(stmtMock);

TimestampIncrementingTableQuerier querier = new TimestampIncrementingTableQuerier(
databaseDialectMock,
QueryMode.TABLE,
TABLE_NAME,
null,
null,
INCREMENTING_COLUMN_NAME,
null,
TIMESTAMP_DELAY,
null,
SUFFIX,
JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
QUERY_TIMEOUT
);

querier.getOrCreatePreparedStatement(connectionMock);

verify(stmtMock, times(1)).setQueryTimeout(Matchers.eq(QUERY_TIMEOUT));
}

@Test
public void testTimestampIncrementingTableQuerierWithoutQueryTimeout() throws SQLException {
PreparedStatement stmtMock = mock(PreparedStatement.class);
when(databaseDialectMock.createPreparedStatement(Matchers.eq(connectionMock), Matchers.any(String.class)))
.thenReturn(stmtMock);

TimestampIncrementingTableQuerier querier = new TimestampIncrementingTableQuerier(
databaseDialectMock,
QueryMode.QUERY,
QUERY,
null,
null,
INCREMENTING_COLUMN_NAME,
null,
TIMESTAMP_DELAY,
null,
SUFFIX,
JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);

querier.getOrCreatePreparedStatement(connectionMock);

verify(stmtMock, never()).setQueryTimeout(Matchers.anyInt());
}

@Test
public void testBulkTableQuerierWithQueryTimeout() throws SQLException {
PreparedStatement stmtMock = mock(PreparedStatement.class);
when(databaseDialectMock.createPreparedStatement(Matchers.eq(connectionMock), Matchers.any(String.class)))
.thenReturn(stmtMock);

BulkTableQuerier querier = new BulkTableQuerier(
databaseDialectMock,
QueryMode.QUERY,
QUERY,
null,
SUFFIX,
QUERY_TIMEOUT
);

querier.getOrCreatePreparedStatement(connectionMock);

verify(stmtMock, times(1)).setQueryTimeout(Matchers.eq(QUERY_TIMEOUT));
}

@Test
public void testBulkTableQuerierWithoutQueryTimeout() throws SQLException {
PreparedStatement stmtMock = mock(PreparedStatement.class);
when(databaseDialectMock.createPreparedStatement(Matchers.eq(connectionMock), Matchers.any(String.class)))
.thenReturn(stmtMock);

BulkTableQuerier querier = new BulkTableQuerier(
databaseDialectMock,
QueryMode.TABLE,
TABLE_NAME,
null,
SUFFIX,
JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);

querier.getOrCreatePreparedStatement(connectionMock);

verify(stmtMock, never()).setQueryTimeout(Matchers.anyInt());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ private TimestampIncrementingTableQuerier querier(
10211197100L, // Timestamp delay
TimeZone.getTimeZone("UTC"),
"",
JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL
JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ private TimestampIncrementingTableQuerier querier(Timestamp initialTimestampOffs
10211197100L, // Timestamp delay
TimeZone.getTimeZone("UTC"),
"",
JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL
JdbcSourceConnectorConfig.TimestampGranularity.CONNECT_LOGICAL,
JdbcSourceConnectorConfig.QUERY_TIMEOUT_S_DEFAULT
);
}

Expand Down