Skip to content

Commit

Permalink
Merge pull request #4557 from gchq/gh-4550_jooq_context_issue
Browse files Browse the repository at this point in the history
#4550 Fix datasource already in use issue
  • Loading branch information
stroomdev66 authored Oct 23, 2024
2 parents 50200cc + 03c9616 commit caaccc0
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,27 +113,27 @@ public Processor create(final Processor processor) {
@Override
public Processor update(final Processor processor) {
return JooqUtil.contextResult(
processorDbConnProvider,
context -> {
final int count = context
.update(PROCESSOR)
.set(PROCESSOR.VERSION, PROCESSOR.VERSION.plus(1))
.set(PROCESSOR.UPDATE_TIME_MS, processor.getUpdateTimeMs())
.set(PROCESSOR.UPDATE_USER, processor.getUpdateUser())
.set(PROCESSOR.ENABLED, processor.isEnabled())
.set(PROCESSOR.DELETED, processor.isDeleted())
.where(PROCESSOR.ID.eq(processor.getId()))
.and(PROCESSOR.VERSION.eq(processor.getVersion()))
.execute();

if (count == 0) {
throw new DataChangedException("Failed to update processor, " +
"it may have been updated by another user or deleted");
}

return fetch(processor.getId()).orElseThrow(() ->
new RuntimeException("Error fetching updated processor"));
});
processorDbConnProvider,
context -> {
final int count = context
.update(PROCESSOR)
.set(PROCESSOR.VERSION, PROCESSOR.VERSION.plus(1))
.set(PROCESSOR.UPDATE_TIME_MS, processor.getUpdateTimeMs())
.set(PROCESSOR.UPDATE_USER, processor.getUpdateUser())
.set(PROCESSOR.ENABLED, processor.isEnabled())
.set(PROCESSOR.DELETED, processor.isDeleted())
.where(PROCESSOR.ID.eq(processor.getId()))
.and(PROCESSOR.VERSION.eq(processor.getVersion()))
.execute();

if (count == 0) {
throw new DataChangedException("Failed to update processor, " +
"it may have been updated by another user or deleted");
}

return fetch(context, processor.getId());
}).map(RECORD_TO_PROCESSOR_MAPPER)
.orElseThrow(() -> new RuntimeException("Error fetching updated processor"));
}

