Skip to content

Commit

Permalink
Events can be published in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
z-sztrom committed Nov 14, 2024
1 parent 2b11dc9 commit 44aee91
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,15 @@ public RemRemPublishException(String message, RMQBeanConnectionFactory factory,
Throwable cause) {
super(message + factory.getHost() + ":" + factory.getPort(), cause);
}

@Override
public String getMessage() {
String message = super.getMessage();
Throwable cause = getCause();
if (cause != null) {
message += "; root cause: '" + cause.getMessage() + "'";
}

return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void init() {
//The exception can be safely handled here as there is a check for existence of exchange is done before each publish.
checkAndCreateExchangeIfNeeded();
} catch (RemRemPublishException e) {
log.error("Error occured while setting up the RabbitMq Connection. "+e.getMessage());
log.error("Error occurred while setting up the RabbitMq Connection. "+e.getMessage());
e.printStackTrace();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,4 @@ public interface MessageService {
* Does the cleanup like closing open connections
*/
public void cleanUp();

/**
* Implemented Status code for the response
*/
public HttpStatus getHttpStatus();

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.ericsson.eiffel.remrem.publish.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -49,11 +50,6 @@

Logger log = (Logger) LoggerFactory.getLogger(MessageServiceRMQImpl.class);

/*Variables handles status codes*/
List<Integer> statusCodes;
List<JsonElement> errorItems;
List<PublishResultItem> resultList;
boolean checkEventStatus;
/*
* (non-Javadoc)
* @see com.ericsson.eiffel.remrem.publish.service.MessageService#send(java.util.Map, java.util.Map)
Expand All @@ -74,7 +70,6 @@ public SendResult send(Map<String, String> routingKeyMap, Map<String, String> ms
} else {
event = new PublishResultItem(entryKey, HttpStatus.INTERNAL_SERVER_ERROR.value(), PropertiesConfig.SERVER_DOWN,
PropertiesConfig.SERVER_DOWN_MESSAGE);
checkEventStatus = false;
}
} catch (NackException e) {
event = new PublishResultItem(entryKey, HttpStatus.INTERNAL_SERVER_ERROR.value(), PropertiesConfig.SERVER_DOWN,
Expand Down Expand Up @@ -105,45 +100,43 @@ public SendResult send(Map<String, String> routingKeyMap, Map<String, String> ms
*/
@Override
public SendResult send(String jsonContent, MsgService msgService, String userDomainSuffix, String tag, String routingKey) {

JsonParser parser = new JsonParser();
try {
JsonElement json = parser.parse(jsonContent);
JsonElement json = JsonParser.parseString(jsonContent);
if (json.isJsonArray()) {
return send(json, msgService, userDomainSuffix, tag, routingKey);
} else {
Map<String, String> map = new HashMap<>();
Map<String, String> routingKeyMap = new HashMap<>();
String eventId = msgService.getEventId(json.getAsJsonObject());
if (StringUtils.isNotBlank(eventId)) {
String routing_key = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, userDomainSuffix, tag, routingKey);
if (StringUtils.isNotBlank(routing_key)) {
map.put(eventId, json.toString());
routingKeyMap.put(eventId, routing_key);
} else if (routing_key == null) {
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
routingKeyGenerationFailure(resultItemList);
return new SendResult(resultItemList);
} else {
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
resultItemList.add(resultItem);
return new SendResult(resultItemList);
}
}

Map<String, String> map = new HashMap<>();
Map<String, String> routingKeyMap = new HashMap<>();
String eventId = msgService.getEventId(json.getAsJsonObject());
if (StringUtils.isNotBlank(eventId)) {
String routing_key = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, userDomainSuffix, tag, routingKey);
if (StringUtils.isNotBlank(routing_key)) {
map.put(eventId, json.toString());
routingKeyMap.put(eventId, routing_key);
} else if (routing_key == null) {
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
routingKeyGenerationFailure(resultItemList);
return new SendResult(resultItemList);
} else {
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
createFailureResult(resultItemList);
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
resultItemList.add(resultItem);
return new SendResult(resultItemList);
}
return send(routingKeyMap, map, msgService);
} else {
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
createFailureResult(resultItemList);
return new SendResult(resultItemList);
}
return send(routingKeyMap, map, msgService);
} catch (final JsonSyntaxException e) {
String resultMsg = "Could not parse JSON.";
if (e.getCause() != null) {
resultMsg = resultMsg + " Cause: " + e.getCause().getMessage();
}
log.error(resultMsg, e.getMessage());
List<PublishResultItem> resultItemList = new CopyOnWriteArrayList<>();
List<PublishResultItem> resultItemList = new ArrayList<>();
createFailureResult(resultItemList);
return new SendResult(resultItemList);
}
Expand All @@ -155,76 +148,64 @@ public SendResult send(String jsonContent, MsgService msgService, String userDom
*/
@Override
public SendResult send(JsonElement json, MsgService msgService, String userDomainSuffix, String tag, String routingKey) {

List<PublishResultItem> resultList;
boolean checkEventStatus;

Map<String, String> map = new HashMap<>();
Map<String, String> routingKeyMap = new HashMap<>();

SendResult result;
resultList = new CopyOnWriteArrayList<PublishResultItem>();
resultList = new ArrayList<>();
if (json == null) {
createFailureResult(resultList);
}
if (json.isJsonArray()) {
statusCodes = new CopyOnWriteArrayList<Integer>();
checkEventStatus = true;
JsonArray bodyJson = json.getAsJsonArray();
for (JsonElement obj : bodyJson) {
String eventId = msgService.getEventId(obj.getAsJsonObject());
if (StringUtils.isNotEmpty(eventId) && checkEventStatus) {
String routing_key = getAndCheckEvent(msgService, map, resultList, obj, routingKeyMap,
userDomainSuffix, tag, routingKey);
if (StringUtils.isNotBlank(routing_key)) {
result = send(obj.toString(), msgService, userDomainSuffix, tag, routing_key);
resultList.addAll(result.getEvents());
int statusCode = result.getEvents().get(0).getStatusCode();
if (!statusCodes.contains(statusCode))
statusCodes.add(statusCode);
} else if (routing_key == null) {
routingKeyGenerationFailure(resultList);
errorItems = new CopyOnWriteArrayList<JsonElement>();
int statusCode = resultList.get(0).getStatusCode();
statusCodes.add(statusCode);
errorItems.add(obj);
checkEventStatus = false;
} else {
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
resultList.add(resultItem);
int statusCode = resultItem.getStatusCode();
statusCodes.add(statusCode);
break;
}
} else {
if (!checkEventStatus) {
addUnsuccessfulResultItem(obj);
int statusCode = resultList.get(0).getStatusCode();
statusCodes.add(statusCode);
else {
if (json.isJsonArray()) {
checkEventStatus = true;
JsonArray bodyJson = json.getAsJsonArray();
for (JsonElement obj : bodyJson) {
String eventId = msgService.getEventId(obj.getAsJsonObject());
if (StringUtils.isNotEmpty(eventId) && checkEventStatus) {
String routing_key = getAndCheckEvent(msgService, map, resultList, obj, routingKeyMap,
userDomainSuffix, tag, routingKey);
if (StringUtils.isNotBlank(routing_key)) {
result = send(obj.toString(), msgService, userDomainSuffix, tag, routing_key);
resultList.addAll(result.getEvents());
} else if (routing_key == null) {
routingKeyGenerationFailure(resultList);
checkEventStatus = false;
} else {
PublishResultItem resultItem = rabbitmqConfigurationNotFound(msgService);
resultList.add(resultItem);
break;
}
} else {
createFailureResult(resultList);
errorItems = new CopyOnWriteArrayList<JsonElement>();
int statusCode = resultList.get(0).getStatusCode();
statusCodes.add(statusCode);
errorItems.add(obj);
checkEventStatus = false;
if (!checkEventStatus) {
addUnsuccessfulResultItem(resultList, obj);
} else {
createFailureResult(resultList);
checkEventStatus = false;
}
}
}
} else {
result = send(json.toString(), msgService, userDomainSuffix, tag, routingKey);
resultList.addAll(result.getEvents());
}
} else {
statusCodes = new CopyOnWriteArrayList<Integer>();
result = send(json.toString(), msgService, userDomainSuffix, tag, routingKey);
resultList.addAll(result.getEvents());
int statusCode = result.getEvents().get(0).getStatusCode();
if (!statusCodes.contains(statusCode))
statusCodes.add(statusCode);
}

result = new SendResult();
result.setEvents(resultList);
return result;
}

private String sendMessage(String routingKey, String msg, MsgService msgService) throws IOException,NackException, TimeoutException, RemRemPublishException {
private String sendMessage(String routingKey, String msg, MsgService msgService) throws IOException,TimeoutException, RemRemPublishException {
String resultMsg = PropertiesConfig.SUCCESS;
try {
instantiateRmqHelper();
} catch (RemRemPublishException e) {
log.error("RemRemPublishException occurred::" + e.getMessage());
log.error("RemRemPublishException occurred::{}", e.getMessage());
}
rmqHelper.send(routingKey, msg, msgService);
return resultMsg;
Expand Down Expand Up @@ -268,7 +249,7 @@ private String getAndCheckEvent(MsgService msgService, Map<String, String> map,

/**
* Method returns result for the failure event.
* @param events for list the eiffel events results
* @param resultItemList for list the eiffel events results
*/
private void createFailureResult(List<PublishResultItem> resultItemList) {
PublishResultItem resultItem = new PublishResultItem(null, 400, PropertiesConfig.INVALID_MESSAGE,
Expand All @@ -293,21 +274,9 @@ private void routingKeyGenerationFailure(List<PublishResultItem> resultItemList)
resultItemList.add(resultItem);
}

private void addUnsuccessfulResultItem(JsonElement obj) {
private void addUnsuccessfulResultItem(List<PublishResultItem> resultList, JsonElement obj) {
PublishResultItem event = new PublishResultItem(null, 503, PropertiesConfig.SERVICE_UNAVAILABLE,
PropertiesConfig.UNSUCCESSFUL_EVENT_CONTENT);
resultList.add(event);
}

/**
* Method returns the Http response code for the events.
*/
public HttpStatus getHttpStatus() {
if (statusCodes.size() > 1) {
return HttpStatus.MULTI_STATUS;
} else {
return HttpStatus.valueOf(statusCodes.get(0));

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.*;

import com.ericsson.eiffel.remrem.publish.service.*;
import com.google.gson.*;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -27,6 +28,7 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.lang.NonNull;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.userdetails.UserDetails;
Expand All @@ -43,10 +45,6 @@
import com.ericsson.eiffel.remrem.publish.exception.RemRemPublishException;
import com.ericsson.eiffel.remrem.publish.helper.PublishUtils;
import com.ericsson.eiffel.remrem.publish.helper.RMQHelper;
import com.ericsson.eiffel.remrem.publish.service.EventTemplateHandler;
import com.ericsson.eiffel.remrem.publish.service.GenerateURLTemplate;
import com.ericsson.eiffel.remrem.publish.service.MessageService;
import com.ericsson.eiffel.remrem.publish.service.SendResult;
import com.fasterxml.jackson.databind.JsonNode;

import ch.qos.logback.classic.Logger;
Expand Down Expand Up @@ -111,6 +109,7 @@ public void setRestTemplate(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}

private int callsInSend = 0;
public void logUserName() {
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
// Check if the user is authenticated
Expand Down Expand Up @@ -165,12 +164,31 @@ public ResponseEntity send(final String msgProtocol, final String userDomain, fi

}
synchronized (this) {
callsInSend++;
log.info("callsInSend (before): " + callsInSend);
SendResult result = messageService.send(body, msgService, userDomain, tag, routingKey);
log.info("HTTP Status: {}", messageService.getHttpStatus().value());
return new ResponseEntity(result, messageService.getHttpStatus());
callsInSend--;
HttpStatus status = getHttpStatus(result);
log.info("callsInSend (after): " + callsInSend);
log.info("HTTP Status: {}", status.value());
return new ResponseEntity(result, status);
}
}

private HttpStatus getHttpStatus(SendResult result) {
List<PublishResultItem> events = result.getEvents();
HttpStatus status;
int nevents = events.size();
if (nevents == 0) {
return HttpStatus.BAD_REQUEST;
}
else if (events.size() == 1) {
return HttpStatus.valueOf(events.get(0).getStatusCode());
}
else {
return HttpStatus.MULTI_STATUS;
}
}
/**
* This controller used as producer to send messages or event
* @param msgProtocol
Expand Down Expand Up @@ -275,6 +293,11 @@ public ResponseEntity generateAndPublish(@ApiParam(value = "message protocol", r
}
}

private boolean eventTypeExists(@NonNull MsgService msgService, String eventType) {
Collection<String> supportedEventTypes = msgService.getSupportedEventTypes();
return supportedEventTypes != null && supportedEventTypes.contains(eventType);
}

/**
* This controller provides single RemRem REST API End Point for both RemRem
* Generate and Publish.
Expand Down Expand Up @@ -345,6 +368,11 @@ public ResponseEntity generateAndPublish(final String msgProtocol, final String
parsedTemplates.append("[");
for (JsonElement eventJson : events) {
// -- parse params in incoming request -> body -------------
if (!eventTypeExists(msgService, msgType)) {
return createResponseEntity(HttpStatus.BAD_REQUEST, JSON_ERROR_STATUS,
"Unknown event type '" + msgType + "'");
}

JsonNode parsedTemplate = eventTemplateHandler.eventTemplateParser(eventJson.toString(), msgType);
if (parsedTemplates.length() > 1) {
parsedTemplates.append(",");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void setUp() throws Exception {

when(service.getServiceName()).thenReturn("eiffelsemantics");
when(service2.getServiceName()).thenReturn("eiffelsemantics");
when(messageService.getHttpStatus()).thenReturn(HttpStatus.OK);
// when(messageService.getHttpStatus()).thenReturn(HttpStatus.OK);

when(messageService.send(ArgumentMatchers.anyString(), ArgumentMatchers.any(MsgService.class), ArgumentMatchers.anyString(),
ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(res);
Expand Down

0 comments on commit 44aee91

Please sign in to comment.