From 692d780b95640b3ddfc53319f49b2843837648d2 Mon Sep 17 00:00:00 2001 From: yevgenp Date: Mon, 31 Oct 2022 14:02:25 +0200 Subject: [PATCH] Rethrow ConnectException from ConnectionProvider as RetriableException to provide proper retries. --- .../connect/jdbc/sink/JdbcSinkTask.java | 32 +++++++------- .../connect/jdbc/util/ExceptionUtil.java | 44 +++++++++++++++++++ .../connect/jdbc/sink/JdbcSinkTaskTest.java | 31 +++++++++++++ 3 files changed, 91 insertions(+), 16 deletions(-) create mode 100644 src/main/java/io/confluent/connect/jdbc/util/ExceptionUtil.java diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java index 43f8a16605..281748f2ff 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java @@ -15,6 +15,8 @@ package io.confluent.connect.jdbc.sink; +import static io.confluent.connect.jdbc.util.ExceptionUtil.iterator; + import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.ConnectException; @@ -88,24 +90,24 @@ public void put(Collection records) { } else { throw tace; } - } catch (SQLException sqle) { + } catch (ConnectException | SQLException ex) { log.warn( "Write of {} records failed, remainingRetries={}", records.size(), remainingRetries, - sqle + ex ); int totalExceptions = 0; - for (Throwable e :sqle) { + for (Throwable e : iterator(ex)) { totalExceptions++; } - SQLException sqlAllMessagesException = getAllMessagesException(sqle); + Exception allMessagesException = getAllMessagesException(ex); if (remainingRetries > 0) { writer.closeQuietly(); initWriter(); remainingRetries--; context.timeout(config.retryBackoffMs); - throw new RetriableException(sqlAllMessagesException); + throw new RetriableException(allMessagesException); } else { if (reporter != null) { unrollAndRetry(records); @@ -116,10 +118,10 @@ public void put(Collection records) { + "For complete details on each exception, please enable DEBUG logging.", totalExceptions); int exceptionCount = 1; - for (Throwable e : sqle) { + for (Throwable e : iterator(ex)) { log.debug("Exception {}:", exceptionCount++, e); } - throw new ConnectException(sqlAllMessagesException); + throw new ConnectException(allMessagesException); } } } @@ -135,21 +137,19 @@ private void unrollAndRetry(Collection records) { reporter.report(record, tace); writer.closeQuietly(); } catch (SQLException sqle) { - SQLException sqlAllMessagesException = getAllMessagesException(sqle); - reporter.report(record, sqlAllMessagesException); + Exception allMessagesException = getAllMessagesException(sqle); + reporter.report(record, allMessagesException); writer.closeQuietly(); } } } - private SQLException getAllMessagesException(SQLException sqle) { - String sqleAllMessages = "Exception chain:" + System.lineSeparator(); - for (Throwable e : sqle) { - sqleAllMessages += e + System.lineSeparator(); + private Exception getAllMessagesException(Throwable ex) { + StringBuilder allMessages = new StringBuilder("Exception chain:" + System.lineSeparator()); + for (Throwable e : iterator(ex)) { + allMessages.append(e).append(System.lineSeparator()); } - SQLException sqlAllMessagesException = new SQLException(sqleAllMessages); - sqlAllMessagesException.setNextException(sqle); - return sqlAllMessagesException; + return new SQLException(allMessages.toString()); } @Override diff --git a/src/main/java/io/confluent/connect/jdbc/util/ExceptionUtil.java b/src/main/java/io/confluent/connect/jdbc/util/ExceptionUtil.java new file mode 100644 index 0000000000..83541f3849 --- /dev/null +++ b/src/main/java/io/confluent/connect/jdbc/util/ExceptionUtil.java @@ -0,0 +1,44 @@ +package io.confluent.connect.jdbc.util; + +import java.sql.SQLException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +public class ExceptionUtil { + + public static Iterable iterator(Throwable ex) { + return ex instanceof SQLException + ? ((SQLException) ex) + : () -> new Iterator() { + Throwable firstException = ex; + Throwable cause = firstException.getCause(); + + @Override + public boolean hasNext() { + return firstException != null || cause != null; + } + + @Override + public Throwable next() { + Throwable throwable; + if(firstException != null){ + throwable = firstException; + firstException = null; + } + else if(cause != null){ + throwable = cause; + cause = cause.getCause(); + } + else + throw new NoSuchElementException(); + return throwable; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + +} diff --git a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java index 45f8be01b5..369575514e 100644 --- a/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java +++ b/src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import java.io.IOException; import java.io.PrintWriter; @@ -52,6 +54,7 @@ import org.junit.Test; import io.confluent.connect.jdbc.util.DateTimeUtils; +import org.mockito.Mockito; public class JdbcSinkTaskTest extends EasyMockSupport { private final SqliteHelper sqliteHelper = new SqliteHelper(getClass().getSimpleName()); @@ -302,6 +305,34 @@ void initWriter() { verifyAll(); } + @Test + public void retryOnConnectException() throws SQLException { + final int maxRetries = 2; + final int retryBackoffMs = 1000; + + List records = createRecordsList(1); + JdbcDbWriter mockWriter = Mockito.mock(JdbcDbWriter.class); + doThrow(new ConnectException("error")).when(mockWriter).write(any()); + JdbcSinkTask task = new JdbcSinkTask() { + @Override + void initWriter() { + this.writer = mockWriter; + } + }; + task.initialize(Mockito.mock(SinkTaskContext.class)); + + Map props = setupBasicProps(maxRetries, retryBackoffMs); + task.start(props); + + try { + task.put(records); + fail(); + } catch (RetriableException expected) { + assertEquals(SQLException.class, expected.getCause().getClass()); + assertTrue(expected.getMessage().contains("ConnectException")); + } + } + @Test public void errorReporting() throws SQLException { List records = createRecordsList(1);