Skip to content

Commit

Permalink
added error handling for
Browse files Browse the repository at this point in the history
  • Loading branch information
averemee committed Apr 19, 2024
1 parent b0f3467 commit 28bd2da
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,18 @@ public void run() {
xid, timestamp, lastScn);
}
if (useChronicleQueue) {
transaction = new OraCdcTransactionChronicleQueue(processLobs, queuesRoot, xid);
try {
transaction = new OraCdcTransactionChronicleQueue(processLobs, queuesRoot, xid);
} catch (Exception cqe) {
LOGGER.error(
"\n=====================\n" +
"'{}' while initializing Chronicle Queue.\n" +
"\tREF. https://github.com/OpenHFT/Chronicle-Queue/issues/1446\n" +
"Please send errorstack below to [email protected]\n{}\n" +
"=====================\n",
cqe.getMessage(), ExceptionUtils.getExceptionStackTrace(cqe));
throw new ConnectException(cqe);
}
} else {
transaction = new OraCdcTransactionArrayList(xid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ public class OraCdcTransactionChronicleQueue extends OraCdcTransactionBase imple
private static final String QUEUE_DIR = "queueDirectory";
private static final String QUEUE_OFFSET = "tailerOffset";
private static final String PROCESS_LOBS = "processLobs";
private static final String CQ_ISSUE_1446_RETRY_MSG = "Received https://github.com/OpenHFT/Chronicle-Queue/issues/1446, will try again";
private static final String CQ_ISSUE_1446_MSG =
"\n=====================\n" +
"'{}' while initializing Chronicle Queue.\n" +
"Perhaps this is https://github.com/OpenHFT/Chronicle-Queue/issues/1446\n" +
"Please suggest increase the value of system property \"chronicle.table.store.timeoutMS\".\n" +
"\tFor more information on Chronicle Queue parameters please visit https://github.com/OpenHFT/Chronicle-Queue/blob/ea/systemProperties.adoc .\n" +
"=====================\n";
private static final int LOCK_RETRY = 5;

private long firstChange;
private long nextChange;
Expand Down Expand Up @@ -90,19 +99,58 @@ public OraCdcTransactionChronicleQueue(final boolean processLobs, final Path roo
}
}
try {
statements = ChronicleQueue
.singleBuilder(queueDirectory)
.build();
boolean cqDone = false;
Exception lastException = null;
for (int i = 0; i < LOCK_RETRY; i++) {
try {
statements = ChronicleQueue
.singleBuilder(queueDirectory)
.build();
cqDone = true;
} catch (IllegalStateException ise) {
LOGGER.error(CQ_ISSUE_1446_RETRY_MSG);
deleteDir(queueDirectory);
if (i == LOCK_RETRY - 1) {
lastException = ise;
}
}
if (cqDone) {
break;
}
}
if (!cqDone) {
LOGGER.error(CQ_ISSUE_1446_MSG, lastException.getMessage());
throw lastException;
}
tailer = statements.createTailer();
appender = statements.acquireAppender();
queueSize = 0;
tailerOffset = 0;
if (processLobs) {
lobs = ChronicleQueue
.singleBuilder(lobsQueueDirectory)
.build();
lobsTailer = lobs.createTailer();
lobsAppender = lobs.acquireAppender();
cqDone = false;
for (int i = 0; i < LOCK_RETRY; i++) {
try {
lobs = ChronicleQueue
.singleBuilder(lobsQueueDirectory)
.build();
cqDone = true;
} catch (IllegalStateException ise) {
LOGGER.error(CQ_ISSUE_1446_RETRY_MSG);
deleteDir(lobsQueueDirectory);
if (i == LOCK_RETRY - 1) {
lastException = ise;
}
}
if (cqDone) {
break;
}
}
if (!cqDone) {
LOGGER.error(CQ_ISSUE_1446_MSG, lastException.getMessage());
throw lastException;
}
lobsTailer = lobs.createTailer();
lobsAppender = lobs.acquireAppender();
}
} catch (Exception e) {
LOGGER.error("Unable to create Chronicle Queue!");
Expand Down Expand Up @@ -329,23 +377,23 @@ public void close() {
statements.close();
}
statements = null;
if (processLobs) {
deleteDir(lobsQueueDirectory);
}
deleteDir(queueDirectory);
}

private void deleteDir(final Path directory) {
try {
if (processLobs) {
Files.walk(lobsQueueDirectory)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
}
Files.walk(queueDirectory)
Files.walk(directory)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
} catch (NoSuchFileException nsf) {
LOGGER.error(nsf.getMessage());
} catch (IOException ioe) {
LOGGER.error("Unable to delete Cronicle Queue files.");
LOGGER.error(ExceptionUtils.getExceptionStackTrace(ioe));
}
LOGGER.error(ioe.getMessage());
}
}


Expand Down

0 comments on commit 28bd2da

Please sign in to comment.