@Override
Expand All @@ -149,8 +149,7 @@ public int logicalDeleteByProcessorId(final int processorId) {
return JooqUtil.transactionResult(processorDbConnProvider, context -> {
// Logically delete all the child filters first
processorFilterDao.logicalDeleteByProcessorId(processorId, context);
final int count = logicalDeleteByProcessorId(processorId, context);
return count;
return logicalDeleteByProcessorId(processorId, context);
});
} catch (final Exception e) {
throw new RuntimeException("Error deleting filters and processor for processor id " + processorId, e);
Expand Down Expand Up @@ -196,7 +195,7 @@ public int physicalDeleteOldProcessors(final Instant deleteThreshold) {
totalCount.addAndGet(count);
} catch (final DataAccessException e) {
if (e.getCause() instanceof final SQLIntegrityConstraintViolationException sqlEx) {
LOGGER.debug("Expected constraint violation exception: " + sqlEx.getMessage(), e);
LOGGER.debug(() -> "Expected constraint violation exception: " + sqlEx.getMessage(), e);
} else {
throw e;
}
Expand All @@ -208,14 +207,18 @@ public int physicalDeleteOldProcessors(final Instant deleteThreshold) {

@Override
public Optional<Processor> fetch(final int id) {
return JooqUtil.contextResult(processorDbConnProvider, context -> context
.select()
.from(PROCESSOR)
.where(PROCESSOR.ID.eq(id))
.fetchOptional())
return JooqUtil.contextResult(processorDbConnProvider, context -> fetch(context, id))
.map(RECORD_TO_PROCESSOR_MAPPER);
}

private Optional<Record> fetch(final DSLContext context, final int id) {
return context
.select()
.from(PROCESSOR)
.where(PROCESSOR.ID.eq(id))
.fetchOptional();
}

@Override
public Optional<Processor> fetchByPipelineUuid(final String pipelineUuid) {
Objects.requireNonNull(pipelineUuid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private ProcessorFilterTracker createTracker(final DSLContext context) {
final ProcessorFilterTracker tracker = new ProcessorFilterTracker();
tracker.setVersion(1);
tracker.setStatus(ProcessorFilterTrackerStatus.CREATED);
final int id = context
final Integer id = context
.insertInto(PROCESSOR_FILTER_TRACKER)
.columns(
PROCESSOR_FILTER_TRACKER.VERSION,
Expand Down Expand Up @@ -147,14 +147,15 @@ private ProcessorFilterTracker createTracker(final DSLContext context) {
NullSafe.get(tracker.getStatus(), ProcessorFilterTrackerStatus::getPrimitiveValue))
.returning(PROCESSOR_FILTER_TRACKER.ID)
.fetchOne(PROCESSOR_FILTER_TRACKER.ID);
Objects.requireNonNull(id);
tracker.setId(id);
return tracker;
}

private ProcessorFilter createFilter(final DSLContext context, final ProcessorFilter filter) {
filter.setVersion(1);
final String data = queryDataXMLSerialiser.serialise(filter.getQueryData());
final int id = context
final Integer id = context
.insertInto(PROCESSOR_FILTER)
.columns(PROCESSOR_FILTER.VERSION,
PROCESSOR_FILTER.CREATE_TIME_MS,
Expand Down Expand Up @@ -192,6 +193,7 @@ private ProcessorFilter createFilter(final DSLContext context, final ProcessorFi
NullSafe.get(filter.getRunAsUser(), UserRef::getUuid))
.returning(PROCESSOR_FILTER.ID)
.fetchOne(PROCESSOR_FILTER.ID);
Objects.requireNonNull(id);
filter.setId(id);
return filter;
}
Expand Down Expand Up @@ -221,7 +223,7 @@ private ProcessorFilter updateFilter(final DSLContext context, final ProcessorFi
"it may have been updated by another user or deleted");
}

return fetch(filter.getId()).orElseThrow(() ->
return fetch(context, filter.getId()).map(this::mapRecord).orElseThrow(() ->
new RuntimeException("Error fetching updated processor filter"));
}

Expand Down Expand Up @@ -379,17 +381,19 @@ public Set<String> physicalDeleteOldProcessorFilters(final Instant deleteThresho

@Override
public Optional<ProcessorFilter> fetch(final int id) {
return JooqUtil.contextResult(processorDbConnProvider, context ->
context
.select()
.from(PROCESSOR_FILTER)
.join(PROCESSOR_FILTER_TRACKER)
.on(PROCESSOR_FILTER.FK_PROCESSOR_FILTER_TRACKER_ID.eq(PROCESSOR_FILTER_TRACKER.ID))
.join(PROCESSOR)
.on(PROCESSOR_FILTER.FK_PROCESSOR_ID.eq(PROCESSOR.ID))
.where(PROCESSOR_FILTER.ID.eq(id))
.fetchOptional())
.map(this::mapRecord);
return JooqUtil.contextResult(processorDbConnProvider, context -> fetch(context, id)).map(this::mapRecord);
}

private Optional<Record> fetch(final DSLContext context, final int id) {
return context
.select()
.from(PROCESSOR_FILTER)
.join(PROCESSOR_FILTER_TRACKER)
.on(PROCESSOR_FILTER.FK_PROCESSOR_FILTER_TRACKER_ID.eq(PROCESSOR_FILTER_TRACKER.ID))
.join(PROCESSOR)
.on(PROCESSOR_FILTER.FK_PROCESSOR_ID.eq(PROCESSOR.ID))
.where(PROCESSOR_FILTER.ID.eq(id))
.fetchOptional();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
Expand Down Expand Up @@ -90,7 +91,7 @@ public User create(final User user) {
private User create(final DSLContext context, final User user) {
user.setVersion(1);
user.setUuid(UUID.randomUUID().toString());
final int id = context
final Integer id = context
.insertInto(STROOM_USER)
.columns(STROOM_USER.VERSION,
STROOM_USER.CREATE_TIME_MS,
Expand All @@ -116,6 +117,7 @@ private User create(final DSLContext context, final User user) {
user.getFullName())
.returning(STROOM_USER.ID)
.fetchOne(STROOM_USER.ID);
Objects.requireNonNull(id);
user.setId(id);
return user;
}
Expand Down
24 changes: 24 additions & 0 deletions unreleased_changes/20241023_110006_453__4550.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
* Issue **#4550** : Fix datasource already in use issue.


```sh
# ********************************************************************************
# Issue title: Datasource already in use
# Issue link: https://github.com/gchq/stroom/issues/4550
# ********************************************************************************

# ONLY the top line will be included as a change entry in the CHANGELOG.
# The entry should be in GitHub flavour markdown and should be written on a SINGLE
# line with no hard breaks. You can have multiple change files for a single GitHub issue.
# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than
# 'Fixed nasty bug'.
#
# Examples of acceptable entries are:
#
#
# * Issue **123** : Fix bug with an associated GitHub issue in this repository
#
# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository
#
# * Fix bug with no associated GitHub issue.
```

0 comments on commit caaccc0

Please sign in to comment.