diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/QueryHandlerService.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/QueryHandlerService.java index 371f0e60..fe267637 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/QueryHandlerService.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/QueryHandlerService.java @@ -18,7 +18,6 @@ import static de.numcodex.feasibility_gui_backend.query.persistence.ResultType.SUCCESS; @Service -@Transactional @RequiredArgsConstructor public class QueryHandlerService { @@ -37,6 +36,7 @@ public Long runQuery(StructuredQuery structuredQuery) throws QueryDispatchExcept return queryId; } + @Transactional public QueryResult getQueryResult(Long queryId) { var singleSiteResults = resultRepository.findByQueryAndStatus(queryId, SUCCESS); diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/BrokerClient.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/BrokerClient.java index 9cd7fe10..e556a5ee 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/BrokerClient.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/BrokerClient.java @@ -28,70 +28,72 @@ public interface BrokerClient { void addQueryStatusListener(QueryStatusListener queryStatusListener) throws IOException; /** - * Creates a new unpublished query. + * Creates a new unpublished broker specific query. * - * @return The ID of the created query. + * @param backendQueryId Identifier for a backend specific query that a broker specific query is created for. + * @return Identifier of the broker specific query. * @throws IOException IO/communication error */ - String createQuery() throws IOException; + String createQuery(Long backendQueryId) throws IOException; /** - * Adds the necessary query definition to a query. + * Adds the necessary query definition to a broker specific query. *

* The query definition is added to a query identified by the given query ID. * - * @param queryId Identifies the query that the query definition shall be added to. - * @param mediaType ? - * @param content The actual query in plain text. + * @param brokerQueryId Identifies the broker specific query that the query definition shall be added to. + * @param mediaType MIME type for the query content. + * @param content The actual query in plain text. * @throws QueryNotFoundException If the given query ID does not identify a known query. * @throws UnsupportedMediaTypeException If the given media type is not supported. * @throws IOException IO/communication error */ - void addQueryDefinition(String queryId, String mediaType, String content) throws QueryNotFoundException, + void addQueryDefinition(String brokerQueryId, String mediaType, String content) throws QueryNotFoundException, UnsupportedMediaTypeException, IOException; /** - * Publishes a query for distributed execution. + * Publishes a broker specific query for distributed execution. * - * @param queryId Identifies the query that shall be published. + * @param brokerQueryId Identifies the broker specific query that shall be published. * @throws QueryNotFoundException If the given query ID does not identify a known query. * @throws IOException If the query was not published due IO/communication error */ - void publishQuery(String queryId) throws QueryNotFoundException, IOException; + void publishQuery(String brokerQueryId) throws QueryNotFoundException, IOException; /** - * Marks an already published query as closed. + * Marks an already published broker specific query as closed. *

* After calling this function calls to {@link #getResultFeasibility(String, String)} and * {@link #getResultSiteIds(String)} will fail. * - * @param queryId Identifies the query that shall be closed. + * @param brokerQueryId Identifies the broker specific query that shall be closed. * @throws QueryNotFoundException If the given query ID does not identify a known query. * @throws IOException IO/communication error */ - void closeQuery(String queryId) throws QueryNotFoundException, IOException; + void closeQuery(String brokerQueryId) throws QueryNotFoundException, IOException; /** - * Gets the feasibility (measure count) of a published query for a specific site. + * Gets the feasibility (measure count) of a published broker specific query for a specific site. * - * @param queryId Identifies the query. - * @param siteId Identifies the site within the query whose feasibility shall be gotten. + * @param brokerQueryId Identifies the broker specific query. + * @param siteId Identifies the site within the query whose feasibility shall be gotten. * @return The feasibility for a specific site within a query. * @throws QueryNotFoundException If the given query ID does not identify a known query. * @throws SiteNotFoundException If the given site ID does not identify a known site within a query. * @throws IOException IO/communication error */ - int getResultFeasibility(String queryId, String siteId) throws QueryNotFoundException, SiteNotFoundException, IOException; + int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException, + IOException; /** - * Gets all site IDs associated with a published query that can already provide a result. + * Gets all site IDs associated with a published broker specific query that can already provide a result. * - * @param queryId Identifies the query whose associated site IDs with results shall be gotten. + * @param brokerQueryId Identifies the broker specific query whose associated site IDs with results shall be gotten. * @return All site IDs of a specific query that can already provide results. * @throws QueryNotFoundException If the given query ID does not identify a known query. * @throws IOException IO/communication error */ - List getResultSiteIds(String queryId) throws QueryNotFoundException, IOException; + List getResultSiteIds(String brokerQueryId) throws QueryNotFoundException, IOException; /** * Gets the display name of a site. diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/AktinBrokerClient.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/AktinBrokerClient.java index 8c24c7a7..141e6db9 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/AktinBrokerClient.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/AktinBrokerClient.java @@ -6,14 +6,15 @@ import de.numcodex.feasibility_gui_backend.query.broker.UnsupportedMediaTypeException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; import de.numcodex.feasibility_gui_backend.query.persistence.BrokerClientType; -import lombok.AllArgsConstructor; import org.aktin.broker.client2.BrokerAdmin2; import org.aktin.broker.xml.Node; import org.aktin.broker.xml.RequestStatusInfo; import java.io.IOException; +import java.util.HashMap; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; +import java.util.Objects; import static de.numcodex.feasibility_gui_backend.query.persistence.BrokerClientType.AKTIN; @@ -23,11 +24,16 @@ * @author R.W.Majeed * */ -@AllArgsConstructor public class AktinBrokerClient implements BrokerClient { - final private BrokerAdmin2 delegate; + private final BrokerAdmin2 delegate; + private final Map brokerToBackendQueryIdMapping; - @Override + public AktinBrokerClient(BrokerAdmin2 delegate) { + this.delegate = Objects.requireNonNull(delegate); + this.brokerToBackendQueryIdMapping = new HashMap<>(); + } + + @Override public BrokerClientType getBrokerType() { return AKTIN; } @@ -55,46 +61,49 @@ int unwrapSiteId(String siteId) { } @Override - public String createQuery() throws IOException { - return wrapQueryId(delegate.createRequest()); - } + public String createQuery(Long backendQueryId) throws IOException { + var brokerQueryId = wrapQueryId(delegate.createRequest()); + brokerToBackendQueryIdMapping.put(brokerQueryId, backendQueryId); + + return brokerQueryId; + } @Override - public void addQueryDefinition(String queryId, String mediaType, String content) + public void addQueryDefinition(String brokerQueryId, String mediaType, String content) throws QueryNotFoundException, UnsupportedMediaTypeException, IOException { - delegate.putRequestDefinition(unwrapQueryId(queryId), mediaType, content); + delegate.putRequestDefinition(unwrapQueryId(brokerQueryId), mediaType, content); } @Override - public void publishQuery(String queryId) throws QueryNotFoundException, IOException { - delegate.publishRequest(unwrapQueryId(queryId)); + public void publishQuery(String brokerQueryId) throws QueryNotFoundException, IOException { + delegate.publishRequest(unwrapQueryId(brokerQueryId)); } @Override - public void closeQuery(String queryId) throws IOException { - delegate.closeRequest(unwrapQueryId(queryId)); + public void closeQuery(String brokerQueryId) throws IOException { + delegate.closeRequest(unwrapQueryId(brokerQueryId)); } @Override - public int getResultFeasibility(String queryId, String siteId) + public int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException, IOException { - String result = delegate.getResultString(unwrapQueryId(queryId), unwrapSiteId(siteId)); + String result = delegate.getResultString(unwrapQueryId(brokerQueryId), unwrapSiteId(siteId)); if( result == null ) { - throw new SiteNotFoundException(queryId, siteId); + throw new SiteNotFoundException(brokerQueryId, siteId); } return Integer.parseInt(result); } @Override - public List getResultSiteIds(String queryId) throws QueryNotFoundException, IOException { - List list = delegate.listRequestStatus(unwrapQueryId(queryId)); + public List getResultSiteIds(String brokerQueryId) throws QueryNotFoundException, IOException { + List list = delegate.listRequestStatus(unwrapQueryId(brokerQueryId)); if( list == null ) { - throw new QueryNotFoundException(queryId); + throw new QueryNotFoundException(brokerQueryId); } return list.stream() .map( (info) -> wrapSiteId(info.node) ) - .collect(Collectors.toUnmodifiableList()); + .toList(); } @Override @@ -106,4 +115,7 @@ public String getSiteName(String siteId) throws SiteNotFoundException, IOExcepti return node.getCommonName(); } + Long getBackendQueryId(String brokerQueryId) { + return brokerToBackendQueryIdMapping.get(brokerQueryId); + } } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/WrappedNotificationListener.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/WrappedNotificationListener.java index ae10e1da..9c86c18d 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/WrappedNotificationListener.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/aktin/WrappedNotificationListener.java @@ -1,5 +1,6 @@ package de.numcodex.feasibility_gui_backend.query.broker.aktin; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatus; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; import lombok.AllArgsConstructor; @@ -22,7 +23,7 @@ @Log public class WrappedNotificationListener implements AdminNotificationListener{ final private AktinBrokerClient client; - final private QueryStatusListener delegate; + final private QueryStatusListener statusListener; @Override public void onRequestCreated(int requestId) { @@ -41,7 +42,7 @@ public void onRequestClosed(int requestId) { @Override public void onRequestStatusUpdate(int requestId, int nodeId, String status) { - QueryStatus dest; + QueryStatus queryStatus; RequestStatus rs; try{ rs = RequestStatus.valueOf(status); @@ -49,7 +50,7 @@ public void onRequestStatusUpdate(int requestId, int nodeId, String status) { return; // unsupported/unrecognized status } - dest = switch (rs) { + queryStatus = switch (rs) { case completed -> QueryStatus.COMPLETED; case processing -> QueryStatus.EXECUTING; case queued -> QueryStatus.QUEUED; @@ -57,7 +58,10 @@ public void onRequestStatusUpdate(int requestId, int nodeId, String status) { default -> QueryStatus.FAILED; }; log.log(Level.INFO,"Request status updated {0} {1} {2}", new Object[] {requestId,nodeId,status}); - delegate.onClientUpdate(client, client.wrapQueryId(requestId), client.wrapSiteId(nodeId), dest); + var statusUpdate = new QueryStatusUpdate(client, client.wrapQueryId(requestId), client.wrapSiteId(nodeId), + queryStatus); + var associatedBackendQueryId = client.getBackendQueryId(client.wrapQueryId(requestId)); + statusListener.onClientUpdate(associatedBackendQueryId, statusUpdate); } @Override diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClient.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClient.java index 015723c7..08c2d5e3 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClient.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClient.java @@ -2,6 +2,7 @@ import de.numcodex.feasibility_gui_backend.query.QueryMediaType; import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; @@ -35,7 +36,8 @@ public class DirectBrokerClient implements BrokerClient { private final WebClient webClient; private final List listeners; - private final Map queries; + private final Map brokerQueries; + private final Map brokerToBackendQueryIdMapping; /** * Creates a new {@link DirectBrokerClient} instance that uses the given web client to communicate with a Flare @@ -46,7 +48,8 @@ public class DirectBrokerClient implements BrokerClient { public DirectBrokerClient(WebClient webClient) { this.webClient = Objects.requireNonNull(webClient); listeners = new ArrayList<>(); - queries = new HashMap<>(); + brokerQueries = new HashMap<>(); + brokerToBackendQueryIdMapping = new HashMap<>(); } @Override @@ -60,24 +63,26 @@ public void addQueryStatusListener(QueryStatusListener queryStatusListener) { } @Override - public String createQuery() { - var query = DirectQuery.create(); - queries.put(query.getQueryId(), query); + public String createQuery(Long backendQueryId) { + var brokerQuery = DirectQuery.create(); + var brokerQueryId = brokerQuery.getQueryId(); + brokerQueries.put(brokerQueryId, brokerQuery); + brokerToBackendQueryIdMapping.put(brokerQueryId, backendQueryId); - return query.getQueryId(); + return brokerQueryId; } @Override - public void addQueryDefinition(String queryId, String mediaType, String content) throws QueryNotFoundException { - findQuery(queryId).addQueryDefinition(mediaType, content); + public void addQueryDefinition(String brokerQueryId, String mediaType, String content) throws QueryNotFoundException { + findQuery(brokerQueryId).addQueryDefinition(mediaType, content); } @Override - public void publishQuery(String queryId) throws QueryNotFoundException, IOException { - var query = findQuery(queryId); + public void publishQuery(String brokerQueryId) throws QueryNotFoundException, IOException { + var query = findQuery(brokerQueryId); var structuredQueryContent = Optional.ofNullable(query.getQueryDefinition(STRUCTURED_QUERY)) .orElseThrow(() -> new IllegalStateException("Query with ID " - + queryId + + brokerQueryId + " does not contain a query definition for the mandatory type: " + STRUCTURED_QUERY )); @@ -96,32 +101,36 @@ public void publishQuery(String queryId) throws QueryNotFoundException, IOExcept .map(Integer::valueOf) .doOnError(error -> { log.error(error.getMessage(), error); - listeners.forEach(l -> l.onClientUpdate(this, queryId, SITE_1_ID, FAILED)); + var statusUpdate = new QueryStatusUpdate(this, brokerQueryId, SITE_1_ID, FAILED); + var associatedBackendQueryId = brokerToBackendQueryIdMapping.get(brokerQueryId); + listeners.forEach(l -> l.onClientUpdate(associatedBackendQueryId, statusUpdate)); }) .subscribe(val -> { query.registerSiteResults(SITE_1_ID, val); - listeners.forEach(l -> l.onClientUpdate(this, queryId, SITE_1_ID, COMPLETED)); + var statusUpdate = new QueryStatusUpdate(this, brokerQueryId, SITE_1_ID, COMPLETED); + var associatedBackendQueryId = brokerToBackendQueryIdMapping.get(brokerQueryId); + listeners.forEach(l -> l.onClientUpdate(associatedBackendQueryId, statusUpdate)); }); } catch (Exception e) { - throw new IOException("An error occurred while publishing the query with ID: " + queryId, e); + throw new IOException("An error occurred while publishing the query with ID: " + brokerQueryId, e); } } @Override - public void closeQuery(String queryId) throws QueryNotFoundException { - if (queries.remove(queryId) == null) { - throw new QueryNotFoundException(queryId); + public void closeQuery(String brokerQueryId) throws QueryNotFoundException { + if (brokerQueries.remove(brokerQueryId) == null) { + throw new QueryNotFoundException(brokerQueryId); } } @Override - public int getResultFeasibility(String queryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { - return findQuery(queryId).getSiteResult(siteId); + public int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { + return findQuery(brokerQueryId).getSiteResult(siteId); } @Override - public List getResultSiteIds(String queryId) throws QueryNotFoundException { - return findQuery(queryId).getSiteIdsWithResult(); + public List getResultSiteIds(String brokerQueryId) throws QueryNotFoundException { + return findQuery(brokerQueryId).getSiteIdsWithResult(); } @Override @@ -137,7 +146,7 @@ public String getSiteName(String siteId) { * @throws QueryNotFoundException If the ID does not identify a known query. */ private DirectQuery findQuery(String queryId) throws QueryNotFoundException { - return Optional.ofNullable(queries.get(queryId)) + return Optional.ofNullable(brokerQueries.get(queryId)) .orElseThrow(() -> new QueryNotFoundException(queryId)); } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFBrokerClient.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFBrokerClient.java index 1b8e535b..c13b82d2 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFBrokerClient.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFBrokerClient.java @@ -9,7 +9,9 @@ import de.numcodex.feasibility_gui_backend.query.persistence.BrokerClientType; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static de.numcodex.feasibility_gui_backend.query.persistence.BrokerClientType.DSF; @@ -19,6 +21,7 @@ public final class DSFBrokerClient implements BrokerClient { private final QueryManager queryManager; private final QueryResultCollector queryResultCollector; + private final Map brokerToBackendQueryIdMapping; /** * Creates a new {@link DSFBrokerClient} instance. @@ -29,6 +32,7 @@ public final class DSFBrokerClient implements BrokerClient { public DSFBrokerClient(QueryManager queryManager, QueryResultCollector queryResultCollector) { this.queryManager = queryManager; this.queryResultCollector = queryResultCollector; + brokerToBackendQueryIdMapping = new HashMap<>(); } @Override @@ -42,35 +46,38 @@ public void addQueryStatusListener(QueryStatusListener queryStatusListener) thro } @Override - public String createQuery() { - return queryManager.createQuery(); + public String createQuery(Long backendQueryId) { + var brokerQueryId = queryManager.createQuery(); + brokerToBackendQueryIdMapping.put(brokerQueryId, backendQueryId); + + return brokerQueryId; } @Override - public void addQueryDefinition(String queryId, String mediaType, String content) throws QueryNotFoundException, + public void addQueryDefinition(String brokerQueryId, String mediaType, String content) throws QueryNotFoundException, UnsupportedMediaTypeException { - queryManager.addQueryDefinition(queryId, mediaType, content); + queryManager.addQueryDefinition(brokerQueryId, mediaType, content); } @Override - public void publishQuery(String queryId) throws QueryNotFoundException, IOException { - queryManager.publishQuery(queryId); + public void publishQuery(String brokerQueryId) throws QueryNotFoundException, IOException { + queryManager.publishQuery(brokerQueryId); } @Override - public void closeQuery(String queryId) throws QueryNotFoundException { - queryManager.removeQuery(queryId); - queryResultCollector.removeResults(queryId); + public void closeQuery(String brokerQueryId) throws QueryNotFoundException { + queryManager.removeQuery(brokerQueryId); + queryResultCollector.removeResults(brokerQueryId); } @Override - public int getResultFeasibility(String queryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { - return queryResultCollector.getResultFeasibility(queryId, siteId); + public int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { + return queryResultCollector.getResultFeasibility(brokerQueryId, siteId); } @Override - public List getResultSiteIds(String queryId) throws QueryNotFoundException { - return queryResultCollector.getResultSiteIds(queryId); + public List getResultSiteIds(String brokerQueryId) throws QueryNotFoundException { + return queryResultCollector.getResultSiteIds(brokerQueryId); } @Override @@ -81,4 +88,8 @@ public String getSiteName(String siteId) throws SiteNotFoundException { } return siteId; } + + Long getBackendQueryId(String brokerQueryId) { + return brokerToBackendQueryIdMapping.get(brokerQueryId); + } } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollector.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollector.java index f76f3098..d3b73f48 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollector.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollector.java @@ -2,6 +2,7 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.parser.IParser; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatus; @@ -74,8 +75,13 @@ private IParser setUpResourceParser() { private void notifyResultListeners(DSFQueryResult result) { for (Entry listener : listeners.entrySet()) { - listener.getValue().onClientUpdate(listener.getKey(), result.getQueryId(), result.getSiteId(), + var broker = listener.getKey(); + var statusListener = listener.getValue(); + var statusUpdate = new QueryStatusUpdate(broker, result.getQueryId(), result.getSiteId(), QueryStatus.COMPLETED); + var associatedBackendQueryId = broker.getBackendQueryId(result.getQueryId()); + + statusListener.onClientUpdate(associatedBackendQueryId, statusUpdate); } } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClient.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClient.java index 0223e88c..12618bff 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClient.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClient.java @@ -1,6 +1,7 @@ package de.numcodex.feasibility_gui_backend.query.broker.mock; import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; @@ -26,14 +27,16 @@ public class MockBrokerClient implements BrokerClient { private final List listeners; - private final Map queries; + private final Map brokerQueries; + private final Map brokerToBackendQueryIdMapping; private final Map siteNames; private final Map>> runningMocks; public MockBrokerClient() { listeners = new ArrayList<>(); - queries = new HashMap<>(); + brokerQueries = new HashMap<>(); + brokerToBackendQueryIdMapping = new HashMap<>(); siteNames = Map.of( "2", "Lübeck", "3", "Erlangen", @@ -54,53 +57,59 @@ public void addQueryStatusListener(QueryStatusListener queryStatusListener) { } @Override - public String createQuery() { - var query = MockQuery.create(); - queries.put(query.getQueryId(), query); - runningMocks.put(query.getQueryId(), List.of()); - - return query.getQueryId(); + public String createQuery(Long backendQueryId) { + var brokerQuery = MockQuery.create(); + var brokerQueryId = brokerQuery.getQueryId(); + brokerQueries.put(brokerQueryId, brokerQuery); + brokerToBackendQueryIdMapping.put(brokerQueryId, backendQueryId); + runningMocks.put(brokerQueryId, List.of()); + + return brokerQueryId; } @Override - public void addQueryDefinition(String queryId, String mediaType, String content) { + public void addQueryDefinition(String brokerQueryId, String mediaType, String content) { // No-Op since this is irrelevant. } @Override - public void publishQuery(String queryId) throws QueryNotFoundException { - var query = findQuery(queryId); + public void publishQuery(String brokerQueryId) throws QueryNotFoundException { + var query = findQuery(brokerQueryId); var mocks = siteNames.keySet() .stream().map(siteId -> CompletableFuture.runAsync(() -> { try { Thread.sleep(Math.round(2000 + 6000 * Math.random())); query.registerSiteResults(siteId, (int) Math.round(10 + 500 * Math.random())); - listeners.forEach(l -> l.onClientUpdate(this, queryId, siteId, COMPLETED)); + var statusUpdate = new QueryStatusUpdate(this, brokerQueryId, siteId, COMPLETED); + var associatedBackendQueryId = brokerToBackendQueryIdMapping.get(brokerQueryId); + listeners.forEach(l -> l.onClientUpdate(associatedBackendQueryId, statusUpdate)); } catch (InterruptedException e) { log.error(e.getMessage(), e); - listeners.forEach(l -> l.onClientUpdate(this, queryId, siteId, FAILED)); + var statusUpdate = new QueryStatusUpdate(this, brokerQueryId, siteId, FAILED); + var associatedBackendQueryId = brokerToBackendQueryIdMapping.get(brokerQueryId); + listeners.forEach(l -> l.onClientUpdate(associatedBackendQueryId, statusUpdate)); } })) .collect(Collectors.toList()); - runningMocks.put(queryId, mocks); + runningMocks.put(brokerQueryId, mocks); } @Override - public void closeQuery(String queryId) throws QueryNotFoundException { - Optional.ofNullable(runningMocks.get(queryId)) - .orElseThrow(() -> new QueryNotFoundException(queryId)) + public void closeQuery(String brokerQueryId) throws QueryNotFoundException { + Optional.ofNullable(runningMocks.get(brokerQueryId)) + .orElseThrow(() -> new QueryNotFoundException(brokerQueryId)) .forEach(rm -> rm.complete(null)); } @Override - public int getResultFeasibility(String queryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { - return findQuery(queryId).getSiteResult(siteId); + public int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException { + return findQuery(brokerQueryId).getSiteResult(siteId); } @Override - public List getResultSiteIds(String queryId) throws QueryNotFoundException { - return findQuery(queryId).getSiteIdsWithResult(); + public List getResultSiteIds(String brokerQueryId) throws QueryNotFoundException { + return findQuery(brokerQueryId).getSiteIdsWithResult(); } @Override @@ -116,7 +125,7 @@ public String getSiteName(String siteId) { * @throws QueryNotFoundException If the ID does not identify a known query. */ private MockQuery findQuery(String queryId) throws QueryNotFoundException { - return Optional.ofNullable(queries.get(queryId)) + return Optional.ofNullable(brokerQueries.get(queryId)) .orElseThrow(() -> new QueryNotFoundException(queryId)); } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryCollectSpringConfig.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryCollectSpringConfig.java index f4c225dc..a8c48993 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryCollectSpringConfig.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryCollectSpringConfig.java @@ -1,6 +1,6 @@ package de.numcodex.feasibility_gui_backend.query.collect; -import de.numcodex.feasibility_gui_backend.query.persistence.QueryDispatchRepository; +import de.numcodex.feasibility_gui_backend.query.persistence.QueryRepository; import de.numcodex.feasibility_gui_backend.query.persistence.ResultRepository; import de.numcodex.feasibility_gui_backend.query.persistence.SiteRepository; import org.springframework.context.annotation.Bean; @@ -10,9 +10,9 @@ public class QueryCollectSpringConfig { @Bean - public QueryStatusListener createQueryStatusListener(QueryDispatchRepository queryDispatchRepository, + public QueryStatusListener createQueryStatusListener(QueryRepository queryRepository, SiteRepository siteRepository, ResultRepository resultRepository) { - return new QueryStatusListenerImpl(queryDispatchRepository, siteRepository, resultRepository); + return new QueryStatusListenerImpl(queryRepository, siteRepository, resultRepository); } } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListener.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListener.java index b40f0095..8f327c53 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListener.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListener.java @@ -1,19 +1,16 @@ package de.numcodex.feasibility_gui_backend.query.collect; -import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient; - /** - * Represents an entity capable of receiving results from a feasibility query running in a distributed fashion. + * Represents an entity capable of receiving results from brokers for broker specific queries that they run. */ public interface QueryStatusListener { /** - * Callback method to process feasibility query result updates. + * Processes update notifications from brokers to one of their broker specific queries that is associated with an + * internal (backend specific) query. * - * @param client The broker client that this update belongs to. - * @param queryId Identifies the query within the broker for which there is an update. - * @param siteId Identifies the site within the broker for which there is an update. Related to a specific query. - * @param status New state of the query. + * @param backendQueryId Identifier for a backend specific query that the status update is associated with. + * @param queryStatusUpdate Describes the update for a broker specific query. */ - void onClientUpdate(BrokerClient client, String queryId, String siteId, QueryStatus status); + void onClientUpdate(Long backendQueryId, QueryStatusUpdate queryStatusUpdate); } diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImpl.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImpl.java index 870568fe..d880762b 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImpl.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImpl.java @@ -29,7 +29,7 @@ public class QueryStatusListenerImpl implements QueryStatusListener { @NonNull - private final QueryDispatchRepository queryDispatchRepository; + private final QueryRepository queryRepository; @NonNull private final SiteRepository siteRepository; @@ -38,17 +38,18 @@ public class QueryStatusListenerImpl implements QueryStatusListener { private final ResultRepository resultRepository; @Override - public void onClientUpdate(BrokerClient client, String externalQueryId, String externalSiteId, QueryStatus status) { - logQueryStatusChange(externalQueryId, externalSiteId, client.getBrokerType(), status); + public void onClientUpdate(Long backendQueryId, QueryStatusUpdate statusUpdate) { + logQueryStatusChange(statusUpdate.brokerQueryId(), statusUpdate.brokerSiteId(), + statusUpdate.source().getBrokerType(), statusUpdate.status()); try { - Optional matchesInPopulation = (status == COMPLETED) - ? Optional.of(resolveMatchesInPopulation(externalQueryId, externalSiteId, client)) + Optional matchesInPopulation = (statusUpdate.status() == COMPLETED) + ? Optional.of(resolveMatchesInPopulation(statusUpdate.brokerQueryId(), statusUpdate.brokerSiteId(), statusUpdate.source())) : Optional.empty(); - if (status == COMPLETED || status == FAILED) { - var internalQuery = lookupCorrespondingInternalQuery(externalQueryId, client.getBrokerType()); - var siteName = resolveSiteName(externalSiteId, client); + if (statusUpdate.status() == COMPLETED || statusUpdate.status() == FAILED) { + var internalQuery = lookupAssociatedBackendQuery(backendQueryId); + var siteName = resolveSiteName(statusUpdate.brokerSiteId(), statusUpdate.source()); var site = lookupSite(siteName) .orElseGet(() -> createSite(siteName)); @@ -56,7 +57,7 @@ public void onClientUpdate(BrokerClient client, String externalQueryId, String e } } catch (QueryResultCollectException e) { log.error("cannot persist result of query '%s' for site '%s' with status '%s'".formatted( - externalQueryId, externalSiteId, status.toString()), e); + statusUpdate.brokerQueryId(), statusUpdate.brokerSiteId(), statusUpdate.status().toString()), e); } } @@ -80,12 +81,10 @@ private int resolveMatchesInPopulation(String externalQueryId, String externalSi } } - private Query lookupCorrespondingInternalQuery(String externalQueryId, BrokerClientType brokerClientType) - throws QueryResultCollectException { - return queryDispatchRepository.findByExternalQueryIdAndBrokerType(externalQueryId, brokerClientType) - .flatMap(queryDispatch -> Optional.of(queryDispatch.getQuery())) - .orElseThrow(() -> new QueryResultCollectException("cannot resolve internal query for external query '%s'" - .formatted(externalQueryId))); + private Query lookupAssociatedBackendQuery(Long backendQueryId) throws QueryResultCollectException { + return queryRepository.findById(backendQueryId) + .orElseThrow(() -> new QueryResultCollectException("cannot find backend query with id '%s'" + .formatted(backendQueryId))); } private String resolveSiteName(String externalSiteId, BrokerClient client) throws QueryResultCollectException { diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusUpdate.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusUpdate.java new file mode 100644 index 00000000..e9b07b9b --- /dev/null +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusUpdate.java @@ -0,0 +1,13 @@ +package de.numcodex.feasibility_gui_backend.query.collect; + +import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient; + +/** + * Defines a status update on a broker specific query. + *

+ * Comprises information about the broker that the update originates from, the associated broker specific query ID, the + * associated broker specific site ID as well as the query status itself. + */ +public record QueryStatusUpdate(BrokerClient source, String brokerQueryId, String brokerSiteId, + QueryStatus status) { +} diff --git a/src/main/java/de/numcodex/feasibility_gui_backend/query/dispatch/QueryDispatcher.java b/src/main/java/de/numcodex/feasibility_gui_backend/query/dispatch/QueryDispatcher.java index 5594c124..b12940bd 100644 --- a/src/main/java/de/numcodex/feasibility_gui_backend/query/dispatch/QueryDispatcher.java +++ b/src/main/java/de/numcodex/feasibility_gui_backend/query/dispatch/QueryDispatcher.java @@ -80,7 +80,7 @@ public Long enqueueNewQuery(StructuredQuery query) throws QueryDispatchException /** * Dispatches (publishes) an already enqueued query in a broadcast fashion using configured {@link BrokerClient}s. * - * @param queryId Identifies the query that shall be dispatched. + * @param queryId Identifies the backend query that shall be dispatched. * @throws QueryDispatchException If an error occurs while dispatching the query. */ // TODO: Pass in audit information! (actor) @@ -92,7 +92,7 @@ public void dispatchEnqueuedQuery(Long queryId) throws QueryDispatchException { // TODO: error handling + asynchronous dispatch! try { for (BrokerClient broker : queryBrokerClients) { - var brokerQueryId = broker.createQuery(); + var brokerQueryId = broker.createQuery(queryId); for (Entry queryBodyFormats : translatedQueryBodyFormats.entrySet()) { broker.addQueryDefinition(brokerQueryId, queryBodyFormats.getKey().getRepresentation(), diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientIT.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientIT.java index abb521ec..a865bc30 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientIT.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientIT.java @@ -3,6 +3,7 @@ import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import org.junit.jupiter.api.AfterEach; @@ -28,6 +29,7 @@ class DirectBrokerClientIT { private static final int ASYNC_TIMEOUT_WAIT_MS = 2000; + private static final Long TEST_BACKEND_QUERY_ID = 1L; DirectBrokerClient client; WebClient webClient; @@ -48,14 +50,14 @@ void tearDown() throws IOException { @Test void testPublishQuery() throws QueryNotFoundException, IOException, InterruptedException, SiteNotFoundException { - var queryId = client.createQuery(); - client.addQueryDefinition(queryId, STRUCTURED_QUERY.getRepresentation(), "foo"); + var brokerQueryId = client.createQuery(TEST_BACKEND_QUERY_ID); + client.addQueryDefinition(brokerQueryId, STRUCTURED_QUERY.getRepresentation(), "foo"); mockWebServer.enqueue(new MockResponse().setBody("123").setHeader(CONTENT_TYPE, "internal/json")); var statusListener = mock(QueryStatusListener.class); client.addQueryStatusListener(statusListener); - client.publishQuery(queryId); + client.publishQuery(brokerQueryId); var recordedRequest = mockWebServer.takeRequest(); assertEquals("application/json", recordedRequest.getHeader(CONTENT_TYPE)); @@ -63,38 +65,43 @@ void testPublishQuery() throws QueryNotFoundException, IOException, InterruptedE assertEquals("POST", recordedRequest.getMethod()); assertEquals("foo", recordedRequest.getBody().readUtf8()); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(client, queryId, "1", COMPLETED); + var statusUpdate = new QueryStatusUpdate(client, brokerQueryId, "1", COMPLETED); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(TEST_BACKEND_QUERY_ID, statusUpdate); - assertEquals(1, client.getResultSiteIds(queryId).size()); - assertEquals("1", client.getResultSiteIds(queryId).get(0)); - assertEquals(123, client.getResultFeasibility(queryId, "1")); + assertEquals(1, client.getResultSiteIds(brokerQueryId).size()); + assertEquals("1", client.getResultSiteIds(brokerQueryId).get(0)); + assertEquals(123, client.getResultFeasibility(brokerQueryId, "1")); } @Test void testPublishQueryServerError() throws QueryNotFoundException, IOException { - var queryId = client.createQuery(); - client.addQueryDefinition(queryId, STRUCTURED_QUERY.getRepresentation(), "foo"); + var brokerQueryId = client.createQuery(TEST_BACKEND_QUERY_ID); + client.addQueryDefinition(brokerQueryId, STRUCTURED_QUERY.getRepresentation(), "foo"); mockWebServer.enqueue(new MockResponse().setStatus(INTERNAL_SERVER_ERROR.toString())); var statusListener = mock(QueryStatusListener.class); client.addQueryStatusListener(statusListener); - client.publishQuery(queryId); + client.publishQuery(brokerQueryId); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS).only()).onClientUpdate(client, queryId, "1", FAILED); + var statusUpdate = new QueryStatusUpdate(client, brokerQueryId, "1", FAILED); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS).only()).onClientUpdate(TEST_BACKEND_QUERY_ID, + statusUpdate); } @Test void testPublishQueryUnexpectedResponseBody() throws QueryNotFoundException, IOException { - var queryId = client.createQuery(); - client.addQueryDefinition(queryId, STRUCTURED_QUERY.getRepresentation(), "foo"); + var brokerQueryId = client.createQuery(TEST_BACKEND_QUERY_ID); + client.addQueryDefinition(brokerQueryId, STRUCTURED_QUERY.getRepresentation(), "foo"); mockWebServer.enqueue(new MockResponse().setBody("not-a-number").setHeader(CONTENT_TYPE, "internal/json")); var statusListener = mock(QueryStatusListener.class); client.addQueryStatusListener(statusListener); - client.publishQuery(queryId); + client.publishQuery(brokerQueryId); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS).only()).onClientUpdate(client, queryId, "1", FAILED); + var statusUpdate = new QueryStatusUpdate(client, brokerQueryId, "1", FAILED); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS).only()).onClientUpdate(TEST_BACKEND_QUERY_ID, + statusUpdate); } } diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientTest.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientTest.java index 98cf3018..17e99f40 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientTest.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/direct/DirectBrokerClientTest.java @@ -14,6 +14,8 @@ @ExtendWith(MockitoExtension.class) class DirectBrokerClientTest { + private static final Long TEST_BACKEND_QUERY_ID = 1L; + @SuppressWarnings("unused") @Mock WebClient webClient; @@ -28,7 +30,7 @@ void testPublishNonExistingQuery() { @Test void testPublishExistingQueryWithoutStructuredQueryDefinition() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertThrows(IllegalStateException.class, () -> client.publishQuery(queryId)); } @@ -46,7 +48,7 @@ void testCloseQueryWhichDoesNotExist() { @Test void testCloseQuery() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertDoesNotThrow(() -> client.closeQuery(queryId)); } @@ -57,7 +59,7 @@ void testGetResultFeasibilityForQueryWhichDoesNotExist() { @Test void testGetResultFeasibilityForUnknownSite() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertThrows(SiteNotFoundException.class, () -> client.getResultFeasibility(queryId, "unknown-site-id")); } @@ -68,7 +70,7 @@ void testGetResultSiteIdsForQueryWhichDoesNotExist() { @Test void testGetResultSiteIdsForUnpublishedQuery() throws QueryNotFoundException { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); var resultSiteIds = client.getResultSiteIds(queryId); assertTrue(resultSiteIds.isEmpty()); diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollectorIT.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollectorIT.java index f4b5d037..54b593ae 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollectorIT.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/dsf/DSFQueryResultCollectorIT.java @@ -55,7 +55,7 @@ public void setUp() { resultCollector = new DSFQueryResultCollector(resultStore, fhirCtx, fhirWebClientProvider, resultHandler); } - private Task createTestTask(String queryId, String siteId, String measureReportReference, String profile) { + private Task createTestTask(String brokerQueryId, String siteId, String measureReportReference, String profile) { Task task = new Task() .setStatus(COMPLETED) .setIntent(ORDER) @@ -81,7 +81,7 @@ private Task createTestTask(String queryId, String siteId, String measureReportR .addCoding(new Coding() .setSystem("http://highmed.org/fhir/CodeSystem/bpmn-message") .setCode("business-key"))) - .setValue(new StringType(queryId)); + .setValue(new StringType(brokerQueryId)); task.addInput() .setType(new CodeableConcept() .addCoding(new Coding() @@ -120,10 +120,10 @@ private MeasureReport createTestMeasureReport(int measureCount) { @Test public void testRegisteredListenerGetsNotifiedOnUpdate() throws IOException, FhirWebClientProvisionException { - String queryId = UUID.randomUUID().toString(); + String brokerQueryId = UUID.randomUUID().toString(); String siteId = "DIC"; String measureReportId = UUID.randomUUID().toString(); - Task task = createTestTask(queryId, siteId, "MeasureReport/" + measureReportId, SINGLE_DIC_RESULT_PROFILE); + Task task = createTestTask(brokerQueryId, siteId, "MeasureReport/" + measureReportId, SINGLE_DIC_RESULT_PROFILE); int measureCount = 5; MeasureReport measureReport = createTestMeasureReport(measureCount); @@ -133,19 +133,19 @@ public void testRegisteredListenerGetsNotifiedOnUpdate() throws IOException, Fhi when(fhirClient.read(MeasureReport.class, measureReportId)).thenReturn(measureReport); var actual = new Object() { - String queryId = null; + String brokerQueryId = null; String siteId = null; QueryStatus status = null; }; - resultCollector.addResultListener(brokerClient, (broker, qId, sId, status) -> { - actual.queryId = qId; - actual.siteId = sId; - actual.status = status; + resultCollector.addResultListener(brokerClient, (backendQueryId, statusUpdate) -> { + actual.brokerQueryId = statusUpdate.brokerQueryId(); + actual.siteId = statusUpdate.brokerSiteId(); + actual.status = statusUpdate.status(); }); websocketClient.fakeIncomingMessage(task); - assertEquals(queryId, actual.queryId); + assertEquals(brokerQueryId, actual.brokerQueryId); assertEquals(siteId, actual.siteId); assertEquals(QueryStatus.COMPLETED, actual.status); } @@ -162,9 +162,10 @@ public void testResultFeasibilityIsPresentAfterListenerGetsNotifiedOnUpdate() th when(fhirWebClientProvider.provideFhirWebserviceClient()).thenReturn(fhirClient); when(fhirClient.read(MeasureReport.class, measureReportId)).thenReturn(measureReport); - resultCollector.addResultListener(brokerClient, (broker, qId, cId, status) -> { + resultCollector.addResultListener(brokerClient, (backendQueryId, statusUpdate) -> { try { - int resultFeasibility = resultCollector.getResultFeasibility(qId, cId); + int resultFeasibility = resultCollector.getResultFeasibility(statusUpdate.brokerQueryId(), + statusUpdate.brokerSiteId()); assertEquals(measureCount, resultFeasibility); } catch (Exception e) { @@ -188,9 +189,9 @@ public void testSiteIdsArePresentAfterListenerGetsNotifiedOnUpdate() throws IOEx when(fhirWebClientProvider.provideFhirWebserviceClient()).thenReturn(fhirClient); when(fhirClient.read(MeasureReport.class, measureReportId)).thenReturn(measureReport); - resultCollector.addResultListener(brokerClient, (broker, qId, sId, status) -> { + resultCollector.addResultListener(brokerClient, (backendQueryId, statusUpdate) -> { try { - List siteIds = resultCollector.getResultSiteIds(qId); + List siteIds = resultCollector.getResultSiteIds(statusUpdate.brokerQueryId()); assertEquals(List.of(siteId), siteIds); } catch (Exception e) { @@ -207,7 +208,7 @@ public void testRegisteredListenersGetNotNotifiedOnIncomingTasksThatAreNoResults Task task = createTestTask(UUID.randomUUID().toString(), "DIC", "MeasureReport/" + measureReportId, "other-profile"); when(fhirWebClientProvider.provideFhirWebsocketClient()).thenReturn(websocketClient); - resultCollector.addResultListener(brokerClient, (broker, qId, cId, status) -> fail()); + resultCollector.addResultListener(brokerClient, (backendQueryId, statusUpdate) -> fail()); websocketClient.fakeIncomingMessage(task); } diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientIT.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientIT.java index d1685c82..b2a90a45 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientIT.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientIT.java @@ -3,6 +3,7 @@ import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException; import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener; +import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -15,6 +16,7 @@ public class MockBrokerClientIT { private static final int ASYNC_TIMEOUT_WAIT_MS = 9000; + private static final Long TEST_BACKEND_QUERY_ID = 1L; MockBrokerClient client; @@ -26,36 +28,44 @@ void setUp() { @Test void testPublishQuery() throws QueryNotFoundException, SiteNotFoundException { - var queryId = client.createQuery(); + var brokerQueryId = client.createQuery(TEST_BACKEND_QUERY_ID); var statusListener = mock(QueryStatusListener.class); client.addQueryStatusListener(statusListener); - client.publishQuery(queryId); - - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(client, queryId, "2", COMPLETED); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(client, queryId, "3", COMPLETED); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(client, queryId, "4", COMPLETED); - verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(client, queryId, "5", COMPLETED); - - assertEquals(4, client.getResultSiteIds(queryId).size()); - assertTrue(client.getResultFeasibility(queryId, "2") >= 10); - assertTrue(client.getResultFeasibility(queryId, "3") >= 10); - assertTrue(client.getResultFeasibility(queryId, "4") >= 10); - assertTrue(client.getResultFeasibility(queryId, "5") >= 10); + client.publishQuery(brokerQueryId); + + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "2", COMPLETED)); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "3", COMPLETED)); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "4", COMPLETED)); + verify(statusListener, timeout(ASYNC_TIMEOUT_WAIT_MS)).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "5", COMPLETED)); + + assertEquals(4, client.getResultSiteIds(brokerQueryId).size()); + assertTrue(client.getResultFeasibility(brokerQueryId, "2") >= 10); + assertTrue(client.getResultFeasibility(brokerQueryId, "3") >= 10); + assertTrue(client.getResultFeasibility(brokerQueryId, "4") >= 10); + assertTrue(client.getResultFeasibility(brokerQueryId, "5") >= 10); } @Test void testCloseQueryWhichIsRunning() throws QueryNotFoundException { - var queryId = client.createQuery(); + var brokerQueryId = client.createQuery(TEST_BACKEND_QUERY_ID); var statusListener = mock(QueryStatusListener.class); client.addQueryStatusListener(statusListener); - client.publishQuery(queryId); - client.closeQuery(queryId); + client.publishQuery(brokerQueryId); + client.closeQuery(brokerQueryId); - verify(statusListener, never()).onClientUpdate(client, queryId, "1", COMPLETED); - verify(statusListener, never()).onClientUpdate(client, queryId, "2", COMPLETED); - verify(statusListener, never()).onClientUpdate(client, queryId, "3", COMPLETED); - verify(statusListener, never()).onClientUpdate(client, queryId, "4", COMPLETED); + verify(statusListener, never()).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "1", COMPLETED)); + verify(statusListener, never()).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "2", COMPLETED)); + verify(statusListener, never()).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "3", COMPLETED)); + verify(statusListener, never()).onClientUpdate(TEST_BACKEND_QUERY_ID, + new QueryStatusUpdate(client, brokerQueryId, "4", COMPLETED)); } } diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientTest.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientTest.java index c2489a3d..e18df3ba 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientTest.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/broker/mock/MockBrokerClientTest.java @@ -9,6 +9,8 @@ class MockBrokerClientTest { + private static final Long TEST_BACKEND_QUERY_ID = 1L; + MockBrokerClient client; @BeforeEach @@ -23,7 +25,7 @@ void testAddQueryDefinitionToNonExistingQuery() { @Test void testAddQueryDefinitionToExistingQuery() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertDoesNotThrow(() -> client.addQueryDefinition(queryId, "application/json", "")); } @@ -34,7 +36,7 @@ void testCloseQueryWhichDoesNotExist() { @Test void testCloseQueryWhichHasNotYetBeenPublished() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertDoesNotThrow(() -> client.closeQuery(queryId)); } @@ -45,7 +47,7 @@ void testGetResultFeasibilityForQueryWhichDoesNotExist() { @Test void testGetResultFeasibilityForUnknownSite() { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); assertThrows(SiteNotFoundException.class, () -> client.getResultFeasibility(queryId, "unknown-site-id")); } @@ -56,7 +58,7 @@ void testGetResultSiteIdsForQueryWhichDoesNotExist() { @Test void testGetResultSiteIdsForUnpublishedQuery() throws QueryNotFoundException { - var queryId = client.createQuery(); + var queryId = client.createQuery(TEST_BACKEND_QUERY_ID); var resultSiteIds = client.getResultSiteIds(queryId); assertTrue(resultSiteIds.isEmpty()); diff --git a/src/test/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImplIT.java b/src/test/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImplIT.java index c133a3d1..2babd1c7 100644 --- a/src/test/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImplIT.java +++ b/src/test/java/de/numcodex/feasibility_gui_backend/query/collect/QueryStatusListenerImplIT.java @@ -1,8 +1,8 @@ package de.numcodex.feasibility_gui_backend.query.collect; import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient; +import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException; import de.numcodex.feasibility_gui_backend.query.persistence.*; -import de.numcodex.feasibility_gui_backend.query.persistence.QueryDispatch.QueryDispatchId; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -14,8 +14,6 @@ import org.springframework.context.annotation.Import; import org.testcontainers.junit.jupiter.Testcontainers; -import java.sql.Timestamp; -import java.time.Instant; import java.util.List; import static de.numcodex.feasibility_gui_backend.query.collect.QueryStatus.COMPLETED; @@ -41,9 +39,6 @@ class QueryStatusListenerImplIT { @Autowired private QueryRepository queryRepository; - @Autowired - private QueryDispatchRepository queryDispatchRepository; - @Autowired private SiteRepository siteRepository; @@ -55,10 +50,11 @@ class QueryStatusListenerImplIT { private static final String TEST_SITE_NAME = "TestSite"; private static final int TEST_MATCHES_IN_POPULATION = 100; - private static final String EXTERNAL_QUERY_ID = "37ff21b1-3d5f-49b0-ac09-063c94eac7aa"; + private static final String BROKER_QUERY_ID = "37ff21b1-3d5f-49b0-ac09-063c94eac7aa"; private static final BrokerClientType FAKE_BROKER_CLIENT_TYPE = DIRECT; private String testSiteId; + private Long testBackendQueryId; @BeforeEach public void setUpDatabaseState() { @@ -68,22 +64,14 @@ public void setUpDatabaseState() { var testQuery = new Query(); testQuery.setQueryContent(fakeContent); - var internalTestQuery = queryRepository.save(testQuery); + testBackendQueryId = queryRepository.save(testQuery).getId(); var testSite = new Site(); testSite.setSiteName(TEST_SITE_NAME); testSiteId = String.valueOf(siteRepository.save(testSite).getId()); - var testQueryDispatchId = new QueryDispatchId(); - testQueryDispatchId.setQueryId(internalTestQuery.getId()); - testQueryDispatchId.setExternalId(EXTERNAL_QUERY_ID); - testQueryDispatchId.setBrokerType(FAKE_BROKER_CLIENT_TYPE); - - var testQueryDispatch = new QueryDispatch(); - testQueryDispatch.setId(testQueryDispatchId); - testQueryDispatch.setQuery(internalTestQuery); - testQueryDispatch.setDispatchedAt(Timestamp.from(Instant.now())); - queryDispatchRepository.save(testQueryDispatch); + // Dispatch Table setup has been left out for brevity (will be filled at runtime but is not important for any of + // the test cases). } @ParameterizedTest @@ -93,18 +81,18 @@ public void setUpDatabaseState() { public void testPersistResult_NonTerminatingStatusChangesDoesNotLeadToPersistedResult(QueryStatus status) { var fakeBrokerClient = new FakeBrokerClient(); - assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(fakeBrokerClient, EXTERNAL_QUERY_ID, testSiteId, - status)); + var statusUpdate = new QueryStatusUpdate(fakeBrokerClient, BROKER_QUERY_ID, testSiteId, status); + assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(testBackendQueryId, statusUpdate)); assertEquals(0, resultRepository.count()); } @Test - public void testPersistResult_UnknownExternalQueryIdDoesNotLeadToPersistedResult() { + public void testPersistResult_UnknownBrokerQueryIdDoesNotLeadToPersistedResult() { var fakeBrokerClient = new FakeBrokerClient(); - var unknownExternalQueryId = "some_unknown_id"; + var unknownBrokerQueryId = "some_unknown_id"; - assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(fakeBrokerClient, unknownExternalQueryId, testSiteId, - COMPLETED)); + var statusUpdate = new QueryStatusUpdate(fakeBrokerClient, unknownBrokerQueryId, testSiteId, COMPLETED); + assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(testBackendQueryId, statusUpdate)); assertEquals(0, resultRepository.count()); } @@ -112,8 +100,8 @@ public void testPersistResult_UnknownExternalQueryIdDoesNotLeadToPersistedResult public void testPersistResult_CompleteStatusLeadsToPersistedResultWithMatchesInPopulation() { var fakeBrokerClient = new FakeBrokerClient(); - assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(fakeBrokerClient, EXTERNAL_QUERY_ID, testSiteId, - COMPLETED)); + var statusUpdate = new QueryStatusUpdate(fakeBrokerClient, BROKER_QUERY_ID, testSiteId, COMPLETED); + assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(testBackendQueryId, statusUpdate)); var registeredResults = resultRepository.findAll(); assertEquals(1, registeredResults.size()); @@ -126,8 +114,8 @@ public void testPersistResult_CompleteStatusLeadsToPersistedResultWithMatchesInP public void testPersistResult_FailedStatusLeadsToPersistedResultWithoutMatchesInPopulation() { var fakeBrokerClient = new FakeBrokerClient(); - assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(fakeBrokerClient, EXTERNAL_QUERY_ID, testSiteId, - FAILED)); + var statusUpdate = new QueryStatusUpdate(fakeBrokerClient, BROKER_QUERY_ID, testSiteId, FAILED); + assertDoesNotThrow(() -> queryStatusListener.onClientUpdate(testBackendQueryId, statusUpdate)); var registeredResults = resultRepository.findAll(); assertEquals(1, registeredResults.size()); @@ -149,33 +137,38 @@ public void addQueryStatusListener(QueryStatusListener queryStatusListener) { } @Override - public String createQuery() { + public String createQuery(Long backendQueryId) { // NO-OP return null; } @Override - public void addQueryDefinition(String queryId, String mediaType, String content) { + public void addQueryDefinition(String brokerQueryId, String mediaType, String content) { // NO-OP } @Override - public void publishQuery(String queryId) { + public void publishQuery(String brokerQueryId) { // NO-OP } @Override - public void closeQuery(String queryId) { + public void closeQuery(String brokerQueryId) { // NO-OP } @Override - public int getResultFeasibility(String queryId, String siteId) { + public int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException { + if (!brokerQueryId.equals(BROKER_QUERY_ID)) { + throw new QueryNotFoundException("cannot find broker specific query for id '%s'" + .formatted(brokerQueryId)); + } return TEST_MATCHES_IN_POPULATION; + } @Override - public List getResultSiteIds(String queryId) { + public List getResultSiteIds(String brokerQueryId) { // NO-OP return null; }