diff --git a/README.md b/README.md index ed4067e..90102b7 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,7 @@ To run all components of the MANO framework you have to start their containers. The parameter `broker_host` provides the url on which the message broker can be found. It is build as `amqp://:@:5672/%2F`. -With the deployment of the SLM, it is possible to add some parameters to the command, to indicate the urls where the SLM can locate the VNFR, NSR and MONITORING repositories. These parameters are optional, and only useful if the MANO Framework is used inside the full setup of the SONATA service platform. +With the deployment of the SLM, it is possible to add some parameters to the command, to indicate the urls where the SLM can locate the VNFR, NSR and MONITORING repositories. These parameters are optional, and only useful if the MANO framework is used inside the full setup of the SONATA service platform. Runtime information for these docker containers can be accessed through the standard docker commands: diff --git a/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm.py b/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm.py index 0ab7afb..883e1b5 100644 --- a/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm.py +++ b/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm.py @@ -35,7 +35,7 @@ import threading import sys import concurrent.futures as pool -#import psutil +# import psutil from sonmanobase.plugin import ManoBasePlugin try: @@ -73,19 +73,11 @@ # The topic to which available vims are published INFRA_ADAPTOR_AVAILABLE_VIMS = 'infrastructure.management.compute.list' -# Topics for interaction with the specific manager registry +# Topics for interaction with the specific manager registry SRM_ONBOARD = 'specific.manager.registry.ssm.on-board' SRM_START = 'specific.manager.registry.ssm.instantiate' SRM_UPDATE = 'specific.manager.registry.ssm.update' -# The NSR Repository can be accessed through a RESTful -# API. Links are red from ENV variables. -#NSR_REPOSITORY_URL = os.environ.get("url_nsr_repository") -#VNFR_REPOSITORY_URL = os.environ.get("url_vnfr_repository") - -# Monitoring repository, can be accessed throught a RESTful -# API. Link is red from ENV variable. - class ServiceLifecycleManager(ManoBasePlugin): """ @@ -110,7 +102,7 @@ def __init__(self, # Create the ledger that saves state self.services = {} - #The frequency of state sharing events + # The frequency of state sharing events self.state_share_frequency = 1 # Create a configuration dict that contains config info of SLM @@ -147,8 +139,8 @@ def __init__(self, ver = "0.1-dev" des = "This is the SLM plugin" - super(self.__class__, self).__init__(version=ver, - description=des, + super(self.__class__, self).__init__(version=ver, + description=des, auto_register=auto_register, wait_for_registration=wait_for_registration, start_running=start_running) @@ -194,11 +186,6 @@ def declare_subscriptions(self): # The topic on which monitoring information is received self.manoconn.subscribe(self.monitoring_feedback, t.MON_RECEIVE) - # To be removed when transition to new SLM is completed - self.manoconn.register_async_endpoint( - self.on_gk_service_update, - GK_INSTANCE_UPDATE) - def on_lifecycle_start(self, ch, mthd, prop, msg): """ This event is called when the plugin has successfully registered itself @@ -221,7 +208,7 @@ def deregister(self): LOG.info('Deregistering SLM with uuid ' + str(self.uuid)) message = {"uuid": self.uuid} self.manoconn.notify("platform.management.plugin.deregister", - json.dumps(message)) + json.dumps(message)) os._exit(0) def on_registration_ok(self): @@ -239,10 +226,9 @@ def on_registration_ok(self): # SLM Threading management ########################## - def get_ledger(self, serv_id): - return self.services[serv_id] + return self.services[serv_id] def get_services(self): @@ -263,8 +249,8 @@ def error_handling(self, serv_id, topic, message): 'status': 'ERROR'} corr_id = self.services[serv_id]['original_corr_id'] - self.manoconn.notify(topic, - yaml.dump(message), + self.manoconn.notify(topic, + yaml.dump(message), correlation_id=corr_id) return @@ -280,17 +266,19 @@ def start_next_task(self, serv_id): # If the kill field is active, the chain is killed if self.services[serv_id]['kill_chain']: - LOG.info("Killing workflow with id: " + serv_id) - #TODO: delete SSMs, already deployed fucntions, records, stop monitoring - #TODO: Or, jump into the kill workflow. + LOG.info("Service " + serv_id + ": Killing running workflow") + # TODO: delete SSMs, already deployed fucntions, records, stop + # monitoring + # TODO: Or, jump into the kill workflow. del self.services[serv_id] return # Select the next task, only if task list is not empty if len(self.services[serv_id]['schedule']) > 0: - #share state with other SLMs - next_task = getattr(self, self.services[serv_id]['schedule'].pop(0)) + # share state with other SLMs + next_task = getattr(self, + self.services[serv_id]['schedule'].pop(0)) # Push the next task to the threadingpool task = self.thrd_pool.submit(next_task, serv_id) @@ -313,9 +301,8 @@ def start_next_task(self, serv_id): else: self.start_next_task(serv_id) - else: - #share state with other SLMs + # share state with other SLMs self.slm_share('DONE', self.services[serv_id]) del self.services[serv_id] @@ -329,7 +316,7 @@ def plugin_status(self, ch, method, properties, payload): This method is called when the plugin manager broadcasts new information on the plugins. """ - #TODO: needs unit testing + # TODO: needs unit testing message = yaml.load(payload) @@ -343,20 +330,21 @@ def slm_down(self): has gone missing. This SLM needs to determine whether it should take over unfinished tasks from this SLM. """ - #TODO: needs unit testing + # TODO: needs unit testing for serv_id in self.slm_config['tasks_other_slm'].keys(): + tasks_other_slm = self.slm_config['tasks_other_slm'] # TODO: only take over when ID's match LOG.info('SLM down, taking over requests') - self.services[serv_id] = self.slm_config['tasks_other_slm'][serv_id] + self.services[serv_id] = tasks_other_slm[serv_id] if 'schedule' not in self.services[serv_id].keys(): del self.services[serv_id] - ch = self.slm_config['tasks_other_slm'][serv_id]['ch'] - method = self.slm_config['tasks_other_slm'][serv_id]['method'] - properties = self.slm_config['tasks_other_slm'][serv_id]['properties'] - payload = self.slm_config['tasks_other_slm'][serv_id]['payload'] + ch = self.tasks_other_slm[serv_id]['ch'] + method = tasks_other_slm[serv_id]['method'] + properties = tasks_other_slm[serv_id]['properties'] + payload = tasks_other_slm[serv_id]['payload'] self.service_instance_create(ch, method, properties, payload) @@ -369,19 +357,19 @@ def inter_slm(self, ch, method, properties, payload): """ This method handles messages that are shared between different SLMs. """ - #TODO: needs unit testing + # TODO: needs unit testing msg = yaml.load(payload) if msg['slm_id'] != str(self.uuid): + tasks_other_slm = self.slm_config['tasks_other_slm'] if msg['status'] == 'DONE': - if (str(msg['corr_id'])) in self.slm_config['tasks_other_slm'].keys(): - del self.slm_config['tasks_other_slm'][str(msg['corr_id'])] + if (str(msg['corr_id'])) in tasks_other_slm.keys(): + del tasks_other_slm[str(msg['corr_id'])] if msg['status'] == 'IN PROGRESS': - self.slm_config['tasks_other_slm'][str(msg['corr_id'])] = msg['state'] - + tasks_other_slm[str(msg['corr_id'])] = msg['state'] def service_instance_create(self, ch, method, properties, payload): """ @@ -392,30 +380,27 @@ def service_instance_create(self, ch, method, properties, payload): # Check if the messages comes from the GK or is forward by another SLM message_from_gk = True if properties.app_id == self.name: - message_from_gk = False + message_from_gk = False if properties.reply_to is None: return - - # Bypass for backwards compatibility, to be removed after transition - # to new version of SLM is completed + + # Bypass for backwards compatibility, to be removed after + # transition to new version of SLM is completed message = yaml.load(payload) - # if 'NSD' in message.keys(): - # if message['NSD']['descriptor_version'] == '1.0': - # response = self.on_gk_service_instance_create(ch, method, properties, payload) - # self.manoconn.notify(GK_INSTANCE_CREATE_TOPIC, response, correlation_id=properties.correlation_id) # Extract the correlation id and generate a reduced id corr_id = properties.correlation_id - reduced_id = tools.convert_corr_id(corr_id) + reduced_id = tools.convert_corr_id(corr_id) # If the message comes from another SLM, check if the request has made # a round trip if not message_from_gk: - roundtrip = (reduced_id % self.slm_config['slm_total'] == self.slm_config['slm_rank']) + calc_rank = reduced_id % self.slm_config['slm_total'] + roundtrip = (calc_rank == self.slm_config['slm_rank']) if roundtrip: # If the message made a round trip, a new SLM should be started - # as this implies that the resources are exhausted + # as this implies that the resources are exhausted deploy_new_slm() else: @@ -435,22 +420,22 @@ def service_instance_create(self, ch, method, properties, payload): # Schedule the tasks that the SLM should do for this request. add_schedule = [] - + add_schedule.append('validate_deploy_request') add_schedule.append('contact_gk') - #Onboard and instantiate the SSMs, if required. - if self.services[serv_id]['service']['ssm'] != None: - add_schedule.append('onboard_ssms') - add_schedule.append('instant_ssms') + # Onboard and instantiate the SSMs, if required. + if self.services[serv_id]['service']['ssm'] is not None: + add_schedule.append('onboard_ssms') + add_schedule.append('instant_ssms') - if self.services[serv_id]['service']['ssm'] != None: + if self.services[serv_id]['service']['ssm'] is not None: if 'task' in self.services[serv_id]['service']['ssm'].keys(): add_schedule.append('trigger_task_ssm') add_schedule.append('request_topology') - #Perform the placement + # Perform the placement if self.services[serv_id]['service']['ssm'] is None: add_schedule.append('SLM_mapping') else: @@ -467,10 +452,10 @@ def service_instance_create(self, ch, method, properties, payload): add_schedule.append('start_monitoring') add_schedule.append('inform_gk') - self.services[serv_id]['schedule'].extend(add_schedule) - LOG.info("New instantiation request received. Instantiation started.") + msg = ": New instantiation request received. Instantiation started." + LOG.info("Service " + serv_id + msg) # Start the chain of tasks self.start_next_task(serv_id) @@ -493,7 +478,7 @@ def service_instance_kill(self, ch, method, prop, payload): # Check if the messages comes from the GK or is forward by another SLM message_from_gk = True if prop.app_id == self.name: - message_from_gk = False + message_from_gk = False if properties.reply_to is None: return @@ -501,8 +486,8 @@ def service_instance_kill(self, ch, method, prop, payload): serv_id = content['instance_id'] LOG.info("Termination request received for service " + str(serv_id)) - # Check if the ledger has an entry for this instance, as this method can - # be called from multiple paths + # Check if the ledger has an entry for this instance, as this method + # can be called from multiple paths if serv_id not in self.services.keys(): # Based on the received payload, the ledger entry is recreated. LOG.info("Recreating ledger.") @@ -544,11 +529,11 @@ def resp_topo(self, ch, method, prop, payload): """ message = yaml.load(payload) - LOG.info("Topology received from IA.") - LOG.debug("Requested info on topology: " + str(message)) - # Retrieve the service uuid - serv_id = tools.serv_id_from_corr_id(self.services, prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, prop.correlation_id) + + LOG.info("Service " + serv_id + ": Topology received from IA.") + LOG.debug("Requested info on topology: " + str(message)) # Add topology to ledger self.services[serv_id]['infrastructure']['topology'] = message @@ -556,23 +541,26 @@ def resp_topo(self, ch, method, prop, payload): # Continue with the scheduled tasks self.start_next_task(serv_id) - - def resp_onboard(self, ch, method, prop, payload): + def resp_onboard(self, ch, method, prop, payload): """ This function handles responses to a request to onboard the ssms of a new service. """ - LOG.info("Onboarding response received from SMR.") # Retrieve the service uuid - serv_id = tools.serv_id_from_corr_id(self.services, prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, prop.correlation_id) + LOG.info("Service " + serv_id + ": Onboarding resp received from SMR.") message = yaml.load(payload) - if message['error'] is None: - LOG.info("SSMs onboarded succesfully") - else: - LOG.info("SSM onboarding failed: " + response['error']) - self.error_handling(serv_id, t.GK_CREATE, message['error']) + for key in message.keys(): + if message[key]['error'] == 'None': + LOG.info("Service " + serv_id + ": SSMs onboarded succesfully") + else: + msg = ": SSM onboarding failed: " + message[key]['error'] + LOG.info("Service " + serv_id + msg) + self.error_handling(serv_id, + t.GK_CREATE, + message[key]['error']) # Continue with the scheduled tasks self.start_next_task(serv_id) @@ -582,21 +570,24 @@ def resp_instant(self, ch, method, prop, payload): This function handles responses to a request to onboard the ssms of a new service. """ - LOG.info("Instantiating response received from SMR.") # Retrieve the service uuid - serv_id = tools.serv_id_from_corr_id(self.services, prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, prop.correlation_id) + msg = ": Instantiating response received from SMR." + LOG.info("Service " + serv_id + msg) + LOG.debug(payload) message = yaml.load(payload) for ssm_type in self.services[serv_id]['service']['ssm'].keys(): ssm = self.services[serv_id]['service']['ssm'][ssm_type] response = message[ssm['id']] ssm['instantiated'] = False - if response['error'] is None: - LOG.info("SSM instantiated correctly.") + if response['error'] == 'None': + LOG.info("Service " + serv_id + ": SSM instantiated correct.") ssm['instantiated'] = True else: - LOG.info("SSM instantiation failed: " + response['error']) + msg = ": SSM instantiation failed: " + response['error'] + LOG.info("Service " + serv_id + msg) self.error_handling(serv_id, t.GK_CREATE, response['error']) ssm['uuid'] = response['uuid'] @@ -608,17 +599,18 @@ def resp_task(self, ch, method, prop, payload): """ This method handles updates of the task schedule by the an SSM. """ - #TODO: Test this method - - LOG.info("Response from task ssm: " + payload) + # TODO: Test this method # Retrieve the service uuid - serv_id = tools.serv_id_from_corr_id(self.services, prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, prop.correlation_id) + + LOG.info("Service " + serv_id + ": Response from task ssm: " + payload) message = yaml.load(payload) self.services[serv_id]['schedule'] = message['schedule'] - LOG.info("New taskschedule: " + str(self.services[serv_id]['schedule'])) + msg = ": New taskschedule: " + str(self.services[serv_id]['schedule']) + LOG.info("Service " + serv_id + msg) # # Continue with the scheduled tasks self.start_next_task(serv_id) @@ -627,26 +619,26 @@ def resp_place(self, ch, method, prop, payload): """ This method handles a placement performed by an SSM. """ - #TODO: Test this method + # TODO: Test this method - LOG.info(payload) message = yaml.load(payload) is_dict = isinstance(message, dict) - LOG.info("Type Dict: " + str(is_dict)) + LOG.debug("Type Dict: " + str(is_dict)) # Retrieve the service uuid - serv_id = tools.serv_id_from_corr_id(self.services, prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, prop.correlation_id) mapping = message['mapping'] error = message['error'] - if error != None: + if error is not None: self.error_handling(serv_id, t.GK_CREATE, error) - + else: # Add mapping to ledger - LOG.info("Calculated SSM mapping: " + str(mapping)) + msg = ": Calculated SSM mapping: " + str(mapping) + LOG.info("Service " + serv_id + msg) self.services[serv_id]['service']['mapping'] = mapping for function in self.services[serv_id]['function']: vnf_id = function['id'] @@ -658,40 +650,41 @@ def resp_ssm_configure(self, ch, method, prop, payload): """ This method handles an ssm configuration response """ - #TODO: Test this method + # TODO: Test this method LOG.info(payload) message = yaml.load(payload) # Retrieve the service uuid - serv_id = tools.serv_id_from_corr_id(self.services, prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, prop.correlation_id) self.start_next_task(serv_id) def resp_vnf_depl(self, ch, method, prop, payload): """ This method handles a response from the FLM to a vnf deploy request. """ - LOG.info('Message received from FLM on VNF deploy call.') message = yaml.load(payload) # Retrieve the service uuid - serv_id = tools.serv_id_from_corr_id(self.services, prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, prop.correlation_id) + msg = ": Message received from FLM on VNF deploy call." + LOG.info("Service " + serv_id + msg) - #Inform GK if VNF deployment failed - if message['error'] != None: + # Inform GK if VNF deployment failed + if message['error'] is not None: - LOG.info("Deployment of VNF failed") + LOG.info("Service " + serv_id + ": Deployment of VNF failed") LOG.debug("Message: " + str(message)) self.error_handling(serv_id, t.GK_CREATE, message['error']) else: - LOG.info("VNF correctly Deployed.") + LOG.info("Service " + serv_id + ": VNF correctly Deployed.") for function in self.services[serv_id]['function']: if function['id'] == message['vnfr']['id']: function['vnfr'] = message['vnfr'] - LOG.info("Added vnfr for instance: " + message['vnfr']['id']) - - vnfs_to_depl = self.services[serv_id]['vnfs_to_deploy'] - 1 + LOG.info("Added vnfr for inst: " + message['vnfr']['id']) + + vnfs_to_depl = self.services[serv_id]['vnfs_to_deploy'] - 1 self.services[serv_id]['vnfs_to_deploy'] = vnfs_to_depl # Only continue if all vnfs are deployed @@ -704,20 +697,20 @@ def resp_prepare(self, ch, method, prop, payload): This method handles a response to a prepare request. """ # Retrieve the service uuid - serv_id = tools.serv_id_from_corr_id(self.services, prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, prop.correlation_id) response = yaml.load(payload) LOG.debug("Response from IA on .prepare call: " + str(response)) if response['request_status'] == "COMPLETED": - LOG.info("Message from IA: Infrastructure prepared") + LOG.info("Service " + serv_id + ": Msg from IA: Infra prepared") else: - LOG.info("Error occured while preparing vims, aborting workflow") + msg = ": Error occured while preparing vims, aborting workflow" + LOG.info("Service " + serv_id + msg) self.error_handling(serv_id, t.GK_CREATE, response['message']) self.start_next_task(serv_id) - def contact_gk(self, serv_id): """ This method handles communication towards the gatekeeper.` @@ -728,7 +721,7 @@ def contact_gk(self, serv_id): # Get the correlation_id for the message corr_id = self.services[serv_id]['original_corr_id'] - #Build the message for the GK + # Build the message for the GK message = {} message['status'] = self.services[serv_id]['status'] message['error'] = self.services[serv_id]['error'] @@ -760,7 +753,7 @@ def request_topology(self, serv_id): # Pause the chain of tasks to wait for response self.services[serv_id]['pause_chain'] = True - LOG.info("Topology requested from IA.") + LOG.info("Service " + serv_id + ": Topology requested from IA.") def ia_prepare(self, serv_id): """ @@ -770,7 +763,8 @@ def ia_prepare(self, serv_id): :param serv_id: The instance uuid of the service """ - LOG.info("Requesting IA to prepare the infrastructure.") + msg = ": Requesting IA to prepare the infrastructure." + LOG.info("Service " + serv_id + msg) # Build mapping message for IA IA_mapping = {} @@ -784,7 +778,7 @@ def ia_prepare(self, serv_id): for function in self.services[serv_id]['function']: vim_uuid = function['vim_uuid'] - #Add VIM uuid if new + # Add VIM uuid if new new_vim = True for vim in IA_mapping['vim_list']: if vim['uuid'] == vim_uuid: @@ -792,15 +786,16 @@ def ia_prepare(self, serv_id): index = IA_mapping['vim_list'].index(vim) if new_vim: - IA_mapping['vim_list'].append({'uuid': vim_uuid, 'vm_images': []}) + IA_mapping['vim_list'].append({'uuid': vim_uuid, + 'vm_images': []}) index = len(IA_mapping['vim_list']) - 1 for vdu in function['vnfd']['virtual_deployment_units']: url = vdu['vm_image'] vm_uuid = tools.generate_image_uuid(vdu, function['vnfd']) - IA_mapping['vim_list'][index]['vm_images'].append({'image_uuid': vm_uuid,'image_url': url}) - + content = {'image_uuid': vm_uuid, 'image_url': url} + IA_mapping['vim_list'][index]['vm_images'].append(content) # Add correlation id to the ledger for future reference corr_id = str(uuid.uuid4()) @@ -833,16 +828,16 @@ def vnf_deploy(self, serv_id): message = function message['service_id'] = serv_id - LOG.info("Requesting the deployment of vnf " + function['id']) + msg = ": Requesting the deployment of vnf " + function['id'] + LOG.info("Service " + serv_id + msg) LOG.debug("Payload of request: " + str(message)) self.manoconn.call_async(self.resp_vnf_depl, - t.MANO_DEPLOY, - yaml.dump(message), + t.MANO_DEPLOY, + yaml.dump(message), correlation_id=corr_id) self.services[serv_id]['pause_chain'] = True - def onboard_ssms(self, serv_id): """ This method instructs the ssm registry manager to onboard the @@ -871,7 +866,7 @@ def onboard_ssms(self, serv_id): # Pause the chain of tasks to wait for response self.services[serv_id]['pause_chain'] = True - LOG.info("SSM on-boarding trigger sent to SMR.") + LOG.info("Service " + serv_id + ": SSM on-board trigger sent to SMR.") def instant_ssms(self, serv_id): """ @@ -882,15 +877,16 @@ def instant_ssms(self, serv_id): :param ssm_id: which ssm you want to deploy """ - corr_id = str(uuid.uuid4()) + corr_id = str(uuid.uuid4()) # Sending the NSD to the SRM triggers it to instantiate the ssms - msg = {} - msg['NSD'] = self.services[serv_id]['service']['nsd'] - msg['UUID'] = serv_id - - LOG.info("Keys in message for SSM instantiation: " + str(msg.keys())) - pyld = yaml.dump(msg) + msg_for_smr = {} + msg_for_smr['NSD'] = self.services[serv_id]['service']['nsd'] + msg_for_smr['UUID'] = serv_id + + msg = ": Keys in message for SSM instant: " + str(msg_for_smr.keys()) + LOG.info("Service " + serv_id + msg) + pyld = yaml.dump(msg_for_smr) self.manoconn.call_async(self.resp_instant, t.SRM_INSTANT, @@ -930,7 +926,7 @@ def trigger_task_ssm(self, serv_id): payload, correlation_id=corr_id) - LOG.info("task registered on " + str(topic)) + LOG.info("Service " + serv_id + ": task registered on " + str(topic)) # Pause the chain of tasks to wait for response self.services[serv_id]['pause_chain'] = True @@ -945,11 +941,11 @@ def req_placement_from_ssm(self, serv_id): corr_id = str(uuid.uuid4()) self.services[serv_id]['act_corr_id'] = corr_id - #Check if placement SSM is available + # Check if placement SSM is available ssm_place = self.services[serv_id]['service']['ssm']['placement'] - #If not available, fall back on SLM placement - if ssm_place['instantiated'] == False: - return self.SLM_mapping(serv_id) + # If not available, fall back on SLM placement + if ssm_place['instantiated'] is False: + return self.SLM_mapping(serv_id) # build message for placement SSM nsd = self.services[serv_id]['service']['nsd'] top = self.services[serv_id]['infrastructure']['topology'] @@ -960,12 +956,16 @@ def req_placement_from_ssm(self, serv_id): vnfd_to_add['instance_uuid'] = function['id'] vnfds.append(function['vnfd']) - message = {'nsd': nsd, 'topology': top, 'uuid': serv_id, 'vnfds': vnfds} + message = {'nsd': nsd, + 'topology': top, + 'uuid': serv_id, + 'vnfds': vnfds} # Contact SSM payload = yaml.dump(message) - LOG.info("Placement requested from SSM: " + str(message.keys())) + msg = ": Placement requested from SSM: " + str(message.keys()) + LOG.info("Service " + serv_id + msg) self.manoconn.call_async(self.resp_place, t.EXEC_PLACE, @@ -990,7 +990,8 @@ def configure_ssm(self, serv_id): LOG.info("Configuration SSM requested but not available") return - if not self.services[serv_id]['service']['ssm']['configure']['instantiated']: + ssm = self.services[serv_id]['service']['ssm'] + if not ssm['configure']['instantiated']: LOG.info("Configuration SSM not instantiated") return @@ -1009,10 +1010,13 @@ def configure_ssm(self, serv_id): # Pause the chain of tasks to wait for response self.services[serv_id]['pause_chain'] = True - def slm_share(self, status, content): - message = {'status':status, 'state':content, 'corr_id': content['original_corr_id'], 'slm_id': self.uuid} + message = {'status': status, + 'state': content, + 'corr_id': content['original_corr_id'], + 'slm_id': self.uuid} + payload = yaml.dump(message) self.manoconn.notify('mano.inter.slm', payload) @@ -1035,7 +1039,7 @@ def flm_deploy(self, ch, method, prop, payload): payload = yaml.dump(outg_message) corr_id = str(uuid.uuid4()) - #adding the vnfd to the flm ledger + # adding the vnfd to the flm ledger self.flm_ledger[corr_id] = {} self.flm_ledger[corr_id]['vnfd'] = message['vnfd'] self.flm_ledger[corr_id]['orig_corr_id'] = prop.correlation_id @@ -1053,7 +1057,7 @@ def IA_deploy_response(self, ch, method, prop, payload): This method fakes the FLMs reaction to a IA response. """ - # When the IA responses, the FLM builds the record and then + # When the IA responses, the FLM builds the record and then # forwards this to the SLM. LOG.info("IA reply to fake FLM on VNF deploy call") LOG.debug("Payload of request: " + str(payload)) @@ -1075,69 +1079,78 @@ def IA_deploy_response(self, ch, method, prop, payload): # Build the record vnfr = tools.build_vnfr(inc_message['vnfr'], vnfd) - outg_message['vnfr'] = vnfr + outg_message['vnfr'] = vnfr # Store the record # try: - vnfr_response = requests.post(t.VNFR_REPOSITORY_URL + 'vnf-instances', data=yaml.dump(vnfr), headers={'Content-Type':'application/x-yaml'}, timeout=1.0) - LOG.info("Storing VNFR on " + str(t.VNFR_REPOSITORY_URL + 'vnf-instances')) + url = t.VNFR_REPOSITORY_URL + 'vnf-instances' + header = {'Content-Type': 'application/json'} + vnfr_response = requests.post(url, + data=json.dumps(vnfr), + headers=header, + timeout=1.0) + LOG.info("Storing VNFR on " + url) LOG.debug("VNFR: " + str(vnfr)) if (vnfr_response.status_code == 200): LOG.info("VNFR storage accepted.") outg_message['vnfr'] = vnfr - #If storage fails, add error code and message to rply to gk + # If storage fails, add error code and message to rply to gk else: - error = {'http_code': vnfr_response.status_code, 'message': vnfr_response.json()} + error = {'http_code': vnfr_response.status_code, + 'message': vnfr_response.json()} LOG.info('vnfr to repo failed: ' + str(error)) # except: - # error = {'http_code': '0', 'message': 'Timeout when contacting VNFR server'} + # error = {'http_code': '0', + # 'message': 'Timeout contacting VNFR server'} # LOG.info('time-out on vnfr to repo') outg_message['error'] = error outg_message['inst_id'] = vnfd['instance_uuid'] + corr_id = self.flm_ledger[prop.correlation_id]['orig_corr_id'] self.manoconn.notify(t.MANO_DEPLOY, yaml.dump(outg_message), - correlation_id=self.flm_ledger[prop.correlation_id]['orig_corr_id']) + correlation_id=corr_id) def store_nsr(self, serv_id): - #TODO: get request_status from response from IA on chain + # TODO: get request_status from response from IA on chain request_status = 'normal operation' if request_status == 'normal operation': - LOG.info("Update status of the VNFR") + LOG.info("Service " + serv_id + ": Update status of the VNFR") for function in self.services[serv_id]['function']: function['vnfr']['status'] = "normal operation" function['vnfr']['version'] = '2' url = t.VNFR_REPOSITORY_URL + 'vnf-instances/' + function['id'] - LOG.info("URL for VNFR update: " + url) + LOG.info("Service " + serv_id + ": URL VNFR update: " + url) error = None try: - header = {'Content-Type':'application/json'} + header = {'Content-Type': 'application/json'} vnfr_resp = requests.put(url, data=json.dumps(function['vnfr']), headers=header, timeout=1.0) vnfr_resp_json = str(vnfr_resp.json()) if (vnfr_resp.status_code == 200): - LOG.info("VNFR update accepted for " + function['id']) + msg = ": VNFR update accepted for " + function['id'] + LOG.info("Service " + serv_id + msg) else: - LOG.info('VNFR update not accepted: ' + vnfr_resp_json) - error = {'http_code': vnfr_resp.status_code, + msg = ": VNFR update not accepted: " + vnfr_resp_json + LOG.info("Service " + serv_id + msg) + error = {'http_code': vnfr_resp.status_code, 'message': vnfr_resp_json} except: - error = {'http_code': '0', + error = {'http_code': '0', 'message': 'Timeout when contacting VNFR repo'} - if error != None: + if error is not None: self.error_handling(serv_id, t.GK_CREATE, error) return - nsd = self.services[serv_id]['service']['nsd'] vnfr_ids = [] @@ -1150,32 +1163,34 @@ def store_nsr(self, serv_id): error = None try: - header = {'Content-Type':'application/json'} + header = {'Content-Type': 'application/json'} nsr_resp = requests.post(t.NSR_REPOSITORY_URL + 'ns-instances', - data=json.dumps(nsr), + data=json.dumps(nsr), headers=header, timeout=1.0) nsr_resp_json = nsr_resp.json() if (nsr_resp.status_code == 200): - LOG.info("NSR accepted and stored for instance " + serv_id) + msg = ": NSR accepted and stored for instance " + serv_id + LOG.info("Service " + serv_id + msg) else: - LOG.info('NSR not accepted: ' + str(nsr_resp_json)) - error = {'http_code': nsr_resp.status_code, + msg = ": NSR not accepted: " + str(nsr_resp_json) + LOG.info("Service " + serv_id + msg) + error = {'http_code': nsr_resp.status_code, 'message': nsr_resp_json} except: - error = {'http_code': '0', + error = {'http_code': '0', 'message': 'Timeout when contacting NSR repo'} self.services[serv_id]['service']['nsr'] = nsr - if error != None: + if error is not None: self.error_handling(serv_id, t.GK_CREATE, error) return def vnf_chain(self, serv_id): """ - This method instructs the IA how to chain the functions together. + This method instructs the IA how to chain the functions together. """ corr_id = str(uuid.uuid4()) @@ -1203,7 +1218,7 @@ def vnf_chain(self, serv_id): yaml.dump(chain), correlation_id=corr_id) - LOG.info("Requested the Chaining of the VNFs.") + LOG.info("Service " + serv_id + ": Requested to chain the VNFs.") # Pause the chain of tasks to wait for response self.services[serv_id]['pause_chain'] = True @@ -1212,12 +1227,12 @@ def IA_chain_response(self, ch, method, prop, payload): This method handles the IA response to the chain request """ # Get the serv_id of this service - serv_id = tools.serv_id_from_corr_id(self.services, - prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, + prop.correlation_id) message = yaml.load(payload) - LOG.info("Chaining request completed.") + LOG.info("Service " + serv_id + ": Chaining request completed.") if message['message'] != '': error = message['message'] @@ -1228,39 +1243,41 @@ def IA_chain_response(self, ch, method, prop, payload): def vnf_unchain(self, serv_id): """ - This method instructs the IA to unchain the functions in the service. + This method instructs the IA to unchain the functions in the service. """ - LOG.info("Deconfiguring the chaining of the service") + msg = ": Deconfiguring the chaining of the service" + LOG.info("Service " + serv_id + msg) corr_id = str(uuid.uuid4()) self.services[serv_id]['act_corr_id'] = corr_id - payload = json.dumps({'service_instance_id':serv_id}) + payload = json.dumps({'service_instance_id': serv_id}) self.manoconn.call_async(self.IA_termination_response, - t.IA_DECONF_CHAIN, - payload, - correlation_id=corr_id) + t.IA_DECONF_CHAIN, + payload, + correlation_id=corr_id) # Pause the chain of tasks to wait for response self.services[serv_id]['pause_chain'] = True - def IA_unchain_response(self, ch, method, prop, payload): """ This method handles the IA response on the unchain request """ # Get the serv_id of this service - serv_id = tools.serv_id_from_corr_id(self.services, - prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, + prop.correlation_id) message = yaml.load(payload) if message['request_status'] == 'COMPLETED': - LOG.info("Response from IA: Service unchaining succeeded.") + msg = ": Response from IA: Service unchaining succeeded." + LOG.info("Service " + serv_id + msg) else: error = message['message'] - LOG.info("Response from IA: Service unchaining failed: " + error) + msg = ": Response from IA: Service unchaining failed: " + error + LOG.info("Service " + serv_id + msg) self.error_handling(serv_id, t.GK_KILL, error) return @@ -1270,37 +1287,38 @@ def terminate_service(self, serv_id): """ This method requests the termination of a service to the IA """ - LOG.info('Requesting IA to terminate service') + LOG.info("Service " + serv_id + ": Requesting IA to terminate service") corr_id = str(uuid.uuid4()) self.services[serv_id]['act_corr_id'] = corr_id - payload = json.dumps({'instance_uuid':serv_id}) + payload = json.dumps({'instance_uuid': serv_id}) self.manoconn.call_async(self.IA_termination_response, - t.IA_REMOVE, - payload, - correlation_id=corr_id) + t.IA_REMOVE, + payload, + correlation_id=corr_id) # Pause the chain of tasks to wait for response self.services[serv_id]['pause_chain'] = True - def IA_termination_response(self, ch, method, prop, payload): """ This method handles the response from the IA on the termination call. """ # Get the serv_id of this service - serv_id = tools.serv_id_from_corr_id(self.services, - prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, + prop.correlation_id) message = yaml.load(payload) if message['request_status'] == 'COMPLETED': - LOG.info("Response from IA: Service termination succeeded.") + msg = ": Response from IA: Service termination succeeded." + LOG.info("Service " + serv_id + msg) else: error = message['message'] - LOG.info("IA response: Service termination failed: " + error) + msg = ": IA response: Service termination failed: " + error + LOG.info("Service " + serv_id + msg) self.error_handling(serv_id, t.GK_KILL, error) return @@ -1313,29 +1331,30 @@ def terminate_ssms(self, serv_id): corr_id = str(uuid.uuid4()) self.services[serv_id]['act_corr_id'] = corr_id - LOG.info("Setting termination flags for ssms.") + LOG.info("Service " + serv_id + ": Setting termination flag for ssms.") nsd = self.services[serv_id]['service']['nsd'] for ssm in nsd['service_specific_managers']: if 'options' not in ssm.keys(): ssm['options'] = [] - ssm['options'].append({'key':'termination', 'value':'true'}) + ssm['options'].append({'key': 'termination', 'value': 'true'}) - LOG.info("SSM part of NSD: " + str(nsd['service_specific_managers'])) + msg = ": SSM part of NSD: " + str(nsd['service_specific_managers']) + LOG.info("Service " + serv_id + msg) - payload = yaml.dump({'NSD':nsd}) + payload = yaml.dump({'NSD': nsd}) self.manoconn.call_async(self.ssm_termination_response, - t.SRM_UPDATE, - payload, - correlation_id=corr_id) + t.SRM_UPDATE, + payload, + correlation_id=corr_id) # Pause the chain of tasks to wait for response self.services[serv_id]['pause_chain'] = True def ssm_termination_response(self, ch, method, prop, payload): """ - This method handles a response from the SMR on the ssm termination + This method handles a response from the SMR on the ssm termination call. """ message = yaml.load(payload) @@ -1348,11 +1367,11 @@ def wan_configure(self, serv_id): This method configures the WAN of a service """ - LOG.info("WAN Configuration") + LOG.info("Service " + serv_id + ": WAN Configuration") corr_id = str(uuid.uuid4()) self.services[serv_id]['act_corr_id'] = corr_id - message = {'service_instance_id':serv_id} + message = {'service_instance_id': serv_id} # self.manoconn.call_async(self.wan_configure_response, # t.IA_CONF_WAN, @@ -1367,10 +1386,10 @@ def wan_configure_response(self, ch, method, prop, payload): This method handles the IA response to the WAN request """ # Get the serv_id of this service - serv_id = tools.serv_id_from_corr_id(self.services, prop.correlation_id) + serv_id = tools.servid_from_corrid(self.services, prop.correlation_id) message = yaml.load(payload) - self.services[serv_id]['status'] = message['status'] + self.services[serv_id]['status'] = message['status'] self.services[serv_id]['error'] = None # TODO: handle negative status @@ -1381,7 +1400,7 @@ def wan_deconfigure(self, serv_id): This method will deconfigure the WAN """ - #TODO: when WIM implementation is finished + # TODO: when WIM implementation is finished pass @@ -1390,7 +1409,7 @@ def wan_deconfigure_response(self, ch, method, prop, payload): This method handles responses on the wan_deconfigure call """ - #TODO: when WIM implementation is finished + # TODO: when WIM implementation is finished pass @@ -1400,33 +1419,36 @@ def stop_monitoring(self, serv_id): """ url = t.MONITORING_URL + "services/" + serv_id - LOG.info("Stopping Monitoring by sending on " + url) + msg = ": Stopping Monitoring by sending on " + url + LOG.info("Service " + serv_id + msg) error = None # try: - header = {'Content-Type':'application/json'} + header = {'Content-Type': 'application/json'} mon_resp = requests.delete(url, headers=header, timeout=10.0) - LOG.info('response from monitoring manager: ' + str(mon_resp)) + msg = ": response from monitoring manager: " + str(mon_resp) + LOG.info("Service " + serv_id + msg) monitoring_json = json.loads(mon_resp) if (mon_resp.status_code == 200): - LOG.info('Monitoring delete message accepted') + LOG.info("Service " + serv_id + ": Monitoring DEL msg accepted") else: - LOG.info('Monitoring delete message not accepted') - LOG.info('Monitoring response: ' + str(monitoring_json)) + LOG.info("Service " + serv_id + ": Monitoring DEL msg not acceptd") + msg = ": Monitoring response: " + str(monitoring_json) + LOG.info("Service " + serv_id + msg) error = {'http_code': mon_resp.status_code, 'message': monitoring_json} # except: # LOG.info('timeout on monitoring communication.') - # error = {'http_code': '0', + # error = {'http_code': '0', # 'message': 'Timeout when contacting monitoring manager'} - #If an error occured, the workflow is aborted and the GK is informed - if error != None: + # If an error occured, the workflow is aborted and the GK is informed + if error is not None: self.error_handling(serv_id, t.GK_KILL, error) return @@ -1436,7 +1458,7 @@ def start_monitoring(self, serv_id): This method instructs the monitoring manager to start monitoring """ - LOG.info("Setting Monitoring") + LOG.info("Service " + serv_id + ": Setting Monitoring") service = self.services[serv_id]['service'] functions = self.services[serv_id]['function'] @@ -1446,7 +1468,7 @@ def start_monitoring(self, serv_id): error = None try: - header = {'Content-Type':'application/json'} + header = {'Content-Type': 'application/json'} mon_resp = requests.post(t.MONITORING_URL + 'service/new', data=json.dumps(mon_mess), headers=header, @@ -1454,21 +1476,22 @@ def start_monitoring(self, serv_id): monitoring_json = mon_resp.json() if (mon_resp.status_code == 200): - LOG.info('Monitoring message accepted') - + LOG.info("Service " + serv_id + ": Monitoring msg accepted") + else: - LOG.info('Monitoring message not accepted') - LOG.info('Monitoring response: ' + str(monitoring_json)) + LOG.info("Service " + serv_id + ": Monitoring msg not acceptd") + msg = ": Monitoring response: " + str(monitoring_json) + LOG.info("Service " + serv_id + msg) error = {'http_code': mon_resp.status_code, 'message': mon_resp.json()} except: - LOG.info('timeout on monitoring communication.') - error = {'http_code': '0', + LOG.info("Service " + serv_id + ": timeout on monitoring server.") + error = {'http_code': '0', 'message': 'Timeout when contacting server'} - #If an error occured, the workflow is aborted and the GK is informed - if error != None: + # If an error occured, the workflow is aborted and the GK is informed + if error is not None: self.error_handling(serv_id, t.GK_CREATE, error) return @@ -1477,7 +1500,7 @@ def inform_gk(self, serv_id): """ This method informs the gatekeeper. """ - LOG.info("Reporting result to GK") + LOG.info("Service " + serv_id + ": Reporting result to GK") message = {} @@ -1492,10 +1515,10 @@ def inform_gk(self, serv_id): LOG.debug("Payload of message " + str(message)) + orig_corr_id = self.services[serv_id]['original_corr_id'] self.manoconn.notify(t.GK_CREATE, yaml.dump(message), - correlation_id=self.services[serv_id]['original_corr_id']) - + correlation_id=orig_corr_id) ########### # SLM tasks @@ -1523,8 +1546,11 @@ def add_service_to_ledger(self, payload, corr_id): for key in payload.keys(): if key[:4] == 'VNFD': vnf_id = str(uuid.uuid4()) - LOG.info("VNFD instance id generated: " + vnf_id) - self.services[serv_id]['function'].append({'vnfd': payload[key], 'id':vnf_id}) + msg = "VNFD instance id generated: " + vnf_id + LOG.info("Service " + serv_id + msg) + vnfd = payload[key] + self.services[serv_id]['function'].append({'vnfd': vnfd, + 'id': vnf_id}) # Add to correlation id to the ledger self.services[serv_id]['original_corr_id'] = corr_id @@ -1545,7 +1571,7 @@ def add_service_to_ledger(self, payload, corr_id): self.services[serv_id]['service']['ssm'] = ssm_dict print(self.services[serv_id]['service']['ssm']) - + # Create counter for vnfs self.services[serv_id]['vnfs_to_deploy'] = 0 @@ -1569,7 +1595,7 @@ def request_returned_with_error(request): code = str(request['error']) mess = str(request['content']) LOG.info("Retrieving of NSR failed: " + code + " " + mess) - #TODO: get out of this + # TODO: get out of this self.services[serv_id] = {} self.services[serv_id]['original_corr_id'] = corr_id @@ -1579,42 +1605,43 @@ def request_returned_with_error(request): base = t.NSR_REPOSITORY_URL + "ns-instances/" request = tools.getRestData(base, serv_id) - if request['error'] != None: - request_returned_with_error(request) + if request['error'] is not None: + request_returned_with_error(request) return self.services[serv_id]['service']['nsr'] = request['content'] - LOG.info("Recreating ledger: NSR retrieved.") + LOG.info("Service " + serv_id + ": Recreating ledger: NSR retrieved.") # Retrieve the NSD based on the service record nsr = self.services[serv_id]['service']['nsr'] nsd_id = nsr['descriptor_reference'] base = t.GK_SERVICES_URL - LOG.info("Requesting nsd on url " + base) + LOG.info("Service " + serv_id + ": Requesting nsd on url " + base) request = tools.getRestData(base, nsd_id) - if request['error'] != None: - request_returned_with_error(request) + if request['error'] is not None: + request_returned_with_error(request) return self.services[serv_id]['service']['nsd'] = request['content']['nsd'] - LOG.info("Recreating ledger: NSD retrieved.") + LOG.info("Service " + serv_id + ": Recreating ledger: NSD retrieved.") LOG.info(request['content']) # Retrieve the function records based on the service record self.services[serv_id]['function'] = [] nsr = self.services[serv_id]['service']['nsr'] for vnf in nsr['network_functions']: - base = t.VNFR_REPOSITORY_URL + "vnf-instances/" + base = t.VNFR_REPOSITORY_URL + "vnf-instances/" request = tools.getRestData(base, vnf['vnfr_id']) - if request['error'] != None: - request_returned_with_error(request) + if request['error'] is not None: + request_returned_with_error(request) return new_function = {'id': vnf['vnfr_id'], 'vnfr': request['content']} self.services[serv_id]['function'].append(new_function) - LOG.info("Recreating ledger: VNFR retrieved.") + msg = ": Recreating ledger: VNFR retrieved." + LOG.info("Service " + serv_id + msg) # Retrieve the VNFDS based on the function records for vnf in self.services[serv_id]['function']: @@ -1623,12 +1650,13 @@ def request_returned_with_error(request): LOG.info(base + vnf['id']) request = tools.getRestData(base, vnfd_id) - if request['error'] != None: - request_returned_with_error(request) + if request['error'] is not None: + request_returned_with_error(request) return vnf['vnfd'] = request['content'] - LOG.info("Recreating ledger: VNFD retrieved.") + msg = ": Recreating ledger: VNFD retrieved." + LOG.info("Service " + serv_id + msg) LOG.info(request['content']) # Retrieve the deployed SSMs based on the NSD @@ -1637,7 +1665,7 @@ def request_returned_with_error(request): self.services[serv_id]['service']['ssm'] = ssm_dict - LOG.info('ssm_dict: ' + str(ssm_dict)) + LOG.info("Service " + serv_id + ": ssm_dict: " + str(ssm_dict)) # Retrieve the deployed FSMs based on the VNFD @@ -1668,7 +1696,8 @@ def validate_deploy_request(self, serv_id): # The service request in the yaml file should be a dictionary if not isinstance(payload, dict): - LOG.info("Validation of request completed. Status: Not a Dict") + msg = ": Validation of request completed. Status: Not a Dict" + LOG.info("Service " + serv_id + msg) response = "Request " + corr_id + ": payload is not a dict." self.services[serv_id]['status'] = 'ERROR' self.services[serv_id]['error'] = response @@ -1676,7 +1705,8 @@ def validate_deploy_request(self, serv_id): # The dictionary should contain a 'NSD' key if 'NSD' not in payload.keys(): - LOG.info("Validation of request completed. Status: No NSD") + msg = ": Validation of request completed. Status: No NSD" + LOG.info("Service " + serv_id + msg) response = "Request " + corr_id + ": NSD is not a dict." self.services[serv_id]['status'] = 'ERROR' self.services[serv_id]['error'] = response @@ -1690,7 +1720,8 @@ def validate_deploy_request(self, serv_id): number_of_vnfds = number_of_vnfds + 1 if len(payload['NSD']['network_functions']) != number_of_vnfds: - LOG.info("Validation of request completed. Status: Number of VNFDs incorrect") + msg = ": Validation request completed. Number of VNFDs incorrect" + LOG.info("Service " + serv_id + msg) response = "Request " + corr_id + ": # of VNFDs doesn't match NSD." self.services[serv_id]['status'] = 'ERROR' self.services[serv_id]['error'] = response @@ -1700,13 +1731,15 @@ def validate_deploy_request(self, serv_id): for key in payload.keys(): if key[:4] == 'VNFD': if payload[key] is None: - LOG.info("Validation of request completed. Status: Empty VNFD") + msg = ": Validation request completed. Empty VNFD" + LOG.info("Service " + serv_id + msg) response = "Request " + corr_id + ": empty VNFD." self.services[serv_id]['status'] = 'ERROR' self.services[serv_id]['error'] = response return - LOG.info("Validation of request completed. Status: Instantiating") + msg = ": Validation of request completed. Status: Instantiating" + LOG.info("Service " + serv_id + msg) # If all tests succeed, the status changes to 'INSTANTIATING' message = {'status': 'INSTANTIATING', 'error': None} self.services[serv_id]['status'] = 'INSTANTIATING' @@ -1724,7 +1757,7 @@ def SLM_mapping(self, serv_id): :param serv_id: The instance uuid of the service """ - LOG.info("Calculating the placement") + LOG.info("Service " + serv_id + ": Calculating the placement") topology = self.services[serv_id]['infrastructure']['topology'] NSD = self.services[serv_id]['service']['nsd'] functions = self.services[serv_id]['function'] @@ -1734,11 +1767,13 @@ def SLM_mapping(self, serv_id): if mapping is None: # The GK should be informed that the placement failed and the # deployment was aborted. - self.error_handling(serv_id, t.GK_CREATE, 'Unable to perform placement.') + self.error_handling(serv_id, + t.GK_CREATE, + 'Unable to perform placement.') else: # Add mapping to ledger - LOG.info("Placement calculations completed") + LOG.info("Service " + serv_id + ": Placement completed") LOG.debug("Calculated SLM placement: " + str(mapping)) self.services[serv_id]['service']['mapping'] = mapping for function in self.services[serv_id]['function']: @@ -1760,9 +1795,7 @@ def update_slm_configuration(self, plugin_dict): # Substract information on the different SLMs from the dict for plugin_uuid in plugin_dict.keys(): -# print(plugin_dict[plugin_uuid]['name']) if plugin_dict[plugin_uuid]['name'] == self.name: -# print('GOT HERE') active_slms.append(plugin_uuid) # Check if the list of active SLMs is identical to the known list @@ -1771,7 +1804,6 @@ def update_slm_configuration(self, plugin_dict): # print(active_slms) # print(self.known_slms) if active_slms == self.known_slms: -# print('GOT HEREE') # No action te be taken return else: @@ -1810,487 +1842,6 @@ def update_slm_configuration(self, plugin_dict): self.slm_down() - ###: Below the old functionality of the SLM. To be removed once transition is done. - - - def on_gk_service_instance_create(self, ch, method, properties, message): - """ - This is our first SLM specific event method. It is called when the SLM - receives a message from the GK published to GK_INSTANCE_CREATE_TOPIC. - Here we should react and trigger the service instantiation. - :param ch: RabbitMQ channel - :param method: RabbitMQ method - :param properties: RabbitMQ properties - :param message: RabbitMQ message content - :return: - """ - - LOG.info("Message received on service.instances.create corr_id: " + str(properties.correlation_id)) - - #The request data is in the message as a yaml file, - #and should be constructed like: - #--- - #NSD: - # descriptor_version: - # ... - #VNFD1: - # descriptor_version: - # ... - #VNFD2: - # descriptor_version: - # ... - #... - - service_request_from_gk = yaml.load(message) - - LOG.info("Checking whether request payload is formatted correctly.") - # The request should not have a correlation_id that is already being - # used by a different request path/track - for prop_id in self.service_requests_being_handled.keys(): - if properties.correlation_id in self.service_requests_being_handled[prop_id]['original_corr_id']: - LOG.info("Request has correlation_id that is already in use.") - return yaml.dump({'status' : 'ERROR', - 'error' : 'Correlation_id is already in use, please make sure to generate a new one.', - 'timestamp': time.time()}) - - # The service request in the yaml file should be a dictionary - if not isinstance(service_request_from_gk, dict): - LOG.info("service request with corr_id " + properties.correlation_id + "rejected: Message is not a dictionary.") - return yaml.dump({'status' : 'ERROR', - 'error' : 'Message is not a dictionary', - 'timestamp': time.time()}) - - # The dictionary should contain a 'NSD' key - if 'NSD' not in service_request_from_gk.keys(): - LOG.info("service request with corr_id " + properties.correlation_id + "rejected: NSD is not a dictionary.") - return yaml.dump({'status' : 'ERROR', - 'error' : 'No NSD field in dictionary', - 'timestamp': time.time()}) - - # Their should be as many VNFDx keys in the dictionary as their - # are network functions according to the NSD. - number_of_vnfds = 0 - for key in service_request_from_gk.keys(): - if key[:4] == 'VNFD': - number_of_vnfds = number_of_vnfds + 1 - - if len(service_request_from_gk['NSD']['network_functions']) != number_of_vnfds: - LOG.info("service request with corr_id " + properties.correlation_id + "rejected: number of vnfds does not match nsd.") - - return yaml.dump({'status' : 'ERROR', - 'error' : 'Number of VNFDs doesn\'t match number of vnfs', - 'timestamp': time.time()}) - - #Check whether a vnfd is none. - for key in service_request_from_gk.keys(): - if key[:4] == 'VNFD': - if service_request_from_gk[key] is None: - return yaml.dump({'status' : 'ERROR', - 'error' : 'VNFDs are not allowed to be empty', - 'timestamp': time.time()}) - - LOG.info("Request payload is formatted correctly, proceeding...") - - #If all checks on the received message pass, an uuid is created for - #the service, and we add it to the dict of services being deployed. - #Each VNF also gets an uuid. This is added to the VNFD dictionary. - #The correlation_id is used as key for this dict, since it should - #be available in all the callback functions. - self.service_requests_being_handled[properties.correlation_id] = service_request_from_gk - - #Since the key will change when new async calls are being made - #(each new call needs a unique corr_id), we need to keep track - #of the original one to reply to the GK at a later stage. - self.service_requests_being_handled[properties.correlation_id]['original_corr_id'] = properties.correlation_id - - self.service_requests_being_handled[properties.correlation_id]['NSD']['instance_uuid'] = str(uuid.uuid4()) - LOG.info("instance uuid for service generated: " + self.service_requests_being_handled[properties.correlation_id]['NSD']['instance_uuid']) - - for key in service_request_from_gk.keys(): - if key[:4] == 'VNFD': - self.service_requests_being_handled[properties.correlation_id][key]['instance_uuid'] = str(uuid.uuid4()) - LOG.info("instance uuid for vnf <" + key + "> generated: " + self.service_requests_being_handled[properties.correlation_id][key]['instance_uuid']) - - #SSM handling: if NSD has service_specific_managers field, - #then SLM contacts the SMR with this NSD. - #'ssms_ready_to_start' is set to false. This flag is used - #To make sure that both the ssm onboarding is finished and - #the service deployed before the ssm start trigger is made. - if 'service_specific_managers' in service_request_from_gk['NSD'].keys(): - if len(service_request_from_gk['NSD']['service_specific_managers']) > 0: - LOG.info('SSMs needed for this service, trigger on-boarding process in SMR.') - corr_id_for_onboarding = str(uuid.uuid4()) - self.service_requests_being_handled[properties.correlation_id]['corr_id_for_onboarding'] = corr_id_for_onboarding - self.manoconn.call_async(self.on_ssm_onboarding_return, SRM_ONBOARD, yaml.dump(service_request_from_gk['NSD']), correlation_id=corr_id_for_onboarding) - self.service_requests_being_handled[properties.correlation_id]['ssms_ready_to_start'] = False - - #After the received request has been processed, we can start - #handling it in a different thread. - LOG.info('Starting deployment of new service.') - t = threading.Thread(target=self.start_new_service_deployment, args=(ch, method, properties, message)) - t.daemon = True - t.start() - - response_for_gk = {'status' : 'INSTANTIATING', # INSTANTIATING or ERROR - 'error' : None, # NULL or a string describing the ERROR - 'timestamp': time.time()} # time() returns the number of seconds since the epoch in UTC as a float - - LOG.info('Response from SLM to GK on request: ' + str(response_for_gk)) - return yaml.dump(response_for_gk) - - def on_gk_service_update(self, ch, method, properties, message): - """ - This method handles a service update request - """ - - request = yaml.load(message) - LOG.info('Update request received for instance ' + request['Instance_id']) - - #get nsr from repository - appendix_nsr_link = 'ns-instances/' + request['Instance_id'] - nsr_response = requests.get(NSR_REPOSITORY_URL + appendix_nsr_link, timeout=10.0) - - if (nsr_response.status_code == 200): - LOG.info("NSR retrieved successfully") - nsr = nsr_response.json() - - #get the vnfrs from repository - vnfrs = [] - vnfr_dict = {} - for vnfr_obj in nsr['network_functions']: - vnfr_id = vnfr_obj['vnfr_id'] - vnfr_response = requests.get(VNFR_REPOSITORY_URL + 'vnf-instances/' + vnfr_id, timeout=10.0) - - if (vnfr_response.status_code == 200): - vnfr = vnfr_response.json() - vnfrs.append(vnfr) - vnfr_dict[vnfr_id] = vnfr - - else: - LOG.info('retrieving vnfr failed, aborting...') - error_message = {'status':'ERROR', 'error':'Updating failed, could not retrieve vnfr.'} - return yaml.dump(error_message) - else: - LOG.info('retrieving nsr failed, aborting...') - error_message = {'status':'ERROR', 'error':'Updating failed, could not retrieve nsr.'} - return yaml.dump(error_message) - - nsr['status'] = 'updating' - try: - nsr['id'] = nsr['uuid'] - except: - pass - nsr['version'] = str(int(nsr['version']) + 1) - - #create corr_id for interation with SMR to use as reference one response is received. - corr_id = str(uuid.uuid4()) - #keep track of running updates, so we can update the records after the response is received. - self.service_updates_being_handled[corr_id] = {'nsd':request['NSD'], 'nsr':nsr, 'instance_id':request['Instance_id'], 'orig_corr_id':properties.correlation_id, 'vnfrs':vnfr_dict} - - #Change status of NSR to updating. - second_nsr_dict = {} - - #remove fields that SLM is not allowed to set. - for key in nsr.keys(): - if key not in ['uuid', 'created_at', 'updated_at']: - second_nsr_dict[key] = nsr[key] - -# try: - link_for_put = NSR_REPOSITORY_URL + 'ns-instances/' + str(request['Instance_id']) - LOG.info("making put request to change status of NSR to updating") - nsr_response = requests.put(link_for_put, data=json.dumps(second_nsr_dict), headers={'Content-Type':'application/json'}, timeout=10.0) - - if nsr_response.status_code is not 200: - LOG.info('nsr updated failed, request denied.') - message = {'status':'ERROR', 'error':'could not update records.'} - return yaml.dump(message) - - LOG.info(nsr_response.json()) - -# except: -# message = {'status':'ERROR', 'error':'time-out on storing the record.'} -# return yaml.dump(message) - - - #Build request for SMR - LOG.info('retrieving nsr and vnfrs succeeded, building message for SMR...') - request_for_smr = {'NSD':request['NSD'], 'NSR':nsr, 'VNFR':vnfrs} - self.manoconn.call_async(self.on_update_request_reply, SRM_UPDATE, yaml.dump(request_for_smr), correlation_id=corr_id) - LOG.info('SMR contacted, awaiting response...') - - response_for_gk = {'status':'UPDATING', 'error':None} - LOG.info('Inform GK that update process is started.') - return yaml.dump(response_for_gk) - - def on_update_request_reply(self, ch, method, properties, message): - """ - This method handles a response of the SMR on an update request - """ - message_from_srm = yaml.load(message) - - LOG.info('Update report received from SMR, updating the records...') - LOG.info('Response from SMR: ' + yaml.dump(message, indent=4)) - - if message_from_srm['status'] == 'Updated': - message_from_srm['status'] = 'UPDATE_COMPLETED' - #updating the records. As only the nsr changes in the demo, we only update the nsr for now. - nsr = self.service_updates_being_handled[properties.correlation_id]['nsr'] - instance_id = self.service_updates_being_handled[properties.correlation_id]['instance_id'] - nsd = self.service_updates_being_handled[properties.correlation_id]['nsd'] - - nsr['version'] = str(int(nsr['version']) + 1) - nsr['descriptor_reference'] = nsd['uuid'] - nsr['status'] = 'normal operation' - #id can be stored as 'id' or 'uuid' - try: - nsr['id'] = nsr['uuid'] - except: - pass - - second_nsr_dict = {} - - #remove fields that SLM is not allowed to set. - for key in nsr.keys(): - if key not in ['uuid', 'created_at', 'updated_at']: - second_nsr_dict[key] = nsr[key] - - try: - nsr_response = requests.put(NSR_REPOSITORY_URL + 'ns-instances/' + str(instance_id), data=json.dumps(second_nsr_dict), headers={'Content-Type':'application/json'}, timeout=10.0) - - if nsr_response.status_code is not 200: - message = {'status':'ERROR', 'error':'could not update records.'} - self.manoconn.notify(GK_INSTANCE_UPDATE, yaml.dump(message), correlation_id=self.service_updates_being_handled[properties.correlation_id]['orig_corr_id']) - return - except: - message = {'status':'ERROR', 'error':'time-out on storing the record.'} - self.manoconn.notify(GK_INSTANCE_UPDATE, yaml.dump(message), correlation_id=self.service_updates_being_handled[properties.correlation_id]['orig_corr_id']) - return - - message_from_srm['nsr'] = second_nsr_dict - LOG.info('Records updated.') - - LOG.info('Reporting back to GK.') - message_for_gk = message_from_srm - #The SLM just takes the message from the SMR and forwards it towards the GK - self.manoconn.notify(GK_INSTANCE_UPDATE, yaml.dump(message_for_gk), correlation_id=self.service_updates_being_handled[properties.correlation_id]['orig_corr_id']) - - def on_ssm_onboarding_return(self, ch, method, properties, message): - """ - This method catches a reply on the ssm onboarding topic - """ - - LOG.info("Response from SRM regarding on-boarding received.") - - for service_request in self.service_requests_being_handled: - service_request = self.service_requests_being_handled[service_request] - #Check which service the response relates to. - if 'corr_id_for_onboarding' in service_request.keys(): - if service_request['corr_id_for_onboarding'] == properties.correlation_id: - #If service deployment is finished, the ssm start can be triggered. - if service_request['ssms_ready_to_start'] == True: - LOG.info("Request to start SSMs sent.") - self.manoconn.call_async(self.on_ssm_start_return, SRM_START, yaml.dump(service_request['message_for_srm'])) - self.service.requests_being_handled.pop(service_request) - break - #If service deployment is not finished, this sets a flag to state - #that onboarding is finished. - else: - LOG.info("Request to start SSMs is pending, waiting for service deployement to finish.") - service_request['ssms_ready_to_start'] = True - break - - - def start_new_service_deployment(self, ch, method, properties, message): - """ - This method initiates the deployment of a new service - """ - - LOG.info("VIM list requested from IA, to facilitate service with uuid " + self.service_requests_being_handled[properties.correlation_id]['NSD']['instance_uuid']) - #First, we need to request a list of the available vims, in order - #to choose one to place the service on. This is done by sending - #a message with an empty body on the infrastructure.management. - #resource.list topic. - new_corr_id, self.service_requests_being_handled = oldtools.replace_old_corr_id_by_new(self.service_requests_being_handled, properties.correlation_id) - self.manoconn.call_async(self.start_vim_selection, INFRA_ADAPTOR_AVAILABLE_VIMS, None, correlation_id=new_corr_id) - - def start_vim_selection(self, ch, method, properties, message): - """ - This method manages the decision of which vim the service is going to be placed on. - """ - #For now, we will go through the vims in the list and check if they have enough resources for the service. Once we find such a vim, we stick with this one. - #TODO: Outsource this process to an SSM if there is one available. - - vimList = yaml.load(message) - LOG.info("VIM list received: " + yaml.dump(vimList, indent=4)) - - if not isinstance(vimList, list): - self.inform_gk_with_error(properties.correlation_id, error_msg='No VIM.') - return - if len(vimList) == 0: - self.inform_gk_with_error(properties.correlation_id, error_msg='No VIM.') - return - - #TODO: If an SSM needs to select the vim, this is where to trigger it. Currently, an internal method is handling the decision. - #For now, we take the first one in the list, and just store the vim_uuid - self.service_requests_being_handled[properties.correlation_id]['vim'] = vimList[0]['vim_uuid'] - LOG.info("VIM selected: " + yaml.dump(self.service_requests_being_handled[properties.correlation_id]['vim'], indent=4)) - - self.request_deployment_from_IA(properties.correlation_id) - - def inform_gk_with_error(self, correlation_id, error_msg=None): - """ - This method informs the gk that no vim has the resources neede to deploy this service. - """ - - LOG.info("Inform GK of Error for service with instance uuid " + self.service_requests_being_handled[correlation_id]['NSD']['instance_uuid']) - response_message = {'status':'ERROR', 'error': error_msg, 'timestamp': time.time()} - self.manoconn.notify(GK_INSTANCE_CREATE_TOPIC, yaml.dump(response_message), correlation_id=self.service_requests_being_handled[correlation_id]['original_corr_id']) - - - def request_deployment_from_IA(self, correlation_id): - """ - This method is triggered once a vim is selected to place the service on. - """ - - request = oldtools.build_message_for_IA(self.service_requests_being_handled[correlation_id]) - LOG.info('Request message for IA built: ' + yaml.dump(request, indent=4)) - #In the service_requests_being_handled dictionary, we replace the old corr_id with the new one, to be able to keep track of the request - new_corr_id, self.service_requests_being_handled = oldtools.replace_old_corr_id_by_new(self.service_requests_being_handled, correlation_id) - LOG.info('Contacting the IA on infrastructure.service.deploy.') - self.manoconn.call_async(self.on_infra_adaptor_service_deploy_reply, - INFRA_ADAPTOR_INSTANCE_DEPLOY_REPLY_TOPIC, - yaml.dump(request), - correlation_id=new_corr_id) - - def on_infra_adaptor_service_deploy_reply(self, ch, method, properties, message): - """ - This method is called when the Infrastructure Adaptor replies to a service deploy request from the SLM. - Based on the content of the reply message, the NSR has to be contacted. - The GK should be notified of the result of the service request. - """ - - LOG.info("Deployment reply received from IA for instance uuid " + self.service_requests_being_handled[properties.correlation_id]['NSD']['instance_uuid']) - - msg = yaml.load(message) - - LOG.info("Response from IA: " + yaml.dump(msg, indent=4)) - #The message that will be returned to the gk - message_for_gk = {} - message_for_gk['status'] = 'ERROR' - message_for_gk['error'] = {} - message_for_gk['vnfrs'] = [] - - message_for_gk['timestamp'] = time.time() - - if msg['request_status'][:8] == 'DEPLOYED': - nsr = oldtools.build_nsr(self.service_requests_being_handled[properties.correlation_id], msg) - LOG.info('nsr built: ' + yaml.dump(nsr, indent=4)) - #Retrieve VNFRs from message and translate - vnfrs = oldtools.build_vnfrs(self.service_requests_being_handled[properties.correlation_id], msg['vnfrs']) - LOG.info('vnfrs built: ' + yaml.dump(vnfrs, indent=4)) - ## Store vnfrs in the repository and add vnfr ids to nsr if it is not already present - for vnfr in vnfrs: - #Store the message, catch exception when time-out occurs - try: - vnfr_response = requests.post(VNFR_REPOSITORY_URL + 'vnf-instances', data=yaml.dump(vnfr), headers={'Content-Type':'application/x-yaml'}, timeout=10.0) - #If storage succeeds, add vnfr to reply to gk - if (vnfr_response.status_code == 200): - message_for_gk['vnfrs'].append(vnfr) - #If storage fails, add error code and message to rply to gk - LOG.info('repo response for vnfr: ' + str(vnfr_response)) - else: - message_for_gk['error']['vnfr'] = {'http_code': vnfr_response.status_code, 'message': vnfr_response.json()} - LOG.info('vnfr to repo failed: ' + str(message_for_gk['error']['vnfr'])) - break - except: - message_for_gk['error']['vnfr'] = {'http_code': '0', 'message': 'Timeout when contacting server'} - LOG.info('time-out on vnfr to repo') - - break - - #Store nsr in the repository, catch exception when time-out occurs - try: - nsr_response = requests.post(NSR_REPOSITORY_URL + 'ns-instances', data=json.dumps(nsr), headers={'Content-Type':'application/json'}, timeout=10.0) - if (nsr_response.status_code == 200): - LOG.info('repo response for nsr: ' + str(nsr_response)) - message_for_gk['nsr'] = nsr - - else: - message_for_gk['error']['nsr'] = {'http_code': nsr_response.status_code, 'message': nsr_response.json()} - LOG.info('nsr to repo failed: ' + str(message_for_gk['error']['nsr'])) - except: - message_for_gk['error']['nsr'] = {'http_code': '0', 'message': 'Timeout when contacting server'} - - #TODO: put this in an if clause, so it is only done when nsr and - #vnfrs are accepted by repository. - LOG.info('nsr and vnfrs stored in Repositories, starting montitoring process.') - monitoring_message = oldtools.build_monitoring_message(self.service_requests_being_handled[properties.correlation_id], msg, nsr, vnfrs) - LOG.info('Monitoring message built: ' + json.dumps(monitoring_message, indent=4)) - - try: - monitoring_response = requests.post(MONITORING_REPOSITORY_URL + 'service/new', data=json.dumps(monitoring_message), headers={'Content-Type':'application/json'}, timeout=10.0) - - if (monitoring_response.status_code == 200): - LOG.info('Monitoring response: ' + str(monitoring_response)) - monitoring_json = monitoring_response.json() - LOG.info('Monitoring json: ' + str(monitoring_json)) - - if ('status' not in monitoring_json.keys()) or (monitoring_json['status'] != 'success'): - message_for_gk['error']['monitoring'] = monitoring_json - - else: - message_for_gk['error']['monitoring'] = {'http_code': monitoring_response.status_code, 'message': monitoring_response.json()} - - except: - message_for_gk['error']['monitoring'] = {'http_code': '0', 'message': 'Timeout when contacting server'} - LOG.info('time-out on monitoring manager.') - - - #If no errors occured, return message fields are set accordingly - #And SRM is informed to start ssms if needed. - self.service_requests_being_handled[properties.correlation_id]['completed'] = True - if message_for_gk['error'] == {}: - message_for_gk['status'] = 'READY' - message_for_gk['error'] = None - - #Check if SSMs must be started with this service - if 'service_specific_managers' in self.service_requests_being_handled[properties.correlation_id]['NSD'].keys(): - if len(self.service_requests_being_handled[properties.correlation_id]['NSD']['service_specific_managers']) > 0: - dict_for_srm = {'NSD':self.service_requests_being_handled[properties.correlation_id]['NSD'], 'NSR':nsr} - #If onboarding is finished, the ssm start trigger can be sent. - if self.service_requests_being_handled[properties.correlation_id]['ssms_ready_to_start'] == True: - LOG.info("Informing SRM that SSMs can be started.") - self.manoconn.call_async(self.on_ssm_start_return, SRM_START, yaml.dump(dict_for_srm)) - #If onboarding is not finished, this sets a flag so that when onboarding is finished, they can trigger ssm start. - else: - LOG.info("service deployement completed. Waiting for SSM onboarding to finish so SSMs can be started.") - self.service_requests_being_handled[properties.correlation_id]['ssms_ready_to_start'] = True - self.service_requests_being_handled[properties.correlation_id]['completed'] = False - self.service_requests_being_handled[properties.correlation_id]['message_for_srm'] = dict_for_srm - - else: - LOG.info("inform gk of result of deployment for service with uuid " + self.service_requests_being_handled[properties.correlation_id]['NSD']['instance_uuid']) - LOG.info("Message for gk: " + yaml.dump(message_for_gk, indent=4)) - message_for_gk['error'] = 'Deployment result: ' + msg['request_status'] - self.manoconn.notify(GK_INSTANCE_CREATE_TOPIC, yaml.dump(message_for_gk), correlation_id=self.service_requests_being_handled[properties.correlation_id]['original_corr_id']) - return - - #Inform the gk of the result. - LOG.info("inform gk of result of deployment for service with uuid " + self.service_requests_being_handled[properties.correlation_id]['NSD']['instance_uuid']) - LOG.info("Message for gk: " + yaml.dump(message_for_gk, indent=4)) - self.manoconn.notify(GK_INSTANCE_CREATE_TOPIC, yaml.dump(message_for_gk), correlation_id=self.service_requests_being_handled[properties.correlation_id]['original_corr_id']) - #Delete service request from handling dictionary, as handling is completed. - if self.service_requests_being_handled[properties.correlation_id]['completed']: - self.service_requests_being_handled.pop(properties.correlation_id, None) - - def on_ssm_start_return(self, ch, method, properties, message): - """ - This method handles responses from the srm.management.start topic - """ - - pass - - def main(): """ Entry point to start plugin. diff --git a/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm_helpers.py b/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm_helpers.py index 3ee2612..9c2789d 100644 --- a/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm_helpers.py +++ b/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm_helpers.py @@ -28,21 +28,22 @@ import uuid import yaml + def convert_corr_id(corr_id): """ - This method converts the correlation id into an integer that is + This method converts the correlation id into an integer that is small enough to be used with a modulo operation. :param corr_id: The correlation id as a String - """ + """ - #Select the final 4 digits of the string + # Select the final 4 digits of the string reduced_string = corr_id[-4:] reduced_int = int(reduced_string, 16) return reduced_int -def serv_id_from_corr_id(ledger, corr_id): +def servid_from_corrid(ledger, corr_id): """ This method returns the service uuid based on a correlation id. It is used for responses from different modules that use the @@ -62,6 +63,7 @@ def serv_id_from_corr_id(ledger, corr_id): return serv_id + def generate_image_uuid(vdu, vnfd): """ This method creates the image_uuid based on the vdu info in the @@ -98,13 +100,14 @@ def placement(NSD, functions, topology): vim['core_used'] = vim['core_used'] + needed_cpu vim['memory_used'] = vim['memory_used'] + needed_mem break - + # Check if all VNFs have been mapped if len(mapping.keys()) == len(functions): return mapping else: return None + def build_resource_request(descriptors, vim): """ This method builds a resource request message based on the needed resourcs. @@ -223,6 +226,7 @@ def build_nsr(request_status, nsd, vnfr_ids, service_instance_id): return nsr + def get_ssm_from_nsd(nsd): if 'service_specific_managers' in nsd: @@ -239,6 +243,7 @@ def get_ssm_from_nsd(nsd): return ssm_dict + def getRestData(base, path, expected_code=200): """ This method can be used to retrieve data through a rest api. @@ -252,20 +257,19 @@ def getRestData(base, path, expected_code=200): if (code == expected_code): print("GET for " + str(path) + " succeeded: " + str(content)) - return {'error':None, "content": content} + return {'error': None, "content": content} else: print("GET returned with status_code: " + str(code)) return{'error': code, "content": content} except: print("GET request timed out") - return{'error': '400', 'content':'request timed out'} - + return{'error': '400', 'content': 'request timed out'} def build_vnfr(ia_vnfr, vnfd): """ This method builds the VNFR. VNFRS are built from the stripped VNFRs - returned by the Infrastructure Adaptor (IA), combining it with the + returned by the Infrastructure Adaptor (IA), combining it with the provided VNFD. """ @@ -273,7 +277,7 @@ def build_vnfr(ia_vnfr, vnfd): # vnfd base fields vnfr['descriptor_version'] = ia_vnfr['descriptor_version'] vnfr['id'] = ia_vnfr['id'] - #Building the vnfr makes it the first version of this vnfr. + # Building the vnfr makes it the first version of this vnfr. vnfr['version'] = '1' vnfr['status'] = ia_vnfr['status'] vnfr['descriptor_reference'] = ia_vnfr['descriptor_reference'] @@ -308,30 +312,9 @@ def build_vnfr(ia_vnfr, vnfd): vnfc['id'] = ia_vnfc['id'] vnfc['vim_id'] = ia_vnfc['vim_id'] vnfc['vc_id'] = ia_vnfc['vc_id'] -# vnfc['connection_points'] = ia_vnfc['connection_points'] vnfc['connection_points'] = ia_vnfc['connection_points'] -# for cp_ia in ia_vnfc['connection_points']: -# new_cp = - # new_cp['id'] = cp_ia['id'] - # cp_vnfd = get_vdu_cp_by_ref(vnfd, vdu['id'], new_cp['id']) - # new_cp['type'] = cp_vnfd['type'] - # new_cp['interface'] = cp_vnfd['interface'] - - # new_cp['interface']['address'] = cp_ia['type']['address'] - # if 'netmask' in cp_ia['type'].keys(): - # new_cp['interface']['netmask'] = cp_ia['type']['netmask'] - # else: - # new_cp['interface']['netmask'] = '255.255.255.248' - - # if 'hardware_address' in cp_ia['type'].keys(): - # new_cp['interface']['hardware_address'] = cp_ia['type']['hardware_address'] - -# vnfc['connection_points'].append(cp_ia) - vdu['vnfc_instance'].append(vnfc) - - # vdu monitoring-parameters (optional) if vnfd_vdu is not None and 'monitoring_parameters' in vnfd_vdu: @@ -362,6 +345,7 @@ def get_vnfd_vdu_by_reference(vnfd, vdu_reference): return vnfd_vdu return None + def get_vdu_cp_by_ref(vnfd, vdu_id, cp_id): if 'virtual_deployment_units' in vnfd: @@ -373,6 +357,7 @@ def get_vdu_cp_by_ref(vnfd, vdu_id, cp_id): return None + def get_vnfd_by_reference(gk_request, vnfd_reference): for key in gk_request.keys(): @@ -495,7 +480,7 @@ def get_threshold(condition): for mr in vnfd['monitoring_rules']: vdu_id = mr['condition'].split(":")[0] for vnfc in vdu_hostid: - if vnfc[vdu_id] != None: + if vnfc[vdu_id] is not None: host_id = vnfc[vdu_id] rule = {} diff --git a/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm_topics.py b/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm_topics.py index 81932ce..c8f1c54 100644 --- a/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm_topics.py +++ b/plugins/son-mano-service-lifecycle-management/son_mano_slm/slm_topics.py @@ -75,11 +75,11 @@ # With monitoring MON_RECEIVE = "son.monitoring" -## REST APIs +# REST APIs temp = os.environ.get("url_nsr_repository") if temp is None: - temp = "http://api.int.sonata-nfv.eu:4002/records/nsr/" + temp = "http://api.int.sonata-nfv.eu:4002/records/nsr/" BASE_URL = temp.split(":")[0] + ':' + temp.split(":")[1] @@ -94,4 +94,3 @@ # With Monitoring Manager # TODO: Secure this get against failure MONITORING_URL = os.environ.get("url_monitoring_server") - diff --git a/son-mano-base/sonmanobase/messaging.py b/son-mano-base/sonmanobase/messaging.py index c5087b0..b1dea5d 100644 --- a/son-mano-base/sonmanobase/messaging.py +++ b/son-mano-base/sonmanobase/messaging.py @@ -550,6 +550,3 @@ def callback_print(self, ch, method, properties, msg): """ LOG.debug("RECEIVED from %r on %r: %r" % ( properties.app_id, method.routing_key, str(msg))) - - -