Skip to content

Commit

Permalink
Rethrow ConnectException from ConnectionProvider as RetriableExceptio…
Browse files Browse the repository at this point in the history
…n to provide proper retries.
  • Loading branch information
yevgenp committed Oct 31, 2022
1 parent 8f20bdf commit 3ed5ad9
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 16 deletions.
32 changes: 16 additions & 16 deletions src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.connect.jdbc.sink;

import static io.confluent.connect.jdbc.util.ExceptionUtil.iterator;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -88,24 +90,24 @@ public void put(Collection<SinkRecord> records) {
} else {
throw tace;
}
} catch (SQLException sqle) {
} catch (ConnectException | SQLException ex) {
log.warn(
"Write of {} records failed, remainingRetries={}",
records.size(),
remainingRetries,
sqle
ex
);
int totalExceptions = 0;
for (Throwable e :sqle) {
for (Throwable e : iterator(ex)) {
totalExceptions++;
}
SQLException sqlAllMessagesException = getAllMessagesException(sqle);
Exception allMessagesException = getAllMessagesException(ex);
if (remainingRetries > 0) {
writer.closeQuietly();
initWriter();
remainingRetries--;
context.timeout(config.retryBackoffMs);
throw new RetriableException(sqlAllMessagesException);
throw new RetriableException(allMessagesException);
} else {
if (reporter != null) {
unrollAndRetry(records);
Expand All @@ -116,10 +118,10 @@ public void put(Collection<SinkRecord> records) {
+ "For complete details on each exception, please enable DEBUG logging.",
totalExceptions);
int exceptionCount = 1;
for (Throwable e : sqle) {
for (Throwable e : iterator(ex)) {
log.debug("Exception {}:", exceptionCount++, e);
}
throw new ConnectException(sqlAllMessagesException);
throw new ConnectException(allMessagesException);
}
}
}
Expand All @@ -135,21 +137,19 @@ private void unrollAndRetry(Collection<SinkRecord> records) {
reporter.report(record, tace);
writer.closeQuietly();
} catch (SQLException sqle) {
SQLException sqlAllMessagesException = getAllMessagesException(sqle);
reporter.report(record, sqlAllMessagesException);
Exception allMessagesException = getAllMessagesException(sqle);
reporter.report(record, allMessagesException);
writer.closeQuietly();
}
}
}

private SQLException getAllMessagesException(SQLException sqle) {
String sqleAllMessages = "Exception chain:" + System.lineSeparator();
for (Throwable e : sqle) {
sqleAllMessages += e + System.lineSeparator();
private Exception getAllMessagesException(Throwable ex) {
StringBuilder allMessages = new StringBuilder("Exception chain:" + System.lineSeparator());
for (Throwable e : iterator(ex)) {
allMessages.append(e).append(System.lineSeparator());
}
SQLException sqlAllMessagesException = new SQLException(sqleAllMessages);
sqlAllMessagesException.setNextException(sqle);
return sqlAllMessagesException;
return new SQLException(allMessages.toString());
}

@Override
Expand Down
58 changes: 58 additions & 0 deletions src/main/java/io/confluent/connect/jdbc/util/ExceptionUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.jdbc.util;

import java.sql.SQLException;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class ExceptionUtil {

public static Iterable<Throwable> iterator(Throwable ex) {
return ex instanceof SQLException
? ((SQLException) ex)
: () -> new Iterator<Throwable>() {
Throwable firstException = ex;
Throwable cause = firstException.getCause();

@Override
public boolean hasNext() {
return firstException != null || cause != null;
}

@Override
public Throwable next() {
Throwable throwable;
if (firstException != null) {
throwable = firstException;
firstException = null;
} else if (cause != null) {
throwable = cause;
cause = cause.getCause();
} else {
throw new NoSuchElementException();
}
return throwable;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}

}
31 changes: 31 additions & 0 deletions src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;

import java.io.IOException;
import java.io.PrintWriter;
Expand Down Expand Up @@ -52,6 +54,7 @@
import org.junit.Test;

import io.confluent.connect.jdbc.util.DateTimeUtils;
import org.mockito.Mockito;

public class JdbcSinkTaskTest extends EasyMockSupport {
private final SqliteHelper sqliteHelper = new SqliteHelper(getClass().getSimpleName());
Expand Down Expand Up @@ -302,6 +305,34 @@ void initWriter() {
verifyAll();
}

@Test
public void retryOnConnectException() throws SQLException {
final int maxRetries = 2;
final int retryBackoffMs = 1000;

List<SinkRecord> records = createRecordsList(1);
JdbcDbWriter mockWriter = Mockito.mock(JdbcDbWriter.class);
doThrow(new ConnectException("error")).when(mockWriter).write(any());
JdbcSinkTask task = new JdbcSinkTask() {
@Override
void initWriter() {
this.writer = mockWriter;
}
};
task.initialize(Mockito.mock(SinkTaskContext.class));

Map<String, String> props = setupBasicProps(maxRetries, retryBackoffMs);
task.start(props);

try {
task.put(records);
fail();
} catch (RetriableException expected) {
assertEquals(SQLException.class, expected.getCause().getClass());
assertTrue(expected.getMessage().contains("ConnectException"));
}
}

@Test
public void errorReporting() throws SQLException {
List<SinkRecord> records = createRecordsList(1);
Expand Down

0 comments on commit 3ed5ad9

Please sign in to comment.