From 2cd3065a7fed06a5841646eeea0e15d01480e94d Mon Sep 17 00:00:00 2001 From: Yogesh Kumar Date: Tue, 6 Jun 2023 15:19:56 +0530 Subject: [PATCH 1/4] Improve inspect context and detect undefined task logic --- orquesta/specs/native/v1/models.py | 14 +++++++-- .../test_workflow_conductor_performance.py | 30 +++++++++++++++++-- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/orquesta/specs/native/v1/models.py b/orquesta/specs/native/v1/models.py index 1712dfac..9cf10546 100644 --- a/orquesta/specs/native/v1/models.py +++ b/orquesta/specs/native/v1/models.py @@ -382,6 +382,7 @@ def detect_undefined_tasks(self, parent=None): # Identify the undefined task in task transitions. result = [] traversed = [] + queued_task = [] q = queue.Queue() for task in self.get_start_tasks(): @@ -412,8 +413,12 @@ def detect_undefined_tasks(self, parent=None): continue if self.has_task(next_task_name): - if next_task_name not in RESERVED_TASK_NAMES + traversed: + if ( + next_task_name not in RESERVED_TASK_NAMES + traversed + and next_task_name not in queued_task + ): q.put(next_task_name) + queued_task.append(next_task_name) else: entry = { "message": 'The task "%s" is not defined.' % next_task_name, @@ -523,6 +528,7 @@ def inspect_context(self, parent=None): ctxs = {} errors = [] traversed = [] + task_ctx_map = {} parent_ctx = parent.get("ctx", []) if parent else [] rolling_ctx = list(set(parent_ctx)) q = queue.Queue() @@ -592,7 +598,11 @@ def inspect_context(self, parent=None): next_task_spec = self.get_task(next_task_name) if not next_task_spec.has_join(): - q.put((next_task_name, branch_ctx)) + if next_task_name not in task_ctx_map or task_ctx_map[next_task_name] != set( + branch_ctx + ): + q.put((next_task_name, branch_ctx)) + task_ctx_map[next_task_name] = set(branch_ctx) else: next_task_ctx = ctxs.get(next_task_name, []) ctxs[next_task_name] = list(set(next_task_ctx + branch_ctx)) diff --git a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py index e4df536a..2bbb95ba 100644 --- a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py +++ b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py @@ -76,7 +76,7 @@ def test_serialization_function_of_data_size(self): conductor.deserialize(conductor.serialize()) def test_runtime_function_of_splits_count(self): - num_tasks = 25 + num_tasks = 75 wf_def = {"input": ["data"], "tasks": {}} @@ -100,7 +100,33 @@ def test_runtime_function_of_splits_count(self): t2 = datetime.datetime.utcnow() delta = t2 - t1 - self.assertLess(delta.seconds, 3) + self.assertLess(delta.seconds, 2) + + def test_inspect_performance(self): + num_tasks = 75 + + wf_def = {"input": ["data"], "tasks": {}} + + for i in range(1, num_tasks): + task_name = "t" + str(i) + next_task_name = "t" + str(i + 1) + transition = [ + {"when": "<% succeeded() %>", "do": next_task_name}, + {"when": "<% failed() %>", "do": next_task_name}, + ] + wf_def["tasks"][task_name] = {"action": "core.noop", "next": transition} + + wf_def["tasks"]["t%d" % num_tasks] = {"action": "core.noop"} + wf_def["tasks"]["t%d" % (num_tasks + 1)] = {"action": "core.noop"} + + spec = native_specs.WorkflowSpec(wf_def) + + t1 = datetime.datetime.utcnow() + self.assertDictEqual(spec.inspect(), {}) + t2 = datetime.datetime.utcnow() + + delta = t2 - t1 + self.assertLess(delta.seconds, 2) class WorkflowConductorWithItemsStressTest(test_base.WorkflowConductorWithItemsTest): From db913ef8ca179cf8fd9b94d555d51a2a0c905b1c Mon Sep 17 00:00:00 2001 From: AJ Date: Fri, 13 Sep 2024 14:00:07 -0400 Subject: [PATCH 2/4] remove duplicate test; reset iterations to 25 --- .../test_workflow_conductor_performance.py | 29 +------------------ 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py index 2bbb95ba..b14d634e 100644 --- a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py +++ b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py @@ -76,7 +76,7 @@ def test_serialization_function_of_data_size(self): conductor.deserialize(conductor.serialize()) def test_runtime_function_of_splits_count(self): - num_tasks = 75 + num_tasks = 25 wf_def = {"input": ["data"], "tasks": {}} @@ -102,33 +102,6 @@ def test_runtime_function_of_splits_count(self): delta = t2 - t1 self.assertLess(delta.seconds, 2) - def test_inspect_performance(self): - num_tasks = 75 - - wf_def = {"input": ["data"], "tasks": {}} - - for i in range(1, num_tasks): - task_name = "t" + str(i) - next_task_name = "t" + str(i + 1) - transition = [ - {"when": "<% succeeded() %>", "do": next_task_name}, - {"when": "<% failed() %>", "do": next_task_name}, - ] - wf_def["tasks"][task_name] = {"action": "core.noop", "next": transition} - - wf_def["tasks"]["t%d" % num_tasks] = {"action": "core.noop"} - wf_def["tasks"]["t%d" % (num_tasks + 1)] = {"action": "core.noop"} - - spec = native_specs.WorkflowSpec(wf_def) - - t1 = datetime.datetime.utcnow() - self.assertDictEqual(spec.inspect(), {}) - t2 = datetime.datetime.utcnow() - - delta = t2 - t1 - self.assertLess(delta.seconds, 2) - - class WorkflowConductorWithItemsStressTest(test_base.WorkflowConductorWithItemsTest): def test_runtime_function_of_items_list_size(self): wf_def = """ From 95d8d2441bc6bdc58351c06ebd4d4bd055db142d Mon Sep 17 00:00:00 2001 From: AJ Date: Fri, 13 Sep 2024 14:17:07 -0400 Subject: [PATCH 3/4] fix missing line ending --- .../unit/conducting/test_workflow_conductor_performance.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py index b14d634e..29474795 100644 --- a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py +++ b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py @@ -62,7 +62,7 @@ def test_runtime_function_of_graph_size(self): self.forward_task_statuses(conductor, task_name, [statuses.RUNNING, statuses.SUCCEEDED]) self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED) - + def test_serialization_function_of_graph_size(self): num_tasks = 100 conductor = self._prep_conductor(num_tasks, status=statuses.RUNNING) @@ -101,7 +101,7 @@ def test_runtime_function_of_splits_count(self): delta = t2 - t1 self.assertLess(delta.seconds, 2) - + class WorkflowConductorWithItemsStressTest(test_base.WorkflowConductorWithItemsTest): def test_runtime_function_of_items_list_size(self): wf_def = """ From 14ccd5b03e9e3c1a9c5fa64b4a86d982c13da3ab Mon Sep 17 00:00:00 2001 From: AJ Date: Fri, 13 Sep 2024 14:22:42 -0400 Subject: [PATCH 4/4] whitespace fixes --- .../unit/conducting/test_workflow_conductor_performance.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py index 29474795..1c770783 100644 --- a/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py +++ b/orquesta/tests/unit/conducting/test_workflow_conductor_performance.py @@ -62,7 +62,7 @@ def test_runtime_function_of_graph_size(self): self.forward_task_statuses(conductor, task_name, [statuses.RUNNING, statuses.SUCCEEDED]) self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED) - + def test_serialization_function_of_graph_size(self): num_tasks = 100 conductor = self._prep_conductor(num_tasks, status=statuses.RUNNING) @@ -101,7 +101,8 @@ def test_runtime_function_of_splits_count(self): delta = t2 - t1 self.assertLess(delta.seconds, 2) - + + class WorkflowConductorWithItemsStressTest(test_base.WorkflowConductorWithItemsTest): def test_runtime_function_of_items_list_size(self): wf_def = """