From 461b33d579e9ca278d6174590af66eff7a71d338 Mon Sep 17 00:00:00 2001 From: "sandipsamal117@gmail.com" Date: Thu, 11 Apr 2024 18:51:31 -0400 Subject: [PATCH] minor bug fix --- app/config.py | 2 +- app/controllers/subprocesses/wf_manager.py | 35 +++++++++++----------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/app/config.py b/app/config.py index fc35d5c..ccca4a2 100644 --- a/app/config.py +++ b/app/config.py @@ -3,7 +3,7 @@ class Settings(BaseSettings): pflink_mongodb: MongoDsn = 'mongodb://localhost:27017' - version: str = "4.0.5" + version: str = "4.0.6" mongo_username: str = "admin" mongo_password: str = "admin" log_level: str = "DEBUG" diff --git a/app/controllers/subprocesses/wf_manager.py b/app/controllers/subprocesses/wf_manager.py index c0ba8ff..ccd069f 100644 --- a/app/controllers/subprocesses/wf_manager.py +++ b/app/controllers/subprocesses/wf_manager.py @@ -152,7 +152,7 @@ def manage_workflow(self, db_key: str, test: bool): self.create_analysis(db_key) # check on analysis and retry if needs be case State.ANALYZING: - proceed = self.analysis_retry(db_key) + proceed = self.retry_analysis(db_key) # do nothing and exit case State.COMPLETED: return @@ -172,7 +172,7 @@ def create_analysis(self, key: str): 5) Search for required plugin or pipeline inside CUBE 6) Create a new instance of the plugin or pipeline with the previous `dircopy` instance """ - # if PACS files registering is in progress, do nothing and exit + # if PACS files registering is in progress or feed already requested, do nothing and exit if not self.__workflow.response.state_progress == "100%" and self.__workflow.feed_requested: return @@ -369,37 +369,36 @@ def update_and_wait(self, sleep: int, db_key: str, test: bool) -> WorkflowDBSche 3) retrieve the latest data from the DB """ logger.info("Creating new status manager.", extra=d) - self.update_status(self.__request) + self.update_status() logger.info(f"Sleeping for {sleep} seconds.", extra=d) time.sleep(sleep) workflow = retrieve_workflow(db_key, test) return workflow - def analysis_retry(self, db_key: str) -> bool: + def retry_analysis(self, db_key: str) -> bool: """ Retry analysis on failures """ if not self.is_retry_valid(): return False - workflow = self.__workflow - logger.warning(f"Retrying request.{5 - workflow.service_retry}/5 retries left.", extra=d) - logger.warning(f"Setting feed requested status to False in the DB", extra=d) - if workflow.feed_requested: - workflow.service_retry += 1 - workflow.feed_requested = False - workflow.feed_id_generated = "" - workflow.started = False + logger.warning(f"Retrying request.{5 - self.__workflow.service_retry}/5 retries left.", extra=d) + if self.__workflow.feed_requested: + logger.warning(f"Setting feed requested status to False in the DB", extra=d) + self.__workflow.service_retry += 1 + self.__workflow.feed_requested = False + self.__workflow.feed_id_generated = "" + self.__workflow.started = False # reset the current response object - workflow.response = WorkflowStatusResponseSchema() + self.__workflow.response = WorkflowStatusResponseSchema() # set to 'registering state' for manager to retry analysis - workflow.response.workflow_state = State.REGISTERING - workflow.response.state_progress = "100%" + self.__workflow.response.workflow_state = State.REGISTERING + self.__workflow.response.state_progress = "100%" - update_workflow(db_key, workflow) - if workflow.service_retry >= 5: logger.warning(Warnings.max_analysis_retry.value, extra=d) + update_workflow(db_key, self.__workflow) + if self.__workflow.service_retry >= 5: logger.warning(Warnings.max_analysis_retry.value, extra=d) return True def is_retry_valid(self) -> bool: @@ -417,7 +416,7 @@ def is_retry_valid(self) -> bool: and workflow.response.workflow_state == State.ANALYZING and workflow.feed_id_generated == workflow.response.feed_id) - def update_status(self, request: WorkflowRequestSchema): + def update_status(self): """Start an update status in a separate python process""" status_mgr_subprocess = Subprocess("app/controllers/subprocesses/status.py", self.args.data) resp: str = status_mgr_subprocess.run()