Skip to content

Commit

Permalink
Merge pull request #254 from yogeshgoyal26/inspect-performance-fix
Browse files Browse the repository at this point in the history
Improve inspect context and detect undefined task logic.
  • Loading branch information
guzzijones authored Sep 13, 2024
2 parents 4f20e10 + 14ccd5b commit 5ba1467
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
14 changes: 12 additions & 2 deletions orquesta/specs/native/v1/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ def detect_undefined_tasks(self, parent=None):
# Identify the undefined task in task transitions.
result = []
traversed = []
queued_task = []
q = queue.Queue()

for task in self.get_start_tasks():
Expand Down Expand Up @@ -412,8 +413,12 @@ def detect_undefined_tasks(self, parent=None):
continue

if self.has_task(next_task_name):
if next_task_name not in RESERVED_TASK_NAMES + traversed:
if (
next_task_name not in RESERVED_TASK_NAMES + traversed
and next_task_name not in queued_task
):
q.put(next_task_name)
queued_task.append(next_task_name)
else:
entry = {
"message": 'The task "%s" is not defined.' % next_task_name,
Expand Down Expand Up @@ -523,6 +528,7 @@ def inspect_context(self, parent=None):
ctxs = {}
errors = []
traversed = []
task_ctx_map = {}
parent_ctx = parent.get("ctx", []) if parent else []
rolling_ctx = list(set(parent_ctx))
q = queue.Queue()
Expand Down Expand Up @@ -592,7 +598,11 @@ def inspect_context(self, parent=None):
next_task_spec = self.get_task(next_task_name)

if not next_task_spec.has_join():
q.put((next_task_name, branch_ctx))
if next_task_name not in task_ctx_map or task_ctx_map[next_task_name] != set(
branch_ctx
):
q.put((next_task_name, branch_ctx))
task_ctx_map[next_task_name] = set(branch_ctx)
else:
next_task_ctx = ctxs.get(next_task_name, [])
ctxs[next_task_name] = list(set(next_task_ctx + branch_ctx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def test_runtime_function_of_splits_count(self):
t2 = datetime.datetime.utcnow()

delta = t2 - t1
self.assertLess(delta.seconds, 3)
self.assertLess(delta.seconds, 2)


class WorkflowConductorWithItemsStressTest(test_base.WorkflowConductorWithItemsTest):
Expand Down

0 comments on commit 5ba1467

Please sign in to comment.