Skip to content

Commit

Permalink
Implemented changes to adapt routingkey template to sepia (#247)
Browse files Browse the repository at this point in the history
* Implemented changes to adapt routingkey template to sepia
  • Loading branch information
jainadc9 authored Aug 22, 2023
1 parent 2eeb088 commit 7ade041
Show file tree
Hide file tree
Showing 13 changed files with 354 additions and 10 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 2.1.0
- Implemented new routing key template for Sepia.
-
## 2.0.30
- Upgrading to OpenJDK 17

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
<version>2.0.12</version>
</parent>
<properties>
<eiffel-remrem-publish.version>2.0.30</eiffel-remrem-publish.version>
<eiffel-remrem-semantics.version>2.2.6</eiffel-remrem-semantics.version>
<eiffel-remrem-publish.version>2.1.0</eiffel-remrem-publish.version>
<eiffel-remrem-semantics.version>2.3.0</eiffel-remrem-semantics.version>
</properties>
<artifactId>eiffel-remrem-publish</artifactId>
<version>${eiffel-remrem-publish.version}</version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class CliOptions {

static private Options options=null;
static private CommandLine commandLine;

private static final String SEMANTICS_ROUTINGKEY_TYPE_OVERRIDE_FILEPATH = "semanticsRoutingKeyTypeOverrideFilepath";
//Used for testing purposes
private static ArrayList<Integer> testErrorCodes = new ArrayList<>();

Expand Down Expand Up @@ -85,6 +85,8 @@ public static void createCLIOptions() {
options.addOption("tag", "tag", true, "tag to be used in routing key");
options.addOption("rk", "routing_key", true, "routing key of the eiffel message. When provided routing key is not generated and the value provided is used.");
options.addOption("tto", "tcp_time_out", true, "specifies tcp connection timeout, default time is 60000 milliseconds");
options.addOption("srkt", SEMANTICS_ROUTINGKEY_TYPE_OVERRIDE_FILEPATH, true, "Default uses the routing key defined in Eiffel Sepia.To make it compatible to prior routing key structure provide the path to routing-key-overrides.properties.");

contentGroup = createContentGroup();
options.addOptionGroup(contentGroup);
}
Expand Down Expand Up @@ -252,6 +254,12 @@ public static void handleMessageBusOptions() throws HandleMessageBusException {
System.setProperty(key, tls_ver);
}

if (commandLine.hasOption(SEMANTICS_ROUTINGKEY_TYPE_OVERRIDE_FILEPATH)) {
String semanticsRoutingKeyTypeOverrideFilepath =commandLine.getOptionValue(SEMANTICS_ROUTINGKEY_TYPE_OVERRIDE_FILEPATH);
String key = PropertiesConfig.SEMANTICS_ROUTINGKEY_TYPE_OVERRIDE_FILEPATH;
System.setProperty(key, semanticsRoutingKeyTypeOverrideFilepath);
}

String usePersistance = "true";
if (commandLine.hasOption("np")) {
usePersistance = "false";
Expand Down Expand Up @@ -288,6 +296,7 @@ public static void clearSystemProperties() {
System.clearProperty(PropertiesConfig.CHANNELS_COUNT);
System.clearProperty(PropertiesConfig.TCP_TIMEOUT);
System.clearProperty(PropertiesConfig.WAIT_FOR_CONFIRMS_TIME_OUT);
System.clearProperty(PropertiesConfig.SEMANTICS_ROUTINGKEY_TYPE_OVERRIDE_FILEPATH);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class PropertiesConfig {
public static final String SERVICE_UNAVAILABLE = "Service Unavailable";

public static final String CREATE_EXCHANGE_IF_NOT_EXISTING = "com.ericsson.eiffel.remrem.publish.messagebus.createExchange";
public static final String SEMANTICS_ROUTINGKEY_TYPE_OVERRIDE_FILEPATH = "com.ericsson.eiffel.remrem.publish.messagebus.semanticsRoutingKeyTypeOverrideFilepath";
public static final String INVALID_EXCHANGE = "Exchange not found, Please check exchange configuration and try again";
public static final String INVALID_EXCHANGE_MESSAGE_CLI = " Unavailable. To create the exchange specify -ce or --create_exchange to true )";
public static final String INVALID_EXCHANGE_MESSAGE_SERVICE = " ExchangeName is not present, To create the exchange specify createExchangeIfNotExisting in application configuration";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public class RabbitMqPropertiesConfig {
@Value("${jasypt.encryptor.jasyptKeyFilePath:#{null}}")
private String jasyptKeyFilePath;

@Value("${semanticsRoutingKeyTypeOverrideFilepath:#{null}}")
private String semanticsRoutingKeyTypeOverrideFilepath;

private Map<String, RabbitMqProperties> rabbitMqPropertiesMap = new HashMap<String, RabbitMqProperties>();

@Autowired
Expand Down Expand Up @@ -154,6 +157,9 @@ private void readSpringProperties() {
rabbitMqProperties.setChannelsCount(
Integer.parseInt(channelsCount));
}

rabbitMqProperties.setRoutingKeyTypeOverrideFilePath(semanticsRoutingKeyTypeOverrideFilepath);

String waitForConfirmsTimeOut = getPropertyAsText(rabbitmqInstanceObject, PROPERTY_WAIT_FOR_CONFIRMS_TIMEOUT);
if (waitForConfirmsTimeOut != null) {
rabbitMqProperties.setWaitForConfirmsTimeOut(Long.parseLong(waitForConfirmsTimeOut));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,16 @@ public static String getRoutingKey(MsgService msgService, JsonObject json, RMQHe
String domainId = rabbitMqProperties.getDomainId();
if (rabbitMqProperties != null && rabbitMqProperties.getExchangeName() != null && rabbitMqProperties.getHost() != null
&& (cliMode || (!cliMode && StringUtils.isNotBlank(domainId)))) {
return StringUtils.defaultIfBlank(routingKey, msgService.generateRoutingKey(json, tag, domainId, userDomainSuffix));

if (StringUtils.isNotBlank(routingKey)) {
return routingKey;
} else if (StringUtils.isNotBlank(rabbitMqProperties.getRoutingKeyTypeOverrideFilePath())) {
String type = rabbitMqProperties.getTypeRoutingKeyFromConfiguration(msgService.getEventType(json));
if (StringUtils.isNotBlank(type)) {
return msgService.generateRoutingKey(json, tag, domainId, userDomainSuffix, type);
}
}
return msgService.generateRoutingKey(json, tag, domainId, userDomainSuffix);
}
return "";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@
*/
package com.ericsson.eiffel.remrem.publish.helper;

import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.MissingResourceException;
import java.util.Random;
import java.util.ResourceBundle;
import java.util.concurrent.TimeoutException;
import java.util.PropertyResourceBundle;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.LoggerFactory;

import com.ericsson.eiffel.remrem.publish.config.PropertiesConfig;
Expand Down Expand Up @@ -52,6 +57,7 @@ public class RabbitMqProperties {
private String domainId;
private Integer channelsCount;
private boolean createExchangeIfNotExisting;
private String routingKeyTypeOverrideFilePath;
private Integer tcpTimeOut;
private boolean hasExchange = false;
// built in tcp connection timeout value for MB in milliseconds.
Expand All @@ -62,12 +68,17 @@ public class RabbitMqProperties {
public static final String CONTENT_TYPE = "application/json";
public static final String ENCODING_TYPE = "UTF-8";
public static final BasicProperties PERSISTENT_BASIC_APPLICATION_JSON;
public static final String SEMANTICS_MESSAGE_PROTOCOL = "eiffelsemantics";

private Connection rabbitConnection;
private String protocol;

private List<Channel> rabbitChannels;

private ResourceBundle types;
private final String TYPE = "type";
private final String DOT = ".";

Logger log = (Logger) LoggerFactory.getLogger(RMQHelper.class);

static {
Expand Down Expand Up @@ -157,6 +168,14 @@ public void setCreateExchangeIfNotExisting(boolean createExchangeIfNotExisting)
this.createExchangeIfNotExisting = createExchangeIfNotExisting;
}

public String getRoutingKeyTypeOverrideFilePath() {
return routingKeyTypeOverrideFilePath;
}

public void setRoutingKeyTypeOverrideFilePath(String routingKeyTypeOverrideFilePath) {
this.routingKeyTypeOverrideFilePath = routingKeyTypeOverrideFilePath;
}

public Integer getChannelsCount() {
return channelsCount;
}
Expand Down Expand Up @@ -229,9 +248,6 @@ public void init() {
factory.setUsername(username);
factory.setPassword(password);
}




if (tlsVer != null && !tlsVer.isEmpty()) {
if (tlsVer.contains("default")) {
Expand Down Expand Up @@ -259,6 +275,14 @@ public void init() {
log.error("Error occured while setting up the RabbitMq Connection. "+e.getMessage());
e.printStackTrace();
}

if (StringUtils.isNotBlank(routingKeyTypeOverrideFilePath)) {
try {
types = new PropertyResourceBundle(new FileInputStream(routingKeyTypeOverrideFilePath));
} catch (IOException e) {
log.error("Cannot find routing key file. "+e.getMessage());
}
}
}

/**
Expand Down Expand Up @@ -337,6 +361,12 @@ private void initService() {
if (waitForConfirmsTimeOut == null ) {
waitForConfirmsTimeOut = Long.getLong(getValuesFromSystemProperties(protocol + ".rabbitmq.waitForConfirmsTimeOut"));
}

if (protocol.equalsIgnoreCase(SEMANTICS_MESSAGE_PROTOCOL)
&& (routingKeyTypeOverrideFilePath == null || routingKeyTypeOverrideFilePath.isBlank())) {
routingKeyTypeOverrideFilePath = getValuesFromSystemProperties(PropertiesConfig.SEMANTICS_ROUTINGKEY_TYPE_OVERRIDE_FILEPATH);
}

}


Expand All @@ -352,6 +382,7 @@ private void setValues() {
usePersitance = Boolean.getBoolean(PropertiesConfig.USE_PERSISTENCE);
createExchangeIfNotExisting = Boolean.parseBoolean(getValuesFromSystemProperties(PropertiesConfig.CREATE_EXCHANGE_IF_NOT_EXISTING));
tcpTimeOut = Integer.getInteger(PropertiesConfig.TCP_TIMEOUT);
routingKeyTypeOverrideFilePath = getValuesFromSystemProperties(PropertiesConfig.SEMANTICS_ROUTINGKEY_TYPE_OVERRIDE_FILEPATH);
}

private String getValuesFromSystemProperties(String propertyName) {
Expand Down Expand Up @@ -556,4 +587,35 @@ private Channel giveMeRandomChannel() throws RemRemPublishException {
factory, e);
}
}

/**
* This method is used to get routing key type based on the eventType from the configuration file
*
* @param eventType
* Eiffel eventType
* @return type based on eventType if provided in the configuration file else null
*/
public String getTypeRoutingKeyFromConfiguration(String eventType) {

if (types != null) {
String key = eventType + DOT + TYPE;
try {
String routingKey = types.getString(key);
if (!routingKey.isBlank()) {
return routingKey;
}else {
log.warn("Routing key from configuration is empty for :"+ key);
}
} catch (MissingResourceException e) {
log.warn("Routing key from configuration is null for :"+ key);
return null;
}
}else {
log.error("Uninitialized routing key configuration file ");
}

return null;
}


}
2 changes: 2 additions & 0 deletions publish-service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ jasypt.encryptor.jasyptKeyFilePath: ""
rabbitmq.instances.jsonlist=[{ "mp": "eiffelsemantics", "host": "127.0.0.1", "port": "5672", "virtualHost": "", "username": "guest", "password": "guest", "tls": "", "exchangeName": "ertest1234", "channelsCount": "1", "domainId": "eiffelxxx", "createExchangeIfNotExisting":true,"waitForConfirmsTimeOut":"5000", "tcpTimeOut": "5000" }, \
{ "mp": "eiffel3", "host": "127.0.0.1", "port": "5672", "virtualHost": "", "username": "guest", "password": "guest", "tls": "", "exchangeName": "eiffel3", "domainId": "eiffelxxx", "channelsCount": "1", "createExchangeIfNotExisting":true,"waitForConfirmsTimeOut":"5000", "tcpTimeOut": "5000" }]

#semanticsRoutingKeyTypeOverrideFilepath: <The complete file path to read properties of type and family to prepare routing key>

# properties for server used to generate messages
generate.server.uri: http://127.0.0.1:8080
Expand All @@ -44,3 +45,4 @@ activedirectory.connectionTimeOut:
spring.mvc.pathmatch.matching-strategy: ANT_PATH_MATCHER



Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testGenerateRoutingKey() throws Exception {
JsonParser parser = new JsonParser();
JsonElement json = parser.parse(new FileReader(file)).getAsJsonObject();
String routingKey = messageService.generateRoutingKey(json.getAsJsonObject(), null, null, null);
assertEquals("eiffel.activity.finished.notag.eiffeltest", routingKey);
assertEquals("eiffel.activity.EiffelActivityFinishedEvent.notag.eiffeltest", routingKey);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.fail;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
Expand All @@ -39,6 +40,7 @@
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.ericsson.eiffel.remrem.protocol.MsgService;
import com.ericsson.eiffel.remrem.publish.config.PropertiesConfig;
import com.ericsson.eiffel.remrem.publish.exception.NackException;
import com.ericsson.eiffel.remrem.publish.exception.RemRemPublishException;
import com.ericsson.eiffel.remrem.publish.helper.PublishUtils;
Expand Down Expand Up @@ -215,8 +217,37 @@ public void testRoutingKey() throws Exception {
JsonElement json = parser.parse(new FileReader(file)).getAsJsonObject();
routingKey = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, "fem001", null, null);
if(routingKey != null) {
assertEquals("eiffel.activity.finished.notag.eiffeltest.fem001", routingKey);
assertEquals("eiffel.activity.EiffelActivityFinishedEvent.notag.eiffeltest.fem001", routingKey);
}
}
}

@Test
public void testRoutingKeyPriorSepia() throws FileNotFoundException {
RabbitMqProperties semanticsProperties = rmqHelper.getRabbitMqPropertiesMap().get("eiffelsemantics");

try {
// Simulate existence of routing key property file
System.setProperty(PropertiesConfig.SEMANTICS_ROUTINGKEY_TYPE_OVERRIDE_FILEPATH,
"src/test/resources/routing-key-overrides.properties");
semanticsProperties.init();

MsgService msgService = PublishUtils.getMessageService(protocol, msgServices);
String routingKey;
if (msgService != null) {
File file = new File("src/test/resources/EiffelActivityFinishedEvent.json");
JsonParser parser = new JsonParser();
JsonElement json = parser.parse(new FileReader(file)).getAsJsonObject();
routingKey = PublishUtils.getRoutingKey(msgService, json.getAsJsonObject(), rmqHelper, "fem001", null, null);
if (routingKey != null) {
assertEquals("eiffel.activity.finished.notag.eiffeltest.fem001", routingKey);
}
}
}
finally {
// Restore non-existence of routing key property file
System.clearProperty(PropertiesConfig.SEMANTICS_ROUTINGKEY_TYPE_OVERRIDE_FILEPATH);
semanticsProperties.init();
}
}
}
Loading

0 comments on commit 7ade041

Please sign in to comment.