From 472603f4587cd1f703b9dfd9eb1f836d83d7f85b Mon Sep 17 00:00:00 2001 From: DiCanio Date: Mon, 31 Jan 2022 16:26:56 +0100 Subject: [PATCH] Fix Race Condition Using Multiple Brokers Fixes a race condition that can occur when using multiple brokers. For the race condition to occur, one of the brokers must take a long time to publish a query while another one already managed to produce a result. This would lead to a missing database entry since the dispatch function is transactional over all brokers and does not commit any transaction until all brokers have published their query. In order to prevent this situation from happening, the database is not used for any sort of component communication anymore. Instead necessary broker and backend query ID translations are moved to the broker implementations. --- .../query/QueryHandlerService.java | 2 +- .../query/broker/BrokerClient.java | 44 ++++++------- .../query/broker/aktin/AktinBrokerClient.java | 54 +++++++++------- .../aktin/WrappedNotificationListener.java | 12 ++-- .../broker/direct/DirectBrokerClient.java | 53 +++++++++------- .../query/broker/dsf/DSFBrokerClient.java | 37 +++++++---- .../broker/dsf/DSFQueryResultCollector.java | 8 ++- .../query/broker/mock/MockBrokerClient.java | 53 +++++++++------- .../collect/QueryCollectSpringConfig.java | 6 +- .../query/collect/QueryStatusListener.java | 15 ++--- .../collect/QueryStatusListenerImpl.java | 29 +++++---- .../query/collect/QueryStatusUpdate.java | 13 ++++ .../query/dispatch/QueryDispatcher.java | 4 +- .../broker/direct/DirectBrokerClientIT.java | 37 ++++++----- .../broker/direct/DirectBrokerClientTest.java | 10 +-- .../broker/dsf/DSFQueryResultCollectorIT.java | 31 +++++----- .../query/broker/mock/MockBrokerClientIT.java | 50 +++++++++------ .../broker/mock/MockBrokerClientTest.java | 10 +-- .../collect/QueryStatusListenerImplIT.java | 61 ++++++++----------- 19 files changed, 303 insertions(+), 226 deletions(-) create mode 100644 src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusUpdate.java diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/QueryHandlerService.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/QueryHandlerService.java index 371f0e60..fe267637 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/QueryHandlerService.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/QueryHandlerService.java @@ -18,7 +18,6 @@ import static de.numcodex.feasibility_gui_backend.query.persistence.ResultType.SUCCESS; @Service -@Transactional @RequiredArgsConstructor public class QueryHandlerService { @@ -37,6 +36,7 @@ public Long runQuery(StructuredQuery structuredQuery) throws QueryDispatchExcept return queryId; } + @Transactional public QueryResult getQueryResult(Long queryId) { var singleSiteResults = resultRepository.findByQueryAndStatus(queryId, SUCCESS); diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/BrokerClient.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/BrokerClient.java index 9cd7fe10..e556a5ee 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/BrokerClient.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/BrokerClient.java @@ -28,70 +28,72 @@ public interface BrokerClient { void addQueryStatusListener(QueryStatusListener queryStatusListener) throws IOException; /** - * Creates a new unpublished query. + * Creates a new unpublished broker specific query. * - * @return The ID of the created query. + * @param backendQueryId Identifier for a backend specific query that a broker specific query is created for. + * @return Identifier of the broker specific query. * @throws IOException IO/communication error */ - String createQuery() throws IOException; + String createQuery(Long backendQueryId) throws IOException; /** - * Adds the necessary query definition to a query. + * Adds the necessary query definition to a broker specific query. *

* The query definition is added to a query identified by the given query ID. * - * @param queryId Identifies the query that the query definition shall be added to. - * @param mediaType ? - * @param content The actual query in plain text. + * @param brokerQueryId Identifies the broker specific query that the query definition shall be added to. + * @param mediaType MIME type for the query content. + * @param content The actual query in plain text. * @throws QueryNotFoundException If the given query ID does not identify a known query. * @throws UnsupportedMediaTypeException If the given media type is not supported. * @throws IOException IO/communication error */ - void addQueryDefinition(String queryId, String mediaType, String content) throws QueryNotFoundException, + void addQueryDefinition(String brokerQueryId, String mediaType, String content) throws QueryNotFoundException, UnsupportedMediaTypeException, IOException; /** - * Publishes a query for distributed execution. + * Publishes a broker specific query for distributed execution. * - * @param queryId Identifies the query that shall be published. + * @param brokerQueryId Identifies the broker specific query that shall be published. * @throws QueryNotFoundException If the given query ID does not identify a known query. * @throws IOException If the query was not published due IO/communication error */ - void publishQuery(String queryId) throws QueryNotFoundException, IOException; + void publishQuery(String brokerQueryId) throws QueryNotFoundException, IOException; /** - * Marks an already published query as closed. + * Marks an already published broker specific query as closed. *

* After calling this function calls to {@link #getResultFeasibility(String, String)} and * {@link #getResultSiteIds(String)} will fail. * - * @param queryId Identifies the query that shall be closed. + * @param brokerQueryId Identifies the broker specific query that shall be closed. * @throws QueryNotFoundException If the given query ID does not identify a known query. * @throws IOException IO/communication error */ - void closeQuery(String queryId) throws QueryNotFoundException, IOException; + void closeQuery(String brokerQueryId) throws QueryNotFoundException, IOException; /** - * Gets the feasibility (measure count) of a published query for a specific site. + * Gets the feasibility (measure count) of a published broker specific query for a specific site. * - * @param queryId Identifies the query. - * @param siteId Identifies the site within the query whose feasibility shall be gotten. + * @param brokerQueryId Identifies the broker specific query. + * @param siteId Identifies the site within the query whose feasibility shall be gotten. * @return The feasibility for a specific site within a query. * @throws QueryNotFoundException If the given query ID does not identify a known query. * @throws SiteNotFoundException If the given site ID does not identify a known site within a query. * @throws IOException IO/communication error */ - int getResultFeasibility(String queryId, String siteId) throws QueryNotFoundException, SiteNotFoundException, IOException; + int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException, + IOException; /** - * Gets all site IDs associated with a published query that can already provide a result. + * Gets all site IDs associated with a published broker specific query that can already provide a result. * - * @param queryId Identifies the query whose associated site IDs with results shall be gotten. + * @param brokerQueryId Identifies the broker specific query whose associated site IDs with results shall be gotten. * @return All site IDs of a specific query that can already provide results. * @throws QueryNotFoundException If the given query ID does not identify a known query. * @throws IOException IO/communication error */ - List getResultSiteIds(String queryId) throws QueryNotFoundException, IOException; + List getResultSiteIds(String brokerQueryId) throws QueryNotFoundException, IOException; /** * Gets the display name of a site. diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/AktinBrokerClient.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/AktinBrokerClient.java index 8c24c7a7..141e6db9 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/AktinBrokerClient.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/AktinBrokerClient.java @@ -6,14 +6,15 @@ import de.numcodex.feasibility_gui_backend.query.broker.UnsupportedMediaTypeException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; import de.numcodex.feasibility_gui_backend.query.persistence.BrokerClientType; -import lombok.AllArgsConstructor; import org.aktin.broker.client2.BrokerAdmin2; import org.aktin.broker.xml.Node; import org.aktin.broker.xml.RequestStatusInfo; import java.io.IOException; +import java.util.HashMap; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; +import java.util.Objects; import static de.numcodex.feasibility_gui_backend.query.persistence.BrokerClientType.AKTIN; @@ -23,11 +24,16 @@ * @author R.W.Majeed * */ -@AllArgsConstructor public class AktinBrokerClient implements BrokerClient { - final private BrokerAdmin2 delegate; + private final BrokerAdmin2 delegate; + private final Map brokerToBackendQueryIdMapping; - @Override + public AktinBrokerClient(BrokerAdmin2 delegate) { + this.delegate = Objects.requireNonNull(delegate); + this.brokerToBackendQueryIdMapping = new HashMap<>(); + } + + @Override public BrokerClientType getBrokerType() { return AKTIN; } @@ -55,46 +61,49 @@ int unwrapSiteId(String siteId) { } @Override - public String createQuery() throws IOException { - return wrapQueryId(delegate.createRequest()); - } + public String createQuery(Long backendQueryId) throws IOException { + var brokerQueryId = wrapQueryId(delegate.createRequest()); + brokerToBackendQueryIdMapping.put(brokerQueryId, backendQueryId); + + return brokerQueryId; + } @Override - public void addQueryDefinition(String queryId, String mediaType, String content) + public void addQueryDefinition(String brokerQueryId, String mediaType, String content) throws QueryNotFoundException, UnsupportedMediaTypeException, IOException { - delegate.putRequestDefinition(unwrapQueryId(queryId), mediaType, content); + delegate.putRequestDefinition(unwrapQueryId(brokerQueryId), mediaType, content); } @Override - public void publishQuery(String queryId) throws QueryNotFoundException, IOException { - delegate.publishRequest(unwrapQueryId(queryId)); + public void publishQuery(String brokerQueryId) throws QueryNotFoundException, IOException { + delegate.publishRequest(unwrapQueryId(brokerQueryId)); } @Override - public void closeQuery(String queryId) throws IOException { - delegate.closeRequest(unwrapQueryId(queryId)); + public void closeQuery(String brokerQueryId) throws IOException { + delegate.closeRequest(unwrapQueryId(brokerQueryId)); } @Override - public int getResultFeasibility(String queryId, String siteId) + public int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException, IOException { - String result = delegate.getResultString(unwrapQueryId(queryId), unwrapSiteId(siteId)); + String result = delegate.getResultString(unwrapQueryId(brokerQueryId), unwrapSiteId(siteId)); if( result == null ) { - throw new SiteNotFoundException(queryId, siteId); + throw new SiteNotFoundException(brokerQueryId, siteId); } return Integer.parseInt(result); } @Override - public List getResultSiteIds(String queryId) throws QueryNotFoundException, IOException { - List list = delegate.listRequestStatus(unwrapQueryId(queryId)); + public List getResultSiteIds(String brokerQueryId) throws QueryNotFoundException, IOException { + List list = delegate.listRequestStatus(unwrapQueryId(brokerQueryId)); if( list == null ) { - throw new QueryNotFoundException(queryId); + throw new QueryNotFoundException(brokerQueryId); } return list.stream() .map( (info) -> wrapSiteId(info.node) ) - .collect(Collectors.toUnmodifiableList()); + .toList(); } @Override @@ -106,4 +115,7 @@ public String getSiteName(String siteId) throws SiteNotFoundException, IOExcepti return node.getCommonName(); } + Long getBackendQueryId(String brokerQueryId) { + return brokerToBackendQueryIdMapping.get(brokerQueryId); + } } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/WrappedNotificationListener.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/WrappedNotificationListener.java index ae10e1da..9c86c18d 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/WrappedNotificationListener.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/WrappedNotificationListener.java @@ -1,5 +1,6 @@ package de.numcodex.feasibility_gui_backend.query.broker.aktin; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatus; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; import lombok.AllArgsConstructor; @@ -22,7 +23,7 @@ @Log public class WrappedNotificationListener implements AdminNotificationListener{ final private AktinBrokerClient client; - final private QueryStatusListener delegate; + final private QueryStatusListener statusListener; @Override public void onRequestCreated(int requestId) { @@ -41,7 +42,7 @@ public void onRequestClosed(int requestId) { @Override public void onRequestStatusUpdate(int requestId, int nodeId, String status) { - QueryStatus dest; + QueryStatus queryStatus; RequestStatus rs; try{ rs = RequestStatus.valueOf(status); @@ -49,7 +50,7 @@ public void onRequestStatusUpdate(int requestId, int nodeId, String status) { return; // unsupported/unrecognized status } - dest = switch (rs) { + queryStatus = switch (rs) { case completed -> QueryStatus.COMPLETED; case processing -> QueryStatus.EXECUTING; case queued -> QueryStatus.QUEUED; @@ -57,7 +58,10 @@ public void onRequestStatusUpdate(int requestId, int nodeId, String status) { default -> QueryStatus.FAILED; }; log.log(Level.INFO,"Request status updated {0} {1} {2}", new Object[] {requestId,nodeId,status}); - delegate.onClientUpdate(client, client.wrapQueryId(requestId), client.wrapSiteId(nodeId), dest); + var statusUpdate = new QueryStatusUpdate(client, client.wrapQueryId(requestId), client.wrapSiteId(nodeId), + queryStatus); + var associatedBackendQueryId = client.getBackendQueryId(client.wrapQueryId(requestId)); + statusListener.onClientUpdate(associatedBackendQueryId, statusUpdate); } @Override diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClient.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClient.java index 015723c7..08c2d5e3 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClient.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClient.java @@ -2,6 +2,7 @@ import de.numcodex.feasibility_gui_backend.query.QueryMediaType; import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; @@ -35,7 +36,8 @@ public class DirectBrokerClient implements BrokerClient { private final WebClient webClient; private final List listeners; - private final Map queries; + private final Map brokerQueries; + private final Map brokerToBackendQueryIdMapping; /** * Creates a new {@link DirectBrokerClient} instance that uses the given web client to communicate with a Flare @@ -46,7 +48,8 @@ public class DirectBrokerClient implements BrokerClient { public DirectBrokerClient(WebClient webClient) { this.webClient = Objects.requireNonNull(webClient); listeners = new ArrayList<>(); - queries = new HashMap<>(); + brokerQueries = new HashMap<>(); + brokerToBackendQueryIdMapping = new HashMap<>(); } @Override @@ -60,24 +63,26 @@ public void addQueryStatusListener(QueryStatusListener queryStatusListener) { } @Override - public String createQuery() { - var query = DirectQuery.create(); - queries.put(query.getQueryId(), query); + public String createQuery(Long backendQueryId) { + var brokerQuery = DirectQuery.create(); + var brokerQueryId = brokerQuery.getQueryId(); + brokerQueries.put(brokerQueryId, brokerQuery); + brokerToBackendQueryIdMapping.put(brokerQueryId, backendQueryId); - return query.getQueryId(); + return brokerQueryId; } @Override - public void addQueryDefinition(String queryId, String mediaType, String content) throws QueryNotFoundException { - findQuery(queryId).addQueryDefinition(mediaType, content); + public void addQueryDefinition(String brokerQueryId, String mediaType, String content) throws QueryNotFoundException { + findQuery(brokerQueryId).addQueryDefinition(mediaType, content); } @Override - public void publishQuery(String queryId) throws QueryNotFoundException, IOException { - var query = findQuery(queryId); + public void publishQuery(String brokerQueryId) throws QueryNotFoundException, IOException { + var query = findQuery(brokerQueryId); var structuredQueryContent = Optional.ofNullable(query.getQueryDefinition(STRUCTURED_QUERY)) .orElseThrow(() -> new IllegalStateException("Query with ID " - + queryId + + brokerQueryId + " does not contain a query definition for the mandatory type: " + STRUCTURED_QUERY )); @@ -96,32 +101,36 @@ public void publishQuery(String queryId) throws QueryNotFoundException, IOExcept .map(Integer::valueOf) .doOnError(error -> { log.error(error.getMessage(), error); - listeners.forEach(l -> l.onClientUpdate(this, queryId, SITE_1_ID, FAILED)); + var statusUpdate = new QueryStatusUpdate(this, brokerQueryId, SITE_1_ID, FAILED); + var associatedBackendQueryId = brokerToBackendQueryIdMapping.get(brokerQueryId); + listeners.forEach(l -> l.onClientUpdate(associatedBackendQueryId, statusUpdate)); }) .subscribe(val -> { query.registerSiteResults(SITE_1_ID, val); - listeners.forEach(l -> l.onClientUpdate(this, queryId, SITE_1_ID, COMPLETED)); + var statusUpdate = new QueryStatusUpdate(this, brokerQueryId, SITE_1_ID, COMPLETED); + var associatedBackendQueryId = brokerToBackendQueryIdMapping.get(brokerQueryId); + listeners.forEach(l -> l.onClientUpdate(associatedBackendQueryId, statusUpdate)); }); } catch (Exception e) { - throw new IOException("An error occurred while publishing the query with ID: " + queryId, e); + throw new IOException("An error occurred while publishing the query with ID: " + brokerQueryId, e); } } @Override - public void closeQuery(String queryId) throws QueryNotFoundException { - if (queries.remove(queryId) == null) { - throw new QueryNotFoundException(queryId); + public void closeQuery(String brokerQueryId) throws QueryNotFoundException { + if (brokerQueries.remove(brokerQueryId) == null) { + throw new QueryNotFoundException(brokerQueryId); } } @Override - public int getResultFeasibility(String queryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { - return findQuery(queryId).getSiteResult(siteId); + public int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { + return findQuery(brokerQueryId).getSiteResult(siteId); } @Override - public List getResultSiteIds(String queryId) throws QueryNotFoundException { - return findQuery(queryId).getSiteIdsWithResult(); + public List getResultSiteIds(String brokerQueryId) throws QueryNotFoundException { + return findQuery(brokerQueryId).getSiteIdsWithResult(); } @Override @@ -137,7 +146,7 @@ public String getSiteName(String siteId) { * @throws QueryNotFoundException If the ID does not identify a known query. */ private DirectQuery findQuery(String queryId) throws QueryNotFoundException { - return Optional.ofNullable(queries.get(queryId)) + return Optional.ofNullable(brokerQueries.get(queryId)) .orElseThrow(() -> new QueryNotFoundException(queryId)); } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFBrokerClient.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFBrokerClient.java index 289b86e6..75d17b07 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFBrokerClient.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFBrokerClient.java @@ -9,7 +9,9 @@ import de.numcodex.feasibility_gui_backend.query.persistence.BrokerClientType; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static de.numcodex.feasibility_gui_backend.query.persistence.BrokerClientType.DSF; @@ -19,6 +21,7 @@ public final class DSFBrokerClient implements BrokerClient { private final QueryManager queryManager; private final QueryResultCollector queryResultCollector; + private final Map brokerToBackendQueryIdMapping; /** * Creates a new {@link DSFBrokerClient} instance. @@ -29,6 +32,7 @@ public final class DSFBrokerClient implements BrokerClient { public DSFBrokerClient(QueryManager queryManager, QueryResultCollector queryResultCollector) { this.queryManager = queryManager; this.queryResultCollector = queryResultCollector; + brokerToBackendQueryIdMapping = new HashMap<>(); } @Override @@ -42,35 +46,38 @@ public void addQueryStatusListener(QueryStatusListener queryStatusListener) thro } @Override - public String createQuery() { - return queryManager.createQuery(); + public String createQuery(Long backendQueryId) { + var brokerQueryId = queryManager.createQuery(); + brokerToBackendQueryIdMapping.put(brokerQueryId, backendQueryId); + + return brokerQueryId; } @Override - public void addQueryDefinition(String queryId, String mediaType, String content) throws QueryNotFoundException, + public void addQueryDefinition(String brokerQueryId, String mediaType, String content) throws QueryNotFoundException, UnsupportedMediaTypeException { - queryManager.addQueryDefinition(queryId, mediaType, content); + queryManager.addQueryDefinition(brokerQueryId, mediaType, content); } @Override - public void publishQuery(String queryId) throws QueryNotFoundException, IOException { - queryManager.publishQuery(queryId); + public void publishQuery(String brokerQueryId) throws QueryNotFoundException, IOException { + queryManager.publishQuery(brokerQueryId); } @Override - public void closeQuery(String queryId) throws QueryNotFoundException { - queryManager.removeQuery(queryId); - queryResultCollector.removeResults(queryId); + public void closeQuery(String brokerQueryId) throws QueryNotFoundException { + queryManager.removeQuery(brokerQueryId); + queryResultCollector.removeResults(brokerQueryId); } @Override - public int getResultFeasibility(String queryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { - return queryResultCollector.getResultFeasibility(queryId, siteId); + public int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { + return queryResultCollector.getResultFeasibility(brokerQueryId, siteId); } @Override - public List getResultSiteIds(String queryId) throws QueryNotFoundException { - return queryResultCollector.getResultSiteIds(queryId); + public List getResultSiteIds(String brokerQueryId) throws QueryNotFoundException { + return queryResultCollector.getResultSiteIds(brokerQueryId); } @Override @@ -78,4 +85,8 @@ public String getSiteName(String siteId) throws SiteNotFoundException { // TODO: implement (separate issue) return null; } + + Long getBackendQueryId(String brokerQueryId) { + return brokerToBackendQueryIdMapping.get(brokerQueryId); + } } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollector.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollector.java index f76f3098..d3b73f48 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollector.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollector.java @@ -2,6 +2,7 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.parser.IParser; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatus; @@ -74,8 +75,13 @@ private IParser setUpResourceParser() { private void notifyResultListeners(DSFQueryResult result) { for (Entry listener : listeners.entrySet()) { - listener.getValue().onClientUpdate(listener.getKey(), result.getQueryId(), result.getSiteId(), + var broker = listener.getKey(); + var statusListener = listener.getValue(); + var statusUpdate = new QueryStatusUpdate(broker, result.getQueryId(), result.getSiteId(), QueryStatus.COMPLETED); + var associatedBackendQueryId = broker.getBackendQueryId(result.getQueryId()); + + statusListener.onClientUpdate(associatedBackendQueryId, statusUpdate); } } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClient.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClient.java index 0223e88c..12618bff 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClient.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClient.java @@ -1,6 +1,7 @@ package de.numcodex.feasibility_gui_backend.query.broker.mock; import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; @@ -26,14 +27,16 @@ public class MockBrokerClient implements BrokerClient { private final List listeners; - private final Map queries; + private final Map brokerQueries; + private final Map brokerToBackendQueryIdMapping; private final Map siteNames; private final Map>> runningMocks; public MockBrokerClient() { listeners = new ArrayList<>(); - queries = new HashMap<>(); + brokerQueries = new HashMap<>(); + brokerToBackendQueryIdMapping = new HashMap<>(); siteNames = Map.of( "2", "Lübeck", "3", "Erlangen", @@ -54,53 +57,59 @@ public void addQueryStatusListener(QueryStatusListener queryStatusListener) { } @Override - public String createQuery() { - var query = MockQuery.create(); - queries.put(query.getQueryId(), query); - runningMocks.put(query.getQueryId(), List.of()); - - return query.getQueryId(); + public String createQuery(Long backendQueryId) { + var brokerQuery = MockQuery.create(); + var brokerQueryId = brokerQuery.getQueryId(); + brokerQueries.put(brokerQueryId, brokerQuery); + brokerToBackendQueryIdMapping.put(brokerQueryId, backendQueryId); + runningMocks.put(brokerQueryId, List.of()); + + return brokerQueryId; } @Override - public void addQueryDefinition(String queryId, String mediaType, String content) { + public void addQueryDefinition(String brokerQueryId, String mediaType, String content) { // No-Op since this is irrelevant. } @Override - public void publishQuery(String queryId) throws QueryNotFoundException { - var query = findQuery(queryId); + public void publishQuery(String brokerQueryId) throws QueryNotFoundException { + var query = findQuery(brokerQueryId); var mocks = siteNames.keySet() .stream().map(siteId -> CompletableFuture.runAsync(() -> { try { Thread.sleep(Math.round(2000 + 6000 * Math.random())); query.registerSiteResults(siteId, (int) Math.round(10 + 500 * Math.random())); - listeners.forEach(l -> l.onClientUpdate(this, queryId, siteId, COMPLETED)); + var statusUpdate = new QueryStatusUpdate(this, brokerQueryId, siteId, COMPLETED); + var associatedBackendQueryId = brokerToBackendQueryIdMapping.get(brokerQueryId); + listeners.forEach(l -> l.onClientUpdate(associatedBackendQueryId, statusUpdate)); } catch (InterruptedException e) { log.error(e.getMessage(), e); - listeners.forEach(l -> l.onClientUpdate(this, queryId, siteId, FAILED)); + var statusUpdate = new QueryStatusUpdate(this, brokerQueryId, siteId, FAILED); + var associatedBackendQueryId = brokerToBackendQueryIdMapping.get(brokerQueryId); + listeners.forEach(l -> l.onClientUpdate(associatedBackendQueryId, statusUpdate)); } })) .collect(Collectors.toList()); - runningMocks.put(queryId, mocks); + runningMocks.put(brokerQueryId, mocks); } @Override - public void closeQuery(String queryId) throws QueryNotFoundException { - Optional.ofNullable(runningMocks.get(queryId)) - .orElseThrow(() -> new QueryNotFoundException(queryId)) + public void closeQuery(String brokerQueryId) throws QueryNotFoundException { + Optional.ofNullable(runningMocks.get(brokerQueryId)) + .orElseThrow(() -> new QueryNotFoundException(brokerQueryId)) .forEach(rm -> rm.complete(null)); } @Override - public int getResultFeasibility(String queryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { - return findQuery(queryId).getSiteResult(siteId); + public int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { + return findQuery(brokerQueryId).getSiteResult(siteId); } @Override - public List getResultSiteIds(String queryId) throws QueryNotFoundException { - return findQuery(queryId).getSiteIdsWithResult(); + public List getResultSiteIds(String brokerQueryId) throws QueryNotFoundException { + return findQuery(brokerQueryId).getSiteIdsWithResult(); } @Override @@ -116,7 +125,7 @@ public String getSiteName(String siteId) { * @throws QueryNotFoundException If the ID does not identify a known query. */ private MockQuery findQuery(String queryId) throws QueryNotFoundException { - return Optional.ofNullable(queries.get(queryId)) + return Optional.ofNullable(brokerQueries.get(queryId)) .orElseThrow(() -> new QueryNotFoundException(queryId)); } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryCollectSpringConfig.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryCollectSpringConfig.java index f4c225dc..a8c48993 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryCollectSpringConfig.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryCollectSpringConfig.java @@ -1,6 +1,6 @@ package de.numcodex.feasibility_gui_backend.query.collect; -import de.numcodex.feasibility_gui_backend.query.persistence.QueryDispatchRepository; +import de.numcodex.feasibility_gui_backend.query.persistence.QueryRepository; import de.numcodex.feasibility_gui_backend.query.persistence.ResultRepository; import de.numcodex.feasibility_gui_backend.query.persistence.SiteRepository; import org.springframework.context.annotation.Bean; @@ -10,9 +10,9 @@ public class QueryCollectSpringConfig { @Bean - public QueryStatusListener createQueryStatusListener(QueryDispatchRepository queryDispatchRepository, + public QueryStatusListener createQueryStatusListener(QueryRepository queryRepository, SiteRepository siteRepository, ResultRepository resultRepository) { - return new QueryStatusListenerImpl(queryDispatchRepository, siteRepository, resultRepository); + return new QueryStatusListenerImpl(queryRepository, siteRepository, resultRepository); } } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListener.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListener.java index b40f0095..8f327c53 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListener.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListener.java @@ -1,19 +1,16 @@ package de.numcodex.feasibility_gui_backend.query.collect; -import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient; - /** - * Represents an entity capable of receiving results from a feasibility query running in a distributed fashion. + * Represents an entity capable of receiving results from brokers for broker specific queries that they run. */ public interface QueryStatusListener { /** - * Callback method to process feasibility query result updates. + * Processes update notifications from brokers to one of their broker specific queries that is associated with an + * internal (backend specific) query. * - * @param client The broker client that this update belongs to. - * @param queryId Identifies the query within the broker for which there is an update. - * @param siteId Identifies the site within the broker for which there is an update. Related to a specific query. - * @param status New state of the query. + * @param backendQueryId Identifier for a backend specific query that the status update is associated with. + * @param queryStatusUpdate Describes the update for a broker specific query. */ - void onClientUpdate(BrokerClient client, String queryId, String siteId, QueryStatus status); + void onClientUpdate(Long backendQueryId, QueryStatusUpdate queryStatusUpdate); } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImpl.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImpl.java index 870568fe..d880762b 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImpl.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImpl.java @@ -29,7 +29,7 @@ public class QueryStatusListenerImpl implements QueryStatusListener { @NonNull - private final QueryDispatchRepository queryDispatchRepository; + private final QueryRepository queryRepository; @NonNull private final SiteRepository siteRepository; @@ -38,17 +38,18 @@ public class QueryStatusListenerImpl implements QueryStatusListener { private final ResultRepository resultRepository; @Override - public void onClientUpdate(BrokerClient client, String externalQueryId, String externalSiteId, QueryStatus status) { - logQueryStatusChange(externalQueryId, externalSiteId, client.getBrokerType(), status); + public void onClientUpdate(Long backendQueryId, QueryStatusUpdate statusUpdate) { + logQueryStatusChange(statusUpdate.brokerQueryId(), statusUpdate.brokerSiteId(), + statusUpdate.source().getBrokerType(), statusUpdate.status()); try { - Optional matchesInPopulation = (status == COMPLETED) - ? Optional.of(resolveMatchesInPopulation(externalQueryId, externalSiteId, client)) + Optional matchesInPopulation = (statusUpdate.status() == COMPLETED) + ? Optional.of(resolveMatchesInPopulation(statusUpdate.brokerQueryId(), statusUpdate.brokerSiteId(), statusUpdate.source())) : Optional.empty(); - if (status == COMPLETED || status == FAILED) { - var internalQuery = lookupCorrespondingInternalQuery(externalQueryId, client.getBrokerType()); - var siteName = resolveSiteName(externalSiteId, client); + if (statusUpdate.status() == COMPLETED || statusUpdate.status() == FAILED) { + var internalQuery = lookupAssociatedBackendQuery(backendQueryId); + var siteName = resolveSiteName(statusUpdate.brokerSiteId(), statusUpdate.source()); var site = lookupSite(siteName) .orElseGet(() -> createSite(siteName)); @@ -56,7 +57,7 @@ public void onClientUpdate(BrokerClient client, String externalQueryId, String e } } catch (QueryResultCollectException e) { log.error("cannot persist result of query '%s' for site '%s' with status '%s'".formatted( - externalQueryId, externalSiteId, status.toString()), e); + statusUpdate.brokerQueryId(), statusUpdate.brokerSiteId(), statusUpdate.status().toString()), e); } } @@ -80,12 +81,10 @@ private int resolveMatchesInPopulation(String externalQueryId, String externalSi } } - private Query lookupCorrespondingInternalQuery(String externalQueryId, BrokerClientType brokerClientType) - throws QueryResultCollectException { - return queryDispatchRepository.findByExternalQueryIdAndBrokerType(externalQueryId, brokerClientType) - .flatMap(queryDispatch -> Optional.of(queryDispatch.getQuery())) - .orElseThrow(() -> new QueryResultCollectException("cannot resolve internal query for external query '%s'" - .formatted(externalQueryId))); + private Query lookupAssociatedBackendQuery(Long backendQueryId) throws QueryResultCollectException { + return queryRepository.findById(backendQueryId) + .orElseThrow(() -> new QueryResultCollectException("cannot find backend query with id '%s'" + .formatted(backendQueryId))); } private String resolveSiteName(String externalSiteId, BrokerClient client) throws QueryResultCollectException { diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusUpdate.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusUpdate.java new file mode 100644 index 00000000..e9b07b9b --- /dev/null +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusUpdate.java @@ -0,0 +1,13 @@ +package de.numcodex.feasibility_gui_backend.query.collect; + +import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient; + +/** + * Defines a status update on a broker specific query. + *

+ * Comprises information about the broker that the update originates from, the associated broker specific query ID, the + * associated broker specific site ID as well as the query status itself. + */ +public record QueryStatusUpdate(BrokerClient source, String brokerQueryId, String brokerSiteId, + QueryStatus status) { +} diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/dispatch/QueryDispatcher.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/dispatch/QueryDispatcher.java index 5594c124..b12940bd 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/dispatch/QueryDispatcher.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/dispatch/QueryDispatcher.java @@ -80,7 +80,7 @@ public Long enqueueNewQuery(StructuredQuery query) throws QueryDispatchException /** * Dispatches (publishes) an already enqueued query in a broadcast fashion using configured {@link BrokerClient}s. * - * @param queryId Identifies the query that shall be dispatched. + * @param queryId Identifies the backend query that shall be dispatched. * @throws QueryDispatchException If an error occurs while dispatching the query. */ // TODO: Pass in audit information! (actor) @@ -92,7 +92,7 @@ public void dispatchEnqueuedQuery(Long queryId) throws QueryDispatchException { // TODO: error handling + asynchronous dispatch! try { for (BrokerClient broker : queryBrokerClients) { - var brokerQueryId = broker.createQuery(); + var brokerQueryId = broker.createQuery(queryId); for (Entry queryBodyFormats : translatedQueryBodyFormats.entrySet()) { broker.addQueryDefinition(brokerQueryId, queryBodyFormats.getKey().getRepresentation(), diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientIT.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientIT.java index abb521ec..a865bc30 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientIT.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientIT.java @@ -3,6 +3,7 @@ import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import org.junit.jupiter.api.AfterEach; @@ -28,6 +29,7 @@ class DirectBrokerClientIT { private static final int ASYNC_TIMEOUT_WAIT_MS = 2000; + private static final Long TEST_BACKEND_QUERY_ID = 1L; DirectBrokerClient client; WebClient webClient; @@ -48,14 +50,14 @@ void tearDown() throws IOException { @Test void testPublishQuery() throws QueryNotFoundException, IOException, InterruptedException, SiteNotFoundException { - var queryId = client.createQuery(); - client.addQueryDefinition(queryId, STRUCTURED_QUERY.getRepresentation(), "foo"); + var brokerQueryId = client.createQuery(TEST_BACKEND_QUERY_ID); + client.addQueryDefinition(brokerQueryId, STRUCTURED_QUERY.getRepresentation(), "foo"); mockWebServer.enqueue(new MockResponse().setBody("123").setHeader(CONTENT_TYPE, "internal/json")); var statusListener = mock(QueryStatusListener.class); client.addQueryStatusListener(statusListener); - client.publishQuery(queryId); + client.publishQuery(brokerQueryId); var recordedRequest = mockWebServer.takeRequest(); assertEquals("application/json", recordedRequest.getHeader(CONTENT_TYPE)); @@ -63,38 +65,43 @@ void testPublishQuery() throws QueryNotFoundException, IOException, InterruptedE assertEquals("POST", recordedRequest.getMethod()); assertEquals("foo", recordedRequest.getBody().readUtf8()); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(client, queryId, "1", COMPLETED); + var statusUpdate = new QueryStatusUpdate(client, brokerQueryId, "1", COMPLETED); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(TEST_BACKEND_QUERY_ID, statusUpdate); - assertEquals(1, client.getResultSiteIds(queryId).size()); - assertEquals("1", client.getResultSiteIds(queryId).get(0)); - assertEquals(123, client.getResultFeasibility(queryId, "1")); + assertEquals(1, client.getResultSiteIds(brokerQueryId).size()); + assertEquals("1", client.getResultSiteIds(brokerQueryId).get(0)); + assertEquals(123, client.getResultFeasibility(brokerQueryId, "1")); } @Test void testPublishQueryServerError() throws QueryNotFoundException, IOException { - var queryId = client.createQuery(); - client.addQueryDefinition(queryId, STRUCTURED_QUERY.getRepresentation(), "foo"); + var brokerQueryId = client.createQuery(TEST_BACKEND_QUERY_ID); + client.addQueryDefinition(brokerQueryId, STRUCTURED_QUERY.getRepresentation(), "foo"); mockWebServer.enqueue(new MockResponse().setStatus(INTERNAL_SERVER_ERROR.toString())); var statusListener = mock(QueryStatusListener.class); client.addQueryStatusListener(statusListener); - client.publishQuery(queryId); + client.publishQuery(brokerQueryId); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS).only()).onClientUpdate(client, queryId, "1", FAILED); + var statusUpdate = new QueryStatusUpdate(client, brokerQueryId, "1", FAILED); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS).only()).onClientUpdate(TEST_BACKEND_QUERY_ID, + statusUpdate); } @Test void testPublishQueryUnexpectedResponseBody() throws QueryNotFoundException, IOException { - var queryId = client.createQuery(); - client.addQueryDefinition(queryId, STRUCTURED_QUERY.getRepresentation(), "foo"); + var brokerQueryId = client.createQuery(TEST_BACKEND_QUERY_ID); + client.addQueryDefinition(brokerQueryId, STRUCTURED_QUERY.getRepresentation(), "foo"); mockWebServer.enqueue(new MockResponse().setBody("not-a-number").setHeader(CONTENT_TYPE, "internal/json")); var statusListener = mock(QueryStatusListener.class); client.addQueryStatusListener(statusListener); - client.publishQuery(queryId); + client.publishQuery(brokerQueryId); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS).only()).onClientUpdate(client, queryId, "1", FAILED); + var statusUpdate = new QueryStatusUpdate(client, brokerQueryId, "1", FAILED); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS).only()).onClientUpdate(TEST_BACKEND_QUERY_ID, + statusUpdate); } } diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientTest.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientTest.java index 98cf3018..17e99f40 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientTest.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientTest.java @@ -14,6 +14,8 @@ @ExtendWith(MockitoExtension.class) class DirectBrokerClientTest { + private static final Long TEST_BACKEND_QUERY_ID = 1L; + @SuppressWarnings("unused") @Mock WebClient webClient; @@ -28,7 +30,7 @@ void testPublishNonExistingQuery() { @Test void testPublishExistingQueryWithoutStructuredQueryDefinition() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertThrows(IllegalStateException.class, () -> client.publishQuery(queryId)); } @@ -46,7 +48,7 @@ void testCloseQueryWhichDoesNotExist() { @Test void testCloseQuery() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertDoesNotThrow(() -> client.closeQuery(queryId)); } @@ -57,7 +59,7 @@ void testGetResultFeasibilityForQueryWhichDoesNotExist() { @Test void testGetResultFeasibilityForUnknownSite() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertThrows(SiteNotFoundException.class, () -> client.getResultFeasibility(queryId, "unknown-site-id")); } @@ -68,7 +70,7 @@ void testGetResultSiteIdsForQueryWhichDoesNotExist() { @Test void testGetResultSiteIdsForUnpublishedQuery() throws QueryNotFoundException { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); var resultSiteIds = client.getResultSiteIds(queryId); assertTrue(resultSiteIds.isEmpty()); diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollectorIT.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollectorIT.java index f4b5d037..54b593ae 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollectorIT.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollectorIT.java @@ -55,7 +55,7 @@ public void setUp() { resultCollector = new DSFQueryResultCollector(resultStore, fhirCtx, fhirWebClientProvider, resultHandler); } - private Task createTestTask(String queryId, String siteId, String measureReportReference, String profile) { + private Task createTestTask(String brokerQueryId, String siteId, String measureReportReference, String profile) { Task task = new Task() .setStatus(COMPLETED) .setIntent(ORDER) @@ -81,7 +81,7 @@ private Task createTestTask(String queryId, String siteId, String measureReportR .addCoding(new Coding() .setSystem("http://highmed.org/fhir/CodeSystem/bpmn-message") .setCode("business-key"))) - .setValue(new StringType(queryId)); + .setValue(new StringType(brokerQueryId)); task.addInput() .setType(new CodeableConcept() .addCoding(new Coding() @@ -120,10 +120,10 @@ private MeasureReport createTestMeasureReport(int measureCount) { @Test public void testRegisteredListenerGetsNotifiedOnUpdate() throws IOException, FhirWebClientProvisionException { - String queryId = UUID.randomUUID().toString(); + String brokerQueryId = UUID.randomUUID().toString(); String siteId = "DIC"; String measureReportId = UUID.randomUUID().toString(); - Task task = createTestTask(queryId, siteId, "MeasureReport/" + measureReportId, SINGLE_DIC_RESULT_PROFILE); + Task task = createTestTask(brokerQueryId, siteId, "MeasureReport/" + measureReportId, SINGLE_DIC_RESULT_PROFILE); int measureCount = 5; MeasureReport measureReport = createTestMeasureReport(measureCount); @@ -133,19 +133,19 @@ public void testRegisteredListenerGetsNotifiedOnUpdate() throws IOException, Fhi when(fhirClient.read(MeasureReport.class, measureReportId)).thenReturn(measureReport); var actual = new Object() { - String queryId = null; + String brokerQueryId = null; String siteId = null; QueryStatus status = null; }; - resultCollector.addResultListener(brokerClient, (broker, qId, sId, status) -> { - actual.queryId = qId; - actual.siteId = sId; - actual.status = status; + resultCollector.addResultListener(brokerClient, (backendQueryId, statusUpdate) -> { + actual.brokerQueryId = statusUpdate.brokerQueryId(); + actual.siteId = statusUpdate.brokerSiteId(); + actual.status = statusUpdate.status(); }); websocketClient.fakeIncomingMessage(task); - assertEquals(queryId, actual.queryId); + assertEquals(brokerQueryId, actual.brokerQueryId); assertEquals(siteId, actual.siteId); assertEquals(QueryStatus.COMPLETED, actual.status); } @@ -162,9 +162,10 @@ public void testResultFeasibilityIsPresentAfterListenerGetsNotifiedOnUpdate() th when(fhirWebClientProvider.provideFhirWebserviceClient()).thenReturn(fhirClient); when(fhirClient.read(MeasureReport.class, measureReportId)).thenReturn(measureReport); - resultCollector.addResultListener(brokerClient, (broker, qId, cId, status) -> { + resultCollector.addResultListener(brokerClient, (backendQueryId, statusUpdate) -> { try { - int resultFeasibility = resultCollector.getResultFeasibility(qId, cId); + int resultFeasibility = resultCollector.getResultFeasibility(statusUpdate.brokerQueryId(), + statusUpdate.brokerSiteId()); assertEquals(measureCount, resultFeasibility); } catch (Exception e) { @@ -188,9 +189,9 @@ public void testSiteIdsArePresentAfterListenerGetsNotifiedOnUpdate() throws IOEx when(fhirWebClientProvider.provideFhirWebserviceClient()).thenReturn(fhirClient); when(fhirClient.read(MeasureReport.class, measureReportId)).thenReturn(measureReport); - resultCollector.addResultListener(brokerClient, (broker, qId, sId, status) -> { + resultCollector.addResultListener(brokerClient, (backendQueryId, statusUpdate) -> { try { - List siteIds = resultCollector.getResultSiteIds(qId); + List siteIds = resultCollector.getResultSiteIds(statusUpdate.brokerQueryId()); assertEquals(List.of(siteId), siteIds); } catch (Exception e) { @@ -207,7 +208,7 @@ public void testRegisteredListenersGetNotNotifiedOnIncomingTasksThatAreNoResults Task task = createTestTask(UUID.randomUUID().toString(), "DIC", "MeasureReport/" + measureReportId, "other-profile"); when(fhirWebClientProvider.provideFhirWebsocketClient()).thenReturn(websocketClient); - resultCollector.addResultListener(brokerClient, (broker, qId, cId, status) -> fail()); + resultCollector.addResultListener(brokerClient, (backendQueryId, statusUpdate) -> fail()); websocketClient.fakeIncomingMessage(task); } diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientIT.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientIT.java index d1685c82..b2a90a45 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientIT.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientIT.java @@ -3,6 +3,7 @@ import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -15,6 +16,7 @@ public class MockBrokerClientIT { private static final int ASYNC_TIMEOUT_WAIT_MS = 9000; + private static final Long TEST_BACKEND_QUERY_ID = 1L; MockBrokerClient client; @@ -26,36 +28,44 @@ void setUp() { @Test void testPublishQuery() throws QueryNotFoundException, SiteNotFoundException { - var queryId = client.createQuery(); + var brokerQueryId = client.createQuery(TEST_BACKEND_QUERY_ID); var statusListener = mock(QueryStatusListener.class); client.addQueryStatusListener(statusListener); - client.publishQuery(queryId); - - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(client, queryId, "2", COMPLETED); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(client, queryId, "3", COMPLETED); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(client, queryId, "4", COMPLETED); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(client, queryId, "5", COMPLETED); - - assertEquals(4, client.getResultSiteIds(queryId).size()); - assertTrue(client.getResultFeasibility(queryId, "2") >= 10); - assertTrue(client.getResultFeasibility(queryId, "3") >= 10); - assertTrue(client.getResultFeasibility(queryId, "4") >= 10); - assertTrue(client.getResultFeasibility(queryId, "5") >= 10); + client.publishQuery(brokerQueryId); + + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "2", COMPLETED)); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "3", COMPLETED)); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "4", COMPLETED)); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "5", COMPLETED)); + + assertEquals(4, client.getResultSiteIds(brokerQueryId).size()); + assertTrue(client.getResultFeasibility(brokerQueryId, "2") >= 10); + assertTrue(client.getResultFeasibility(brokerQueryId, "3") >= 10); + assertTrue(client.getResultFeasibility(brokerQueryId, "4") >= 10); + assertTrue(client.getResultFeasibility(brokerQueryId, "5") >= 10); } @Test void testCloseQueryWhichIsRunning() throws QueryNotFoundException { - var queryId = client.createQuery(); + var brokerQueryId = client.createQuery(TEST_BACKEND_QUERY_ID); var statusListener = mock(QueryStatusListener.class); client.addQueryStatusListener(statusListener); - client.publishQuery(queryId); - client.closeQuery(queryId); + client.publishQuery(brokerQueryId); + client.closeQuery(brokerQueryId); - verify(statusListener, never()).onClientUpdate(client, queryId, "1", COMPLETED); - verify(statusListener, never()).onClientUpdate(client, queryId, "2", COMPLETED); - verify(statusListener, never()).onClientUpdate(client, queryId, "3", COMPLETED); - verify(statusListener, never()).onClientUpdate(client, queryId, "4", COMPLETED); + verify(statusListener, never()).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "1", COMPLETED)); + verify(statusListener, never()).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "2", COMPLETED)); + verify(statusListener, never()).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "3", COMPLETED)); + verify(statusListener, never()).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "4", COMPLETED)); } } diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientTest.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientTest.java index c2489a3d..e18df3ba 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientTest.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientTest.java @@ -9,6 +9,8 @@ class MockBrokerClientTest { + private static final Long TEST_BACKEND_QUERY_ID = 1L; + MockBrokerClient client; @BeforeEach @@ -23,7 +25,7 @@ void testAddQueryDefinitionToNonExistingQuery() { @Test void testAddQueryDefinitionToExistingQuery() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertDoesNotThrow(() -> client.addQueryDefinition(queryId, "application/json", "")); } @@ -34,7 +36,7 @@ void testCloseQueryWhichDoesNotExist() { @Test void testCloseQueryWhichHasNotYetBeenPublished() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertDoesNotThrow(() -> client.closeQuery(queryId)); } @@ -45,7 +47,7 @@ void testGetResultFeasibilityForQueryWhichDoesNotExist() { @Test void testGetResultFeasibilityForUnknownSite() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertThrows(SiteNotFoundException.class, () -> client.getResultFeasibility(queryId, "unknown-site-id")); } @@ -56,7 +58,7 @@ void testGetResultSiteIdsForQueryWhichDoesNotExist() { @Test void testGetResultSiteIdsForUnpublishedQuery() throws QueryNotFoundException { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); var resultSiteIds = client.getResultSiteIds(queryId); assertTrue(resultSiteIds.isEmpty()); diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImplIT.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImplIT.java index c133a3d1..2babd1c7 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImplIT.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImplIT.java @@ -1,8 +1,8 @@ package de.numcodex.feasibility_gui_backend.query.collect; import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient; +import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.persistence.*; -import de.numcodex.feasibility_gui_backend.query.persistence.QueryDispatch.QueryDispatchId; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -14,8 +14,6 @@ import org.springframework.context.annotation.Import; import org.testcontainers.junit.jupiter.Testcontainers; -import java.sql.Timestamp; -import java.time.Instant; import java.util.List; import static de.numcodex.feasibility_gui_backend.query.collect.QueryStatus.COMPLETED; @@ -41,9 +39,6 @@ class QueryStatusListenerImplIT { @Autowired private QueryRepository queryRepository; - @Autowired - private QueryDispatchRepository queryDispatchRepository; - @Autowired private SiteRepository siteRepository; @@ -55,10 +50,11 @@ class QueryStatusListenerImplIT { private static final String TEST_SITE_NAME = "TestSite"; private static final int TEST_MATCHES_IN_POPULATION = 100; - private static final String EXTERNAL_QUERY_ID = "37ff21b1-3d5f-49b0-ac09-063c94eac7aa"; + private static final String BROKER_QUERY_ID = "37ff21b1-3d5f-49b0-ac09-063c94eac7aa"; private static final BrokerClientType FAKE_BROKER_CLIENT_TYPE = DIRECT; private String testSiteId; + private Long testBackendQueryId; @BeforeEach public void setUpDatabaseState() { @@ -68,22 +64,14 @@ public void setUpDatabaseState() { var testQuery = new Query(); testQuery.setQueryContent(fakeContent); - var internalTestQuery = queryRepository.save(testQuery); + testBackendQueryId = queryRepository.save(testQuery).getId(); var testSite = new Site(); testSite.setSiteName(TEST_SITE_NAME); testSiteId = String.valueOf(siteRepository.save(testSite).getId()); - var testQueryDispatchId = new QueryDispatchId(); - testQueryDispatchId.setQueryId(internalTestQuery.getId()); - testQueryDispatchId.setExternalId(EXTERNAL_QUERY_ID); - testQueryDispatchId.setBrokerType(FAKE_BROKER_CLIENT_TYPE); - - var testQueryDispatch = new QueryDispatch(); - testQueryDispatch.setId(testQueryDispatchId); - testQueryDispatch.setQuery(internalTestQuery); - testQueryDispatch.setDispatchedAt(Timestamp.from(Instant.now())); - queryDispatchRepository.save(testQueryDispatch); + // Dispatch Table setup has been left out for brevity (will be filled at runtime but is not important for any of + // the test cases). } @ParameterizedTest @@ -93,18 +81,18 @@ public void setUpDatabaseState() { public void testPersistResult_NonTerminatingStatusChangesDoesNotLeadToPersistedResult(QueryStatus status) { var fakeBrokerClient = new FakeBrokerClient(); - assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(fakeBrokerClient, EXTERNAL_QUERY_ID, testSiteId, - status)); + var statusUpdate = new QueryStatusUpdate(fakeBrokerClient, BROKER_QUERY_ID, testSiteId, status); + assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(testBackendQueryId, statusUpdate)); assertEquals(0, resultRepository.count()); } @Test - public void testPersistResult_UnknownExternalQueryIdDoesNotLeadToPersistedResult() { + public void testPersistResult_UnknownBrokerQueryIdDoesNotLeadToPersistedResult() { var fakeBrokerClient = new FakeBrokerClient(); - var unknownExternalQueryId = "some_unknown_id"; + var unknownBrokerQueryId = "some_unknown_id"; - assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(fakeBrokerClient, unknownExternalQueryId, testSiteId, - COMPLETED)); + var statusUpdate = new QueryStatusUpdate(fakeBrokerClient, unknownBrokerQueryId, testSiteId, COMPLETED); + assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(testBackendQueryId, statusUpdate)); assertEquals(0, resultRepository.count()); } @@ -112,8 +100,8 @@ public void testPersistResult_UnknownExternalQueryIdDoesNotLeadToPersistedResult public void testPersistResult_CompleteStatusLeadsToPersistedResultWithMatchesInPopulation() { var fakeBrokerClient = new FakeBrokerClient(); - assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(fakeBrokerClient, EXTERNAL_QUERY_ID, testSiteId, - COMPLETED)); + var statusUpdate = new QueryStatusUpdate(fakeBrokerClient, BROKER_QUERY_ID, testSiteId, COMPLETED); + assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(testBackendQueryId, statusUpdate)); var registeredResults = resultRepository.findAll(); assertEquals(1, registeredResults.size()); @@ -126,8 +114,8 @@ public void testPersistResult_CompleteStatusLeadsToPersistedResultWithMatchesInP public void testPersistResult_FailedStatusLeadsToPersistedResultWithoutMatchesInPopulation() { var fakeBrokerClient = new FakeBrokerClient(); - assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(fakeBrokerClient, EXTERNAL_QUERY_ID, testSiteId, - FAILED)); + var statusUpdate = new QueryStatusUpdate(fakeBrokerClient, BROKER_QUERY_ID, testSiteId, FAILED); + assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(testBackendQueryId, statusUpdate)); var registeredResults = resultRepository.findAll(); assertEquals(1, registeredResults.size()); @@ -149,33 +137,38 @@ public void addQueryStatusListener(QueryStatusListener queryStatusListener) { } @Override - public String createQuery() { + public String createQuery(Long backendQueryId) { // NO-OP return null; } @Override - public void addQueryDefinition(String queryId, String mediaType, String content) { + public void addQueryDefinition(String brokerQueryId, String mediaType, String content) { // NO-OP } @Override - public void publishQuery(String queryId) { + public void publishQuery(String brokerQueryId) { // NO-OP } @Override - public void closeQuery(String queryId) { + public void closeQuery(String brokerQueryId) { // NO-OP } @Override - public int getResultFeasibility(String queryId, String siteId) { + public int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException { + if (!brokerQueryId.equals(BROKER_QUERY_ID)) { + throw new QueryNotFoundException("cannot find broker specific query for id '%s'" + .formatted(brokerQueryId)); + } return TEST_MATCHES_IN_POPULATION; + } @Override - public List getResultSiteIds(String queryId) { + public List getResultSiteIds(String brokerQueryId) { // NO-OP return null; }