Skip to content
This repository has been archived by the owner on Apr 4, 2022. It is now read-only.

Commit

Permalink
Merge pull request #88 from num-codex/bug/87-race-condition-publishin…
Browse files Browse the repository at this point in the history
…g-broker-via-multiple-brokers

Fix Race Condition Using Multiple Brokers
  • Loading branch information
DiCanio authored Feb 2, 2022
2 parents c002ee8 + 472603f commit 1f16acf
Show file tree
Hide file tree
Showing 19 changed files with 303 additions and 226 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static de.numcodex.feasibility_gui_backend.query.persistence.ResultType.SUCCESS;

@Service
@Transactional
@RequiredArgsConstructor
public class QueryHandlerService {

Expand All @@ -37,6 +36,7 @@ public Long runQuery(StructuredQuery structuredQuery) throws QueryDispatchExcept
return queryId;
}

@Transactional
public QueryResult getQueryResult(Long queryId) {
var singleSiteResults = resultRepository.findByQueryAndStatus(queryId, SUCCESS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,70 +28,72 @@ public interface BrokerClient {
void addQueryStatusListener(QueryStatusListener queryStatusListener) throws IOException;

/**
* Creates a new unpublished query.
* Creates a new unpublished broker specific query.
*
* @return The ID of the created query.
* @param backendQueryId Identifier for a backend specific query that a broker specific query is created for.
* @return Identifier of the broker specific query.
* @throws IOException IO/communication error
*/
String createQuery() throws IOException;
String createQuery(Long backendQueryId) throws IOException;

/**
* Adds the necessary query definition to a query.
* Adds the necessary query definition to a broker specific query.
* <p>
* The query definition is added to a query identified by the given query ID.
*
* @param queryId Identifies the query that the query definition shall be added to.
* @param mediaType ?
* @param content The actual query in plain text.
* @param brokerQueryId Identifies the broker specific query that the query definition shall be added to.
* @param mediaType MIME type for the query content.
* @param content The actual query in plain text.
* @throws QueryNotFoundException If the given query ID does not identify a known query.
* @throws UnsupportedMediaTypeException If the given media type is not supported.
* @throws IOException IO/communication error
*/
void addQueryDefinition(String queryId, String mediaType, String content) throws QueryNotFoundException,
void addQueryDefinition(String brokerQueryId, String mediaType, String content) throws QueryNotFoundException,
UnsupportedMediaTypeException, IOException;

/**
* Publishes a query for distributed execution.
* Publishes a broker specific query for distributed execution.
*
* @param queryId Identifies the query that shall be published.
* @param brokerQueryId Identifies the broker specific query that shall be published.
* @throws QueryNotFoundException If the given query ID does not identify a known query.
* @throws IOException If the query was not published due IO/communication error
*/
void publishQuery(String queryId) throws QueryNotFoundException, IOException;
void publishQuery(String brokerQueryId) throws QueryNotFoundException, IOException;

/**
* Marks an already published query as closed.
* Marks an already published broker specific query as closed.
* <p>
* After calling this function calls to {@link #getResultFeasibility(String, String)} and
* {@link #getResultSiteIds(String)} will fail.
*
* @param queryId Identifies the query that shall be closed.
* @param brokerQueryId Identifies the broker specific query that shall be closed.
* @throws QueryNotFoundException If the given query ID does not identify a known query.
* @throws IOException IO/communication error
*/
void closeQuery(String queryId) throws QueryNotFoundException, IOException;
void closeQuery(String brokerQueryId) throws QueryNotFoundException, IOException;

/**
* Gets the feasibility (measure count) of a published query for a specific site.
* Gets the feasibility (measure count) of a published broker specific query for a specific site.
*
* @param queryId Identifies the query.
* @param siteId Identifies the site within the query whose feasibility shall be gotten.
* @param brokerQueryId Identifies the broker specific query.
* @param siteId Identifies the site within the query whose feasibility shall be gotten.
* @return The feasibility for a specific site within a query.
* @throws QueryNotFoundException If the given query ID does not identify a known query.
* @throws SiteNotFoundException If the given site ID does not identify a known site within a query.
* @throws IOException IO/communication error
*/
int getResultFeasibility(String queryId, String siteId) throws QueryNotFoundException, SiteNotFoundException, IOException;
int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException,
IOException;

/**
* Gets all site IDs associated with a published query that can already provide a result.
* Gets all site IDs associated with a published broker specific query that can already provide a result.
*
* @param queryId Identifies the query whose associated site IDs with results shall be gotten.
* @param brokerQueryId Identifies the broker specific query whose associated site IDs with results shall be gotten.
* @return All site IDs of a specific query that can already provide results.
* @throws QueryNotFoundException If the given query ID does not identify a known query.
* @throws IOException IO/communication error
*/
List<String> getResultSiteIds(String queryId) throws QueryNotFoundException, IOException;
List<String> getResultSiteIds(String brokerQueryId) throws QueryNotFoundException, IOException;

/**
* Gets the display name of a site.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
import de.numcodex.feasibility_gui_backend.query.broker.UnsupportedMediaTypeException;
import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener;
import de.numcodex.feasibility_gui_backend.query.persistence.BrokerClientType;
import lombok.AllArgsConstructor;
import org.aktin.broker.client2.BrokerAdmin2;
import org.aktin.broker.xml.Node;
import org.aktin.broker.xml.RequestStatusInfo;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import java.util.Map;
import java.util.Objects;

import static de.numcodex.feasibility_gui_backend.query.persistence.BrokerClientType.AKTIN;

Expand All @@ -23,11 +24,16 @@
* @author R.W.Majeed
*
*/
@AllArgsConstructor
public class AktinBrokerClient implements BrokerClient {
final private BrokerAdmin2 delegate;
private final BrokerAdmin2 delegate;
private final Map<String, Long> brokerToBackendQueryIdMapping;

@Override
public AktinBrokerClient(BrokerAdmin2 delegate) {
this.delegate = Objects.requireNonNull(delegate);
this.brokerToBackendQueryIdMapping = new HashMap<>();
}

@Override
public BrokerClientType getBrokerType() {
return AKTIN;
}
Expand Down Expand Up @@ -55,46 +61,49 @@ int unwrapSiteId(String siteId) {
}

@Override
public String createQuery() throws IOException {
return wrapQueryId(delegate.createRequest());
}
public String createQuery(Long backendQueryId) throws IOException {
var brokerQueryId = wrapQueryId(delegate.createRequest());
brokerToBackendQueryIdMapping.put(brokerQueryId, backendQueryId);

return brokerQueryId;
}

@Override
public void addQueryDefinition(String queryId, String mediaType, String content)
public void addQueryDefinition(String brokerQueryId, String mediaType, String content)
throws QueryNotFoundException, UnsupportedMediaTypeException, IOException {
delegate.putRequestDefinition(unwrapQueryId(queryId), mediaType, content);
delegate.putRequestDefinition(unwrapQueryId(brokerQueryId), mediaType, content);

}

@Override
public void publishQuery(String queryId) throws QueryNotFoundException, IOException {
delegate.publishRequest(unwrapQueryId(queryId));
public void publishQuery(String brokerQueryId) throws QueryNotFoundException, IOException {
delegate.publishRequest(unwrapQueryId(brokerQueryId));
}

@Override
public void closeQuery(String queryId) throws IOException {
delegate.closeRequest(unwrapQueryId(queryId));
public void closeQuery(String brokerQueryId) throws IOException {
delegate.closeRequest(unwrapQueryId(brokerQueryId));
}

@Override
public int getResultFeasibility(String queryId, String siteId)
public int getResultFeasibility(String brokerQueryId, String siteId)
throws QueryNotFoundException, SiteNotFoundException, IOException {
String result = delegate.getResultString(unwrapQueryId(queryId), unwrapSiteId(siteId));
String result = delegate.getResultString(unwrapQueryId(brokerQueryId), unwrapSiteId(siteId));
if( result == null ) {
throw new SiteNotFoundException(queryId, siteId);
throw new SiteNotFoundException(brokerQueryId, siteId);
}
return Integer.parseInt(result);
}

@Override
public List<String> getResultSiteIds(String queryId) throws QueryNotFoundException, IOException {
List<RequestStatusInfo> list = delegate.listRequestStatus(unwrapQueryId(queryId));
public List<String> getResultSiteIds(String brokerQueryId) throws QueryNotFoundException, IOException {
List<RequestStatusInfo> list = delegate.listRequestStatus(unwrapQueryId(brokerQueryId));
if( list == null ) {
throw new QueryNotFoundException(queryId);
throw new QueryNotFoundException(brokerQueryId);
}
return list.stream()
.map( (info) -> wrapSiteId(info.node) )
.collect(Collectors.toUnmodifiableList());
.toList();
}

@Override
Expand All @@ -106,4 +115,7 @@ public String getSiteName(String siteId) throws SiteNotFoundException, IOExcepti
return node.getCommonName();
}

Long getBackendQueryId(String brokerQueryId) {
return brokerToBackendQueryIdMapping.get(brokerQueryId);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.numcodex.feasibility_gui_backend.query.broker.aktin;

import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate;
import de.numcodex.feasibility_gui_backend.query.collect.QueryStatus;
import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener;
import lombok.AllArgsConstructor;
Expand All @@ -22,7 +23,7 @@
@Log
public class WrappedNotificationListener implements AdminNotificationListener{
final private AktinBrokerClient client;
final private QueryStatusListener delegate;
final private QueryStatusListener statusListener;

@Override
public void onRequestCreated(int requestId) {
Expand All @@ -41,23 +42,26 @@ public void onRequestClosed(int requestId) {

@Override
public void onRequestStatusUpdate(int requestId, int nodeId, String status) {
QueryStatus dest;
QueryStatus queryStatus;
RequestStatus rs;
try{
rs = RequestStatus.valueOf(status);
}catch( IllegalArgumentException e ) {
return; // unsupported/unrecognized status
}

dest = switch (rs) {
queryStatus = switch (rs) {
case completed -> QueryStatus.COMPLETED;
case processing -> QueryStatus.EXECUTING;
case queued -> QueryStatus.QUEUED;
case retrieved -> QueryStatus.RETRIEVED;
default -> QueryStatus.FAILED;
};
log.log(Level.INFO,"Request status updated {0} {1} {2}", new Object[] {requestId,nodeId,status});
delegate.onClientUpdate(client, client.wrapQueryId(requestId), client.wrapSiteId(nodeId), dest);
var statusUpdate = new QueryStatusUpdate(client, client.wrapQueryId(requestId), client.wrapSiteId(nodeId),
queryStatus);
var associatedBackendQueryId = client.getBackendQueryId(client.wrapQueryId(requestId));
statusListener.onClientUpdate(associatedBackendQueryId, statusUpdate);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import de.numcodex.feasibility_gui_backend.query.QueryMediaType;
import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient;
import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate;
import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException;
import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException;
import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener;
Expand Down Expand Up @@ -35,7 +36,8 @@ public class DirectBrokerClient implements BrokerClient {

private final WebClient webClient;
private final List<QueryStatusListener> listeners;
private final Map<String, DirectQuery> queries;
private final Map<String, DirectQuery> brokerQueries;
private final Map<String, Long> brokerToBackendQueryIdMapping;

/**
* Creates a new {@link DirectBrokerClient} instance that uses the given web client to communicate with a Flare
Expand All @@ -46,7 +48,8 @@ public class DirectBrokerClient implements BrokerClient {
public DirectBrokerClient(WebClient webClient) {
this.webClient = Objects.requireNonNull(webClient);
listeners = new ArrayList<>();
queries = new HashMap<>();
brokerQueries = new HashMap<>();
brokerToBackendQueryIdMapping = new HashMap<>();
}

@Override
Expand All @@ -60,24 +63,26 @@ public void addQueryStatusListener(QueryStatusListener queryStatusListener) {
}

@Override
public String createQuery() {
var query = DirectQuery.create();
queries.put(query.getQueryId(), query);
public String createQuery(Long backendQueryId) {
var brokerQuery = DirectQuery.create();
var brokerQueryId = brokerQuery.getQueryId();
brokerQueries.put(brokerQueryId, brokerQuery);
brokerToBackendQueryIdMapping.put(brokerQueryId, backendQueryId);

return query.getQueryId();
return brokerQueryId;
}

@Override
public void addQueryDefinition(String queryId, String mediaType, String content) throws QueryNotFoundException {
findQuery(queryId).addQueryDefinition(mediaType, content);
public void addQueryDefinition(String brokerQueryId, String mediaType, String content) throws QueryNotFoundException {
findQuery(brokerQueryId).addQueryDefinition(mediaType, content);
}

@Override
public void publishQuery(String queryId) throws QueryNotFoundException, IOException {
var query = findQuery(queryId);
public void publishQuery(String brokerQueryId) throws QueryNotFoundException, IOException {
var query = findQuery(brokerQueryId);
var structuredQueryContent = Optional.ofNullable(query.getQueryDefinition(STRUCTURED_QUERY))
.orElseThrow(() -> new IllegalStateException("Query with ID "
+ queryId
+ brokerQueryId
+ " does not contain a query definition for the mandatory type: "
+ STRUCTURED_QUERY
));
Expand All @@ -96,32 +101,36 @@ public void publishQuery(String queryId) throws QueryNotFoundException, IOExcept
.map(Integer::valueOf)
.doOnError(error -> {
log.error(error.getMessage(), error);
listeners.forEach(l -> l.onClientUpdate(this, queryId, SITE_1_ID, FAILED));
var statusUpdate = new QueryStatusUpdate(this, brokerQueryId, SITE_1_ID, FAILED);
var associatedBackendQueryId = brokerToBackendQueryIdMapping.get(brokerQueryId);
listeners.forEach(l -> l.onClientUpdate(associatedBackendQueryId, statusUpdate));
})
.subscribe(val -> {
query.registerSiteResults(SITE_1_ID, val);
listeners.forEach(l -> l.onClientUpdate(this, queryId, SITE_1_ID, COMPLETED));
var statusUpdate = new QueryStatusUpdate(this, brokerQueryId, SITE_1_ID, COMPLETED);
var associatedBackendQueryId = brokerToBackendQueryIdMapping.get(brokerQueryId);
listeners.forEach(l -> l.onClientUpdate(associatedBackendQueryId, statusUpdate));
});
} catch (Exception e) {
throw new IOException("An error occurred while publishing the query with ID: " + queryId, e);
throw new IOException("An error occurred while publishing the query with ID: " + brokerQueryId, e);
}
}

@Override
public void closeQuery(String queryId) throws QueryNotFoundException {
if (queries.remove(queryId) == null) {
throw new QueryNotFoundException(queryId);
public void closeQuery(String brokerQueryId) throws QueryNotFoundException {
if (brokerQueries.remove(brokerQueryId) == null) {
throw new QueryNotFoundException(brokerQueryId);
}
}

@Override
public int getResultFeasibility(String queryId, String siteId) throws QueryNotFoundException, SiteNotFoundException {
return findQuery(queryId).getSiteResult(siteId);
public int getResultFeasibility(String brokerQueryId, String siteId) throws QueryNotFoundException, SiteNotFoundException {
return findQuery(brokerQueryId).getSiteResult(siteId);
}

@Override
public List<String> getResultSiteIds(String queryId) throws QueryNotFoundException {
return findQuery(queryId).getSiteIdsWithResult();
public List<String> getResultSiteIds(String brokerQueryId) throws QueryNotFoundException {
return findQuery(brokerQueryId).getSiteIdsWithResult();
}

@Override
Expand All @@ -137,7 +146,7 @@ public String getSiteName(String siteId) {
* @throws QueryNotFoundException If the ID does not identify a known query.
*/
private DirectQuery findQuery(String queryId) throws QueryNotFoundException {
return Optional.ofNullable(queries.get(queryId))
return Optional.ofNullable(brokerQueries.get(queryId))
.orElseThrow(() -> new QueryNotFoundException(queryId));
}

Expand Down
Loading

0 comments on commit 1f16acf

Please sign in to comment.