Skip to content

Commit

Permalink
Fixing missing events issues (#508)
Browse files Browse the repository at this point in the history
* Rebasing with eiffel-community/master
Fixing missing events issues

* Revert "Fixing missing events issues"

This reverts commit 2d3571f.

* Fixed Missing events issues

* Refactoring as per review comments

* Refactoring as per review comments

* Refectoring as per review comments

* Printing environment vars in travis

* Printing environment vars in travis with echo

* Refactoring code

* Removing echo statements

* Removing printenv

* Printing printenv

* Removing printenv

* Resolving conflicts

* Refectoring as per review comments

* Changing project version to 3.2.0

* Refectoring as per review comments

Co-authored-by: Vishalkumar Panchal <[email protected]>
  • Loading branch information
vrpanchal and Vishalkumar Panchal authored Oct 22, 2021
1 parent a1bd6ad commit d5fd9f5
Show file tree
Hide file tree
Showing 20 changed files with 123 additions and 72 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.ericsson</groupId>
<artifactId>eiffel-intelligence</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>
<packaging>war</packaging>

<parent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import javax.servlet.http.HttpServletRequest;

import com.ericsson.ei.exception.InvalidRulesException;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
Expand All @@ -20,10 +19,12 @@

import com.ericsson.ei.controller.model.RuleCheckBody;
import com.ericsson.ei.controller.model.RulesCheckBody;
import com.ericsson.ei.exception.InvalidRulesException;
import com.ericsson.ei.jmespath.JmesPathInterface;
import com.ericsson.ei.rules.IRuleTestService;
import com.ericsson.ei.utils.ResponseMessage;

import io.netty.util.internal.StringUtil;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.Setter;
Expand Down Expand Up @@ -85,8 +86,14 @@ public ResponseEntity<?> createRuleTestRunFullAggregation(
final HttpServletRequest httpRequest) {
if (testEnabled) {
try {
String aggregatedObject = ruleTestService.prepareAggregatedObject(
new JSONArray(body.getListRulesJson()), new JSONArray(body.getListEventsJson()));
String aggregatedObject = StringUtil.EMPTY_STRING;
try {
aggregatedObject = ruleTestService.prepareAggregatedObject(
new JSONArray(body.getListRulesJson()), new JSONArray(body.getListEventsJson()));
} catch (Exception e) {
String errorMessage = "Failed to generate aggregated object.";
LOGGER.error(errorMessage, e);
}
if (aggregatedObject != null && !aggregatedObject.equals("[]")) {
return new ResponseEntity<>(aggregatedObject, HttpStatus.OK);
} else {
Expand All @@ -96,11 +103,7 @@ public ResponseEntity<?> createRuleTestRunFullAggregation(
return new ResponseEntity<>(errorJsonAsString, HttpStatus.BAD_REQUEST);
}
}
catch (InvalidRulesException e) {
String errorJsonAsString = ResponseMessage.createJsonMessage(e.getMessage());
return new ResponseEntity<>(errorJsonAsString, HttpStatus.BAD_REQUEST);
}
catch (JSONException | IOException e) {
catch (JSONException e) {
String errorMessage = "Failed to generate aggregated object.";
LOGGER.error(errorMessage, e);
String errorJsonAsString = ResponseMessage.createJsonMessage(errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public ResponseEntity sendRequestToER(String eventId, SearchOption searchOption,
return request.performRequest();
}


private HttpRequest prepareRequest(String eventId, SearchOption searchOption, int limit,
int levels, boolean tree, HttpRequest request) throws IOException, URISyntaxException {
Boolean shallowParameter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.ericsson.ei.exception.MongoDBConnectionException;
import com.ericsson.ei.jmespath.JmesPathInterface;
import com.ericsson.ei.jsonmerge.DownstreamMergeHandler;
import com.ericsson.ei.rules.RulesObject;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.MongoExecutionTimeoutException;

@Component
public class DownstreamExtractionHandler {
Expand All @@ -35,6 +37,7 @@ public class DownstreamExtractionHandler {
@Autowired private JmesPathInterface jmesPathInterface;
@Autowired private DownstreamMergeHandler mergeHandler;
@Autowired private ObjectHandler objectHandler;


public void runExtraction(RulesObject rulesObject, String mergeId, String event, String aggregatedDbObject) {
try {
Expand All @@ -47,7 +50,8 @@ public void runExtraction(RulesObject rulesObject, String mergeId, String event,
}
}

public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject) {
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject)
throws MongoExecutionTimeoutException, MongoDBConnectionException {
JsonNode extractedContent;
extractedContent = extractContent(rulesObject, event);
LOGGER.debug("Start extraction of Aggregated Object:\n{} \nwith Event:\n{}", aggregatedDbObject.toString(), event);
Expand Down
30 changes: 19 additions & 11 deletions src/main/java/com/ericsson/ei/handlers/EventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
*/
package com.ericsson.ei.handlers;

import com.ericsson.ei.exception.MongoDBConnectionException;
import com.ericsson.ei.rules.IdRulesHandler;
import org.apache.http.conn.HttpHostConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
Expand All @@ -26,13 +25,16 @@
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import com.ericsson.ei.exception.MongoDBConnectionException;
import com.ericsson.ei.rules.IdRulesHandler;
import com.ericsson.ei.rules.RulesHandler;
import com.ericsson.ei.rules.RulesObject;
import com.ericsson.ei.utils.MongoDBMonitorThread;
import com.ericsson.ei.utils.SpringContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.MongoExecutionTimeoutException;
import com.rabbitmq.client.Channel;
import com.ericsson.ei.utils.MongoDBMonitorThread;
import com.ericsson.ei.utils.SpringContext;

@Component
public class EventHandler {
Expand All @@ -58,9 +60,10 @@ public RulesHandler getRulesHandler() {
return rulesHandler;
}

public void eventReceived(String event) throws MongoDBConnectionException {
public void eventReceived(String event, final boolean isRelivered)
throws MongoDBConnectionException, Exception {
RulesObject eventRules = rulesHandler.getRulesForEvent(event);
idRulesHandler.runIdRules(eventRules, event);
idRulesHandler.runIdRules(eventRules, event, isRelivered);
}

@Async
Expand All @@ -69,12 +72,13 @@ public void onMessage(Message message, Channel channel) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode node = objectMapper.readTree(messageBody);
String id = node.get("meta").get("id").toString();

final boolean isRedelivered = message.getMessageProperties().isRedelivered();
final int waitBeforeSendBack = 2000;
long deliveryTag = message.getMessageProperties().getDeliveryTag();
LOGGER.debug("Thread id {} spawned for EventHandler", Thread.currentThread().getId());
try {
LOGGER.info("Event {} Received", id);
eventReceived(messageBody);
eventReceived(messageBody, isRedelivered);
channel.basicAck(deliveryTag, false);
LOGGER.info("Event {} processed", id);
} catch (MongoDBConnectionException mdce) {
Expand Down Expand Up @@ -108,10 +112,14 @@ public void onMessage(Message message, Channel channel) throws Exception {
// once the mongoDB Connection is up event will be sent back to queue with
// un-acknowledgement
channel.basicNack(deliveryTag, false, true);
LOGGER.info(
"Sent back the event {} to queue with un-acknowledgement due to {}", id, mdce);
LOGGER.info("Sent back the event {} to queue with un-acknowledgement due to {}", id, mdce);
} catch (HttpHostConnectException | MongoExecutionTimeoutException e) {
LOGGER.info("Waiting for {} mili-seconds before sending the event back to queue", waitBeforeSendBack);
Thread.sleep(waitBeforeSendBack);
channel.basicNack(deliveryTag, false, true);
LOGGER.info("Sent back the event {} to queue with un-acknowledgement: ", id);
} catch (Exception e) {
LOGGER.error("Event is not Re-queued due to exception for id: {} Exception: {} ", id, e);
LOGGER.error("Event is not Re-queued due to exception for id: {} Exception: {} ", id, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ public void init() throws AbortExecutionException {
LOGGER.error("Failed to create an index for {} due to: {}", collectionName, e);
}
}



public void setCollectionName(String collectionName) {
this.collectionName = collectionName;
}
Expand Down Expand Up @@ -112,8 +111,7 @@ public ArrayList<String> getObjectsForEventId(String eventId) {
* @param event
* @param objectId aggregated event object Id
*/
public void updateEventToObjectMapInMemoryDB(RulesObject rulesObject, String event,
String objectId, int ttlValue) {
public void updateEventToObjectMapInMemoryDB(RulesObject rulesObject, String event, String objectId, int ttlValue) {
String eventId = getEventId(rulesObject, event);

final MongoCondition condition = MongoCondition.idCondition(objectId);
Expand Down
39 changes: 26 additions & 13 deletions src/main/java/com/ericsson/ei/handlers/ExtractionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package com.ericsson.ei.handlers;

import com.ericsson.ei.rules.ProcessRulesHandler;
import java.io.IOException;

import org.apache.http.conn.HttpHostConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -26,10 +28,12 @@
import com.ericsson.ei.exception.PropertyNotFoundException;
import com.ericsson.ei.jmespath.JmesPathInterface;
import com.ericsson.ei.jsonmerge.MergeHandler;
import com.ericsson.ei.rules.ProcessRulesHandler;
import com.ericsson.ei.rules.RulesObject;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.mongodb.MongoExecutionTimeoutException;

@Component
public class ExtractionHandler {
Expand Down Expand Up @@ -62,23 +66,21 @@ public void setObjectHandler(ObjectHandler objectHandler) {
this.objectHandler = objectHandler;
}

public void runExtraction(RulesObject rulesObject, String id, String event, String aggregatedDbObject) throws MongoDBConnectionException {
public void runExtraction(RulesObject rulesObject, String id, String event, String aggregatedDbObject, boolean isRedelivered)
throws HttpHostConnectException, MongoExecutionTimeoutException, MongoDBConnectionException {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode aggregatedJsonObject = mapper.readTree(aggregatedDbObject);
runExtraction(rulesObject, id, event, aggregatedJsonObject);
} catch (Exception e) {
LOGGER.error("Failed with extraction.", e);
if (e.getMessage() != null && e.getMessage().equalsIgnoreCase("MongoDB Connection down")) {
throw new MongoDBConnectionException("MongoDB Connection down");
}
runExtraction(rulesObject, id, event, aggregatedJsonObject, isRedelivered);
} catch (IOException e) {
LOGGER.warn("Failed to read the aggregated object due to {} ", e);
}
}

public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject) throws MongoDBConnectionException {
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject, boolean isRedelivered)
throws HttpHostConnectException, MongoExecutionTimeoutException, MongoDBConnectionException {
try {
JsonNode extractedContent = extractContent(rulesObject, event);

String mergedContent = null;
String aggregatedObjectId = null;

Expand All @@ -89,7 +91,17 @@ public void runExtraction(RulesObject rulesObject, String mergeId, String event,
aggregatedDbObject.toString(), extractedContent.toString(), event);
aggregatedObjectId = objectHandler.extractObjectId(aggregatedDbObject);
mergedContent = mergeHandler.mergeObject(aggregatedObjectId, mergeId, rulesObject, event, extractedContent);
mergedContent = processRulesHandler.runProcessRules(event, rulesObject, mergedContent, aggregatedObjectId, mergeId);
if (mergedContent == null) {
return;
}

// Need to extract the history rules for the re-delivered start event type.
if (rulesObject.isStartEventRules() && isRedelivered) {
upStreamEventsHandler.runHistoryExtractionRulesOnAllUpstreamEvents(mergeId);
} else {
mergedContent = processRulesHandler.runProcessRules(event, rulesObject, mergedContent,
aggregatedObjectId, mergeId);
}
} else {
LOGGER.trace("***** Extraction starts for the aggregation Id: " + mergeId);
ObjectNode objectNode = (ObjectNode) extractedContent;
Expand All @@ -101,8 +113,9 @@ public void runExtraction(RulesObject rulesObject, String mergeId, String event,
LOGGER.trace("**** Extraction ends for the aggregation Id: " + mergeId);
}
objectHandler.checkAggregations(mergedContent, aggregatedObjectId);
} catch (PropertyNotFoundException e) {
LOGGER.debug("Did not run history extraction on upstream events.", e);
} catch (HttpHostConnectException | MongoExecutionTimeoutException e) {
LOGGER.warn("Extraction failed for {}, due to {}. Sending back to queue.", event, e.getMessage());
throw e;
} catch (Exception e) {
LOGGER.error("Failed to run extraction for event {}", event, e);
if (e.getMessage() != null && e.getMessage().equalsIgnoreCase("MongoDB Connection down")) {
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/com/ericsson/ei/handlers/ObjectHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

import javax.annotation.PostConstruct;

import com.ericsson.ei.mongo.*;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.slf4j.Logger;
Expand All @@ -35,8 +33,14 @@
import com.ericsson.ei.exception.AbortExecutionException;
import com.ericsson.ei.exception.MongoDBConnectionException;
import com.ericsson.ei.jmespath.JmesPathInterface;
import com.ericsson.ei.mongo.MongoCondition;
import com.ericsson.ei.mongo.MongoConstants;
import com.ericsson.ei.mongo.MongoDBHandler;
import com.ericsson.ei.mongo.MongoQuery;
import com.ericsson.ei.mongo.MongoQueryBuilder;
import com.ericsson.ei.rules.RulesObject;
import com.ericsson.ei.subscription.SubscriptionHandler;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.BasicDBObject;
Expand Down Expand Up @@ -82,7 +86,7 @@ public class ObjectHandler {
@Setter
@Autowired
private SubscriptionHandler subscriptionHandler;

@PostConstruct
public void init() throws AbortExecutionException {
try {
Expand All @@ -93,6 +97,7 @@ public void init() throws AbortExecutionException {
LOGGER.error("Failed to create an index for {} due to: {}", aggregationsCollectionName, e);
}
}

/**
* This method is responsible for inserting an aggregated object in to the database.
*
Expand All @@ -113,14 +118,15 @@ public String insertObject(String aggregatedObject, RulesObject rulesObject, Str
BasicDBObject document = prepareDocumentForInsertion(id, aggregatedObject);
LOGGER.debug("ObjectHandler: Aggregated Object document to be inserted: {}",
document.toString());

mongoDbHandler.insertDocument(databaseName, aggregationsCollectionName, document.toString());
postInsertActions(aggregatedObject, rulesObject, event, id);
return aggregatedObject;
}

public String insertObject(JsonNode aggregatedObject, RulesObject rulesObject, String event,
String id) throws MongoDBConnectionException {
return insertObject(aggregatedObject.toString(), rulesObject, event, id);
return insertObject(aggregatedObject.toString(), rulesObject, event, id);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,24 @@ public void setHistoryExtractionHandler(final HistoryExtractionHandler historyEx
* @throws Exception
* @throws PropertyNotFoundException
*/
public void runHistoryExtractionRulesOnAllUpstreamEvents(String aggregatedObjectId) throws PropertyNotFoundException, Exception {
public void runHistoryExtractionRulesOnAllUpstreamEvents(String aggregatedObjectId) throws Exception {

// Use aggregatedObjectId as eventId since they are the same for start
// events.
long start = System.currentTimeMillis();
final ResponseEntity responseEntity = eventRepositoryQueryService
.getEventStreamDataById(aggregatedObjectId, SearchOption.UP_STREAM, -1, -1, true);

long stop = System.currentTimeMillis();
LOGGER.debug("%%%% Response time for upstream query for id: {}: {} ", aggregatedObjectId, stop-start);

LOGGER.debug("ResponseEntity: " + responseEntity);

if (responseEntity == null) {
LOGGER.info("Asked for upstream from {} but got null response entity back!", aggregatedObjectId);
return;
}

final String searchResultString = responseEntity.getBody();
LOGGER.debug("Search result string is: " + searchResultString);
ObjectMapper mapper = new ObjectMapper();
final JsonNode searchResult = mapper.readTree(searchResultString);

Expand Down
Loading

0 comments on commit d5fd9f5

Please sign in to comment.