From d83afb8edd36a55338246d21b12cf2dd497d53dc Mon Sep 17 00:00:00 2001 From: saurabh Date: Mon, 14 Dec 2020 12:50:25 +0530 Subject: [PATCH 01/26] testing rabbitMq --- src/eazyserver/core/manager.py | 30 +++- src/eazyserver/core/rabbitMqConnector.py | 218 +++++++++++++++++++++++ 2 files changed, 239 insertions(+), 9 deletions(-) create mode 100644 src/eazyserver/core/rabbitMqConnector.py diff --git a/src/eazyserver/core/manager.py b/src/eazyserver/core/manager.py index d4df977..34a0895 100644 --- a/src/eazyserver/core/manager.py +++ b/src/eazyserver/core/manager.py @@ -7,6 +7,7 @@ import signal from .kafka_connector import KafkaConnector +from .rabbitMqConnector import RabbitMqConnector import threading from .vedaio import VedaSocketIO @@ -17,18 +18,29 @@ def __init__(self, **kwargs): super(Manager, self).__init__() self.behaviour = kwargs.get('behaviour') - self.connector_type = kwargs.get('connector_type') - self.kafka_client_type = kwargs.get('kafka_client_type') - self.kafka_client_config = kwargs.get('kafka_client_config') + self.connector_type = kwargs.get('connector_type','kafka') + self.kafka_client_type = kwargs.get('kafka_client_type',None) + self.kafka_client_config = kwargs.get('kafka_client_config',None) + self.client_config=kwargs.get('client_config',None) + + self.pid = os.getpid() self.exit_code = 0 - self.connected_behaviour = KafkaConnector( - self.behaviour, - kafka_client_type=self.kafka_client_type, - on_exit=self.stop, - **self.kafka_client_config) - + if connector_type == 'kafka': + self.connected_behaviour = KafkaConnector( + self.behaviour, + kafka_client_type=self.kafka_client_type, + on_exit=self.stop, + **self.kafka_client_config) + + if connector_type == 'rabbitMq': + self.rabbit_client_config = kwargs.get('rabbit_client_config') + self.connected_behaviour = RabbitMqConnector( + self.behaviour, + on_exit=self.stop, + **self.client_config) + self.signal_map = kwargs.get('signal_map', {}) # Set Kafka Enable/Disable on SIGUSR2 (12) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py new file mode 100644 index 0000000..49ac384 --- /dev/null +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -0,0 +1,218 @@ +import logging +logger = logging.getLogger(__name__) +logger.debug("Loaded " + __name__) + +import os +import json +import time +import sys +import traceback +from prettyprinter import pprint +from bson.objectid import ObjectId +from datetime import datetime + +import rabbitMqConnector as Connector + + +# TODO: Move/Add formatOutput to behaviour base class +# Created following fields in output dict if missing: +# _id,_created,_updated,source_id,_type,_producer +def formatOutput(output,behavior,source_data=None): + if "_id" not in output: output["_id"] = str(ObjectId()) + if "_updated" not in output: output["_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + if "_type" not in output: output["_type"] = "BEHAVIOUR" #TODO take from behavior object + if "_producer" not in output: output["_producer"] = "{}:{}:{}".format(behavior.__class__.__name__,"1.0",behavior.id) #name:version:id #TODO take version from behaviour + + # Source chaining for stream + if "source_id" not in output: + if source_data: # Select rightmost consumer + output["source_id"] = source_data[-1]["_id"] + else: # This is Producer + output["source_id"] = output["_id"] + + # source_config chaining for stream + if source_data: # Select rightmost consumer + output_source_config = source_data[-1]["source_config"] + else: + # init from behaviour config values + output_source_config ={ + "organization":behavior.config.get("organization", ""), + "hub":behavior.config.get("hub", ""), + "camera":behavior.config.get("camera", behavior.config.get("_id", "")), + "behaviourType":behavior.config.get("behaviourType", ""), + "behaviour":behavior.config.get("_id", ""), + } + # Handle embedded=true case + for key,value in output_source_config.items(): + if type(value) ==dict: + output_source_config[key] = value.get("_id","") + # Handle camera type + if output_source_config["behaviour"] == output_source_config["camera"]: + output_source_config["behaviour"] = "" + output_source_config["behaviourType"] = "camera" + + output_source_config.update(output.get("source_config",{})) + output["source_config"]=output_source_config + + if "_created" not in output: + if output["source_id"] is None or output["source_id"] == output["_id"]: + output["_created"] = output["_updated"] + else: + # Propagate _created from input data which is source (_id of input specified as source_id of output) + for data in source_data: + if output["source_id"] == data["_id"]: + output["_created"] = data["_created"] + break + # Propagate _created time based upon same source_id of input data + for data in source_data: + if output["source_id"] == data["source_id"]: + output["_created"] = data["_created"] + break + + if "_created" not in output: + logger.info("{} | source_id {} not found for id {}".format(output["_producer"],output["source_id"],output["_id"])) + output["_created"] = output["_updated"] + + return output + +############################# +## Main Connector Class +############################# + +class RabbitMqConnector(object): + Type = "RabbitMqConnector" + + def __init__(self, Behaviour, client_type="rabbitMq", on_exit=None, **kwargs): + self.should_stop =False + self.client = None + self.behavior = Behaviour + + self.client_type = client_type + self.client_config = kwargs + self.exit_callbacks=[] + if on_exit: self.exit_callbacks.append(on_exit) + + # TODO : Validate **kwargs + + RABBIT_SERVER_CONFIG={ + 'host':"queue.vedalabs.in", + 'user':'guest', + 'password':'guest', + 'port':5672 + } + + REST_API_CONFIG=self.client_config.get("restApiConfig",None) + + self.consumerTopics=self.client_config.get("consumerTopics",None) + self.producerTopic=self.client_config.get("producerTopic",None) + self.consumerSubscriptions=self.client_config.get("consumerSubscriptions",None) + self.producerSubscriptions=self.client_config.get("producerSubscriptions",None) + self.consumerSyncTopics=self.client_config.get("consumerSyncTopics",None) + self.consumerSyncMode=self.client_config.get("consumerSyncMode",False) + + print("="*50) + print("Printing kwargs...") + for k,v in kwargs.items(): + print(k, v) + print("="*50) + + # Create client based on type of Kafka Client specified + queueId=self.behavior.config.get("_id","") + + self.client=Connector.RabbitMqConnector(rabbit_server_config=RABBIT_SERVER_CONFIG, + consumerCallback=self.consume, + subscriptionCallback=self.update, + consumerTopics=self.consumerTopics, + consumerSyncTopics=self.consumerSyncTopics, + producerTopic=self.producerTopic, + rest_api_config=REST_API_CONFIG, + sender_exchange="BEHAVIOUR_EVENTS", + receiver_exchange="BEHAVIOUR_EVENTS", + queueId=queueId) + + + def stop(self): + self.should_stop=True + logger.info("Behaviour is schedule for shutdown.") + self.client.stop() + + def send(self,output): + output = formatOutput(output, self.behavior) + self.client.send(producerTopic=self.producerTopic,message=output) + + + ###### Update Related Functions + # Topics to be subscribed + def subscriptionTopics(self,subscriptions=[]): + subscriptions = self.behavior.subscriptionTopics(subscriptions) + return subscriptions + + # update event callback + def update(self, data,props=None,methods=None): + logger.debug("KafkaConnector: Update triggered with data:{}".format(data)) + UpdateSuccess = self.behavior.update(data) + logger.debug("KafkaConnector: Hot update status:{}".format(UpdateSuccess)) + + return UpdateSuccess + + + def consume(self,message,props=None,methods=None): + try: + consumerTopic=methods.routing_key + print("="*50) + print("Consuming Message") + print("self.consumer_topic",consumerTopic) + print("="*50) + + output=self.behaviour.run(message) + + if output: + self.send(output) + except Exception as e: + logger.error("Exception in Behaviour code:{}",e) + self.client.stop() + print("-"*60) + traceback.print_exc(file=sys.stdout) + self.on_exit(101) + print("-"*60) + exit(101) + + + + + def run(self,app): + app.app_context().push() + while(not self.should_stop): + try: + if (self.consumerTopics ==None and self.consumerSyncTopics ==None): + output=self.behavior.run() + if output: + self.send(output) + + if self.consumerSyncTopics: + if self.consumerSyncMode !=True: + for topic in self.consumerSyncTopics: + message=self.client.consume_sync(topic) + output=self.behavior.run(message) + if output: + self.send(output) + + else: + message=self.client.consume_sync_all() + output=self.behavior.run(message) + if output: + self.send(output) + + except Exception as e: + logger.error("Exception in Behaviour code:{}",e) + self.client.stop() + print("-"*60) + traceback.print_exc(file=sys.stdout) + self.on_exit(101) + print("-"*60) + exit(101) + + + def on_exit(self,exit_code): + for callback in self.exit_callbacks: + callback(exit_code) From d47d40dd84cdec307747fd19840a02f08126c354 Mon Sep 17 00:00:00 2001 From: saurabh Date: Mon, 14 Dec 2020 13:07:43 +0530 Subject: [PATCH 02/26] requirements update --- requirements_dev.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements_dev.txt b/requirements_dev.txt index 296530c..1b2bb62 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -31,3 +31,4 @@ diskcache==4.1.0 #https://github.com/pyeve/eve/issues/1359 werkzeug==0.16.1 +rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git From 21550a05f4b585c9f1ec2287d57a4f3623eeed26 Mon Sep 17 00:00:00 2001 From: Rohit Sethi Date: Mon, 14 Dec 2020 13:24:29 +0530 Subject: [PATCH 03/26] Fixed: connector_type name to self.connector_type --- src/eazyserver/core/manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eazyserver/core/manager.py b/src/eazyserver/core/manager.py index 34a0895..7e8c677 100644 --- a/src/eazyserver/core/manager.py +++ b/src/eazyserver/core/manager.py @@ -27,14 +27,14 @@ def __init__(self, **kwargs): self.pid = os.getpid() self.exit_code = 0 - if connector_type == 'kafka': + if self.connector_type == 'kafka': self.connected_behaviour = KafkaConnector( self.behaviour, kafka_client_type=self.kafka_client_type, on_exit=self.stop, **self.kafka_client_config) - if connector_type == 'rabbitMq': + if self.connector_type == 'rabbitMq': self.rabbit_client_config = kwargs.get('rabbit_client_config') self.connected_behaviour = RabbitMqConnector( self.behaviour, From d6e7899d052df11ba46a5b6240e00f62c38a0b47 Mon Sep 17 00:00:00 2001 From: saurabh Date: Mon, 14 Dec 2020 14:56:14 +0530 Subject: [PATCH 04/26] run behaviour only if message is present --- src/eazyserver/core/rabbitMqConnector.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index 49ac384..74ae24a 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -193,15 +193,17 @@ def run(self,app): if self.consumerSyncMode !=True: for topic in self.consumerSyncTopics: message=self.client.consume_sync(topic) - output=self.behavior.run(message) - if output: - self.send(output) + if message: + output=self.behavior.run(message) + if output: + self.send(output) else: message=self.client.consume_sync_all() - output=self.behavior.run(message) - if output: - self.send(output) + if message: + output=self.behavior.run(message) + if output: + self.send(output) except Exception as e: logger.error("Exception in Behaviour code:{}",e) @@ -211,6 +213,7 @@ def run(self,app): self.on_exit(101) print("-"*60) exit(101) + time.sleep(0.01) def on_exit(self,exit_code): From 38ad5549d5e77cde6af43fb253d2eb2cef11ece6 Mon Sep 17 00:00:00 2001 From: saurabh Date: Mon, 14 Dec 2020 18:15:22 +0530 Subject: [PATCH 05/26] fix multiple print issues --- src/eazyserver/core/rabbitMqConnector.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index 74ae24a..cf1052d 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -94,12 +94,12 @@ def __init__(self, Behaviour, client_type="rabbitMq", on_exit=None, **kwargs): # TODO : Validate **kwargs - RABBIT_SERVER_CONFIG={ + RABBIT_SERVER_CONFIG=self.client_config.get("rabbitServerConfig",{ 'host':"queue.vedalabs.in", 'user':'guest', 'password':'guest', 'port':5672 - } + }) REST_API_CONFIG=self.client_config.get("restApiConfig",None) @@ -159,11 +159,6 @@ def update(self, data,props=None,methods=None): def consume(self,message,props=None,methods=None): try: consumerTopic=methods.routing_key - print("="*50) - print("Consuming Message") - print("self.consumer_topic",consumerTopic) - print("="*50) - output=self.behaviour.run(message) if output: @@ -179,7 +174,6 @@ def consume(self,message,props=None,methods=None): - def run(self,app): app.app_context().push() while(not self.should_stop): From 9592e620a9bd93819d890906bd8e08e417dc93ac Mon Sep 17 00:00:00 2001 From: saurabh Date: Tue, 15 Dec 2020 14:50:35 +0530 Subject: [PATCH 06/26] disable socket io connection --- src/eazyserver/core/manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/eazyserver/core/manager.py b/src/eazyserver/core/manager.py index 7e8c677..30bba7f 100644 --- a/src/eazyserver/core/manager.py +++ b/src/eazyserver/core/manager.py @@ -49,7 +49,8 @@ def __init__(self, **kwargs): signal.signal(signal.SIGTERM, self.receiveSignal) # Socket IO based Live updates - if not self.connected_behaviour.behavior.offlineMode: + + if not self.connected_behaviour.behavior.offlineMode and self.connector_type!='rabbitMq': self.socketClient=VedaSocketIO(subscriptions=self.subscriptionTopics()) self.registerUpdateHandler() From a1f83b2d25ed8ecfaee577302283730e697b8b6c Mon Sep 17 00:00:00 2001 From: saurabh Date: Tue, 15 Dec 2020 15:28:26 +0530 Subject: [PATCH 07/26] fixed source data issue --- src/eazyserver/core/rabbitMqConnector.py | 27 ++++++++++++------------ 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index cf1052d..398f474 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -59,15 +59,16 @@ def formatOutput(output,behavior,source_data=None): output["_created"] = output["_updated"] else: # Propagate _created from input data which is source (_id of input specified as source_id of output) - for data in source_data: - if output["source_id"] == data["_id"]: - output["_created"] = data["_created"] - break - # Propagate _created time based upon same source_id of input data - for data in source_data: - if output["source_id"] == data["source_id"]: - output["_created"] = data["_created"] - break + if source_data: + for data in source_data: + if output["source_id"] == data["_id"]: + output["_created"] = data["_created"] + break + # Propagate _created time based upon same source_id of input data + for data in source_data: + if output["source_id"] == data["source_id"]: + output["_created"] = data["_created"] + break if "_created" not in output: logger.info("{} | source_id {} not found for id {}".format(output["_producer"],output["source_id"],output["_id"])) @@ -136,8 +137,8 @@ def stop(self): logger.info("Behaviour is schedule for shutdown.") self.client.stop() - def send(self,output): - output = formatOutput(output, self.behavior) + def send(self,output,source_data): + output = formatOutput(output, self.behavior,source_data) self.client.send(producerTopic=self.producerTopic,message=output) @@ -162,7 +163,7 @@ def consume(self,message,props=None,methods=None): output=self.behaviour.run(message) if output: - self.send(output) + self.send(output,[message]) except Exception as e: logger.error("Exception in Behaviour code:{}",e) self.client.stop() @@ -190,7 +191,7 @@ def run(self,app): if message: output=self.behavior.run(message) if output: - self.send(output) + self.send(output,[message]) else: message=self.client.consume_sync_all() From 4cf49e5d4fd02507152ed1cb4eae6dbb4ea92950 Mon Sep 17 00:00:00 2001 From: saurabh Date: Tue, 15 Dec 2020 16:00:46 +0530 Subject: [PATCH 08/26] fixed bug --- src/eazyserver/core/rabbitMqConnector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index 398f474..8c037f0 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -137,7 +137,7 @@ def stop(self): logger.info("Behaviour is schedule for shutdown.") self.client.stop() - def send(self,output,source_data): + def send(self,output,source_data=None): output = formatOutput(output, self.behavior,source_data) self.client.send(producerTopic=self.producerTopic,message=output) @@ -201,7 +201,7 @@ def run(self,app): self.send(output) except Exception as e: - logger.error("Exception in Behaviour code:{}",e) + logger.error("Exception in Behaviour code:{}".format(str(e))) self.client.stop() print("-"*60) traceback.print_exc(file=sys.stdout) From c92251170883b73991cf6fe63229332606d3a4d1 Mon Sep 17 00:00:00 2001 From: saurabh Date: Tue, 22 Dec 2020 12:09:14 +0530 Subject: [PATCH 09/26] fixing async consumer call back --- src/eazyserver/core/rabbitMqConnector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index 8c037f0..2cd3b6b 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -121,7 +121,7 @@ def __init__(self, Behaviour, client_type="rabbitMq", on_exit=None, **kwargs): queueId=self.behavior.config.get("_id","") self.client=Connector.RabbitMqConnector(rabbit_server_config=RABBIT_SERVER_CONFIG, - consumerCallback=self.consume, + topicCallback=self.consume, subscriptionCallback=self.update, consumerTopics=self.consumerTopics, consumerSyncTopics=self.consumerSyncTopics, From 58c11be248e80454ef4a6dfee8e718d3668e9fb7 Mon Sep 17 00:00:00 2001 From: saurabh Date: Tue, 22 Dec 2020 12:30:05 +0530 Subject: [PATCH 10/26] fixing spelling --- src/eazyserver/core/rabbitMqConnector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index 2cd3b6b..b563ad9 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -160,12 +160,12 @@ def update(self, data,props=None,methods=None): def consume(self,message,props=None,methods=None): try: consumerTopic=methods.routing_key - output=self.behaviour.run(message) + output=self.behavior.run(message) if output: self.send(output,[message]) except Exception as e: - logger.error("Exception in Behaviour code:{}",e) + logger.error("Exception in Behaviour code:{}".format(str(e))) self.client.stop() print("-"*60) traceback.print_exc(file=sys.stdout) From 45a173637353c570ff7bb2aaa2281fe87241d07c Mon Sep 17 00:00:00 2001 From: saurabh Date: Wed, 30 Dec 2020 14:40:05 +0530 Subject: [PATCH 11/26] added async lock --- src/eazyserver/core/rabbitMqConnector.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index b563ad9..21ef658 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -119,6 +119,7 @@ def __init__(self, Behaviour, client_type="rabbitMq", on_exit=None, **kwargs): # Create client based on type of Kafka Client specified queueId=self.behavior.config.get("_id","") + self.asyncLock=False self.client=Connector.RabbitMqConnector(rabbit_server_config=RABBIT_SERVER_CONFIG, topicCallback=self.consume, @@ -158,20 +159,26 @@ def update(self, data,props=None,methods=None): def consume(self,message,props=None,methods=None): + print ("consume called for msg") try: + while(self.asyncLock==True): + time.sleep(0.5) + print("waiting for async lock") + self.asyncLock=True consumerTopic=methods.routing_key - output=self.behavior.run(message) - + output=self.behavior.run(message) if output: self.send(output,[message]) + self.asyncLock=False except Exception as e: + self.asyncLock=False logger.error("Exception in Behaviour code:{}".format(str(e))) self.client.stop() print("-"*60) traceback.print_exc(file=sys.stdout) self.on_exit(101) print("-"*60) - exit(101) + exit(101) @@ -198,7 +205,7 @@ def run(self,app): if message: output=self.behavior.run(message) if output: - self.send(output) + self.send(output,[message]) except Exception as e: logger.error("Exception in Behaviour code:{}".format(str(e))) From 636467924e6005ae88af6af4c07d99cc6851df3d Mon Sep 17 00:00:00 2001 From: saurabh Date: Wed, 30 Dec 2020 14:40:50 +0530 Subject: [PATCH 12/26] added async lock continued --- src/eazyserver/core/rabbitMqConnector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index 21ef658..653faeb 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -162,7 +162,7 @@ def consume(self,message,props=None,methods=None): print ("consume called for msg") try: while(self.asyncLock==True): - time.sleep(0.5) + time.sleep(0.1) print("waiting for async lock") self.asyncLock=True consumerTopic=methods.routing_key From 7a49b0ef5e4b221c6cf0b57ff6debfe5409d011b Mon Sep 17 00:00:00 2001 From: saurabh Date: Thu, 31 Dec 2020 10:28:31 +0530 Subject: [PATCH 13/26] thread safe rabbit mq --- requirements_dev.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements_dev.txt b/requirements_dev.txt index 1b2bb62..11ca1e8 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -31,4 +31,4 @@ diskcache==4.1.0 #https://github.com/pyeve/eve/issues/1359 werkzeug==0.16.1 -rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git +rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@thread_safe From f0edb64049952a5fcaaca4a4db54b15d10667bbd Mon Sep 17 00:00:00 2001 From: saurabh Date: Thu, 31 Dec 2020 12:00:40 +0530 Subject: [PATCH 14/26] changing branch-hot fix --- requirements_dev.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements_dev.txt b/requirements_dev.txt index 11ca1e8..c76cacd 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -31,4 +31,4 @@ diskcache==4.1.0 #https://github.com/pyeve/eve/issues/1359 werkzeug==0.16.1 -rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@thread_safe +rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@rabbit_latest From b1d012c47edd45bab09c206b5c90ec32d4350be5 Mon Sep 17 00:00:00 2001 From: saurabh Date: Sun, 3 Jan 2021 17:43:43 +0530 Subject: [PATCH 15/26] added client to behaviour for dynamic subscription calling --- src/eazyserver/core/rabbitMqConnector.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index 653faeb..6879162 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -132,7 +132,9 @@ def __init__(self, Behaviour, client_type="rabbitMq", on_exit=None, **kwargs): receiver_exchange="BEHAVIOUR_EVENTS", queueId=queueId) - + # Add rabbit mq client to the behaviour object as well + self.behavior.connector_client=self.client + def stop(self): self.should_stop=True logger.info("Behaviour is schedule for shutdown.") @@ -189,7 +191,7 @@ def run(self,app): if (self.consumerTopics ==None and self.consumerSyncTopics ==None): output=self.behavior.run() if output: - self.send(output) + self.send(output) if self.consumerSyncTopics: if self.consumerSyncMode !=True: From cdb6fd61294babd40ff9294ae529e278c8fb3389 Mon Sep 17 00:00:00 2001 From: saurabh Date: Tue, 5 Jan 2021 19:55:48 +0530 Subject: [PATCH 16/26] added config for sender and receiver rabbitmq servers --- src/eazyserver/core/rabbitMqConnector.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index 6879162..3af0e2d 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -110,6 +110,8 @@ def __init__(self, Behaviour, client_type="rabbitMq", on_exit=None, **kwargs): self.producerSubscriptions=self.client_config.get("producerSubscriptions",None) self.consumerSyncTopics=self.client_config.get("consumerSyncTopics",None) self.consumerSyncMode=self.client_config.get("consumerSyncMode",False) + self.sender_rabbit_server_config=self.client_config.get("sender_rabbit_server_config",None) + self.receiver_rabbit_server_config=self.client_config.get("receiver_rabbit_server_config",None) print("="*50) print("Printing kwargs...") @@ -121,7 +123,7 @@ def __init__(self, Behaviour, client_type="rabbitMq", on_exit=None, **kwargs): queueId=self.behavior.config.get("_id","") self.asyncLock=False - self.client=Connector.RabbitMqConnector(rabbit_server_config=RABBIT_SERVER_CONFIG, + self.client=Connector.RabbitMqConnector(rabbit_server_config=RABBIT_SERVER_CONFIG, topicCallback=self.consume, subscriptionCallback=self.update, consumerTopics=self.consumerTopics, @@ -130,6 +132,8 @@ def __init__(self, Behaviour, client_type="rabbitMq", on_exit=None, **kwargs): rest_api_config=REST_API_CONFIG, sender_exchange="BEHAVIOUR_EVENTS", receiver_exchange="BEHAVIOUR_EVENTS", + sender_rabbit_server_config=self.sender_rabbit_server_config, + receiver_rabbit_server_config=self.receiver_rabbit_server_config, queueId=queueId) # Add rabbit mq client to the behaviour object as well From 26121026248ca47e6c5bdeac3012ab481c1926ed Mon Sep 17 00:00:00 2001 From: saurabh Date: Fri, 8 Jan 2021 13:37:02 +0530 Subject: [PATCH 17/26] passed missing subscriptions to eazyserver --- src/eazyserver/core/rabbitMqConnector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index 3af0e2d..284c29c 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -127,6 +127,7 @@ def __init__(self, Behaviour, client_type="rabbitMq", on_exit=None, **kwargs): topicCallback=self.consume, subscriptionCallback=self.update, consumerTopics=self.consumerTopics, + consumerSubscriptions=self.consumerSubscriptions, consumerSyncTopics=self.consumerSyncTopics, producerTopic=self.producerTopic, rest_api_config=REST_API_CONFIG, From 2222bf80c9b0826e0c18e57eb1154bb794a2550a Mon Sep 17 00:00:00 2001 From: saurabh Date: Mon, 11 Jan 2021 13:50:56 +0530 Subject: [PATCH 18/26] added error check on hot updates --- src/eazyserver/core/rabbitMqConnector.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index 284c29c..6ab6243 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -158,9 +158,20 @@ def subscriptionTopics(self,subscriptions=[]): # update event callback def update(self, data,props=None,methods=None): - logger.debug("KafkaConnector: Update triggered with data:{}".format(data)) - UpdateSuccess = self.behavior.update(data) - logger.debug("KafkaConnector: Hot update status:{}".format(UpdateSuccess)) + logger.debug("RabbitMqConnector: Update triggered with data:{}".format(data)) + try: + while(self.asyncLock==True): + time.sleep(0.1) + print("waiting for async lock") + self.asyncLock=True + UpdateSuccess = self.behavior.update(data) + self.asyncLock=False + except Exception as e: + self.asyncLock=False + logger.error("Exception in Behaviour code:{}".format(str(e))) + logger.info(traceback.format_exc()) + logger.debug("RabbitMqConnector: Hot update status:{}".format(UpdateSuccess)) + return UpdateSuccess From 6b12c4b4feeb264a1d6ae33021c6c343a87e1362 Mon Sep 17 00:00:00 2001 From: saurabh Date: Mon, 11 Jan 2021 16:42:19 +0530 Subject: [PATCH 19/26] changed version of rabbitMq --- requirements_dev.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements_dev.txt b/requirements_dev.txt index c76cacd..03825b2 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -31,4 +31,4 @@ diskcache==4.1.0 #https://github.com/pyeve/eve/issues/1359 werkzeug==0.16.1 -rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@rabbit_latest +rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@v1.0.0 From 7c971623ae613efe6660a3a444ff98afdaf997aa Mon Sep 17 00:00:00 2001 From: saurabh Date: Mon, 11 Jan 2021 17:39:30 +0530 Subject: [PATCH 20/26] changed release version to 1.0.1 rabbit mw --- requirements_dev.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements_dev.txt b/requirements_dev.txt index 03825b2..e048118 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -31,4 +31,4 @@ diskcache==4.1.0 #https://github.com/pyeve/eve/issues/1359 werkzeug==0.16.1 -rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@v1.0.0 +rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@v1.0.1 From 9ad380026137c7d202271d11e60934be1b10b022 Mon Sep 17 00:00:00 2001 From: saurabh Date: Mon, 11 Jan 2021 17:40:42 +0530 Subject: [PATCH 21/26] changed release version to 1.0.1 rabbit mQ --- requirements_dev.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements_dev.txt b/requirements_dev.txt index e048118..bdc39cc 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -31,4 +31,4 @@ diskcache==4.1.0 #https://github.com/pyeve/eve/issues/1359 werkzeug==0.16.1 -rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@v1.0.1 +rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@V1.0.1 From 623454eb07873561668c7fc82b7b2013b2f022dd Mon Sep 17 00:00:00 2001 From: saurabh Date: Mon, 11 Jan 2021 18:05:01 +0530 Subject: [PATCH 22/26] updated rabbit version --- requirements_dev.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements_dev.txt b/requirements_dev.txt index bdc39cc..69d5e90 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -31,4 +31,4 @@ diskcache==4.1.0 #https://github.com/pyeve/eve/issues/1359 werkzeug==0.16.1 -rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@V1.0.1 +rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@v1.0.2 From e45760de9e5ff26fbc6939547bbb9ca4538e4c7d Mon Sep 17 00:00:00 2001 From: saurabh Date: Tue, 12 Jan 2021 12:35:53 +0530 Subject: [PATCH 23/26] testing with new version rabbitmq --- requirements_dev.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements_dev.txt b/requirements_dev.txt index 69d5e90..8a01163 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -31,4 +31,4 @@ diskcache==4.1.0 #https://github.com/pyeve/eve/issues/1359 werkzeug==0.16.1 -rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@v1.0.2 +rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@v1.0.3 From ed2e29624a60b3fd0d305f00015e083a55efbf7f Mon Sep 17 00:00:00 2001 From: saurabh Date: Tue, 12 Jan 2021 15:51:00 +0530 Subject: [PATCH 24/26] fixed runtime error --- src/eazyserver/core/rabbitMqConnector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/eazyserver/core/rabbitMqConnector.py b/src/eazyserver/core/rabbitMqConnector.py index 6ab6243..5eb5c3e 100644 --- a/src/eazyserver/core/rabbitMqConnector.py +++ b/src/eazyserver/core/rabbitMqConnector.py @@ -168,6 +168,7 @@ def update(self, data,props=None,methods=None): self.asyncLock=False except Exception as e: self.asyncLock=False + UpdateSuccess=False logger.error("Exception in Behaviour code:{}".format(str(e))) logger.info(traceback.format_exc()) logger.debug("RabbitMqConnector: Hot update status:{}".format(UpdateSuccess)) From bf97d4d3e9d8a46ea126dcc055fc926b199404d7 Mon Sep 17 00:00:00 2001 From: saurabh Date: Tue, 12 Jan 2021 16:29:21 +0530 Subject: [PATCH 25/26] updated version mq-final testing --- requirements_dev.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements_dev.txt b/requirements_dev.txt index 8a01163..9891900 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -31,4 +31,4 @@ diskcache==4.1.0 #https://github.com/pyeve/eve/issues/1359 werkzeug==0.16.1 -rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@v1.0.3 +rabbitMqConnector@ git+https://github.com/MacherLabs/rabbitMqConnector.git@v1.0.4 From 1931a5d841110debc9ba6ea7737521d91c8fb4a3 Mon Sep 17 00:00:00 2001 From: saurabh Date: Wed, 13 Jan 2021 13:58:47 +0530 Subject: [PATCH 26/26] changed rabbit client config name --- src/eazyserver/core/manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eazyserver/core/manager.py b/src/eazyserver/core/manager.py index 30bba7f..eb67fdb 100644 --- a/src/eazyserver/core/manager.py +++ b/src/eazyserver/core/manager.py @@ -21,7 +21,7 @@ def __init__(self, **kwargs): self.connector_type = kwargs.get('connector_type','kafka') self.kafka_client_type = kwargs.get('kafka_client_type',None) self.kafka_client_config = kwargs.get('kafka_client_config',None) - self.client_config=kwargs.get('client_config',None) + self.rabbit_client_config=kwargs.get('rabbit_client_config',None) self.pid = os.getpid() @@ -39,7 +39,7 @@ def __init__(self, **kwargs): self.connected_behaviour = RabbitMqConnector( self.behaviour, on_exit=self.stop, - **self.client_config) + **self.self.rabbit_client_config) self.signal_map = kwargs.get('signal_map', {})