Skip to content

Commit

Permalink
Don't block when evaluating requisites of parallel states
Browse files Browse the repository at this point in the history
  • Loading branch information
lkubb committed Oct 10, 2024
1 parent 537f1bb commit 4c7f0e0
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 16 deletions.
1 change: 1 addition & 0 deletions changelog/59959.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed requisites by parallel states on parallel states being evaluated synchronously (blocking state execution for other parallel states)
65 changes: 49 additions & 16 deletions salt/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2468,6 +2468,20 @@ def call_chunks(
"""
Iterate over a list of chunks and call them, checking for requires.
"""

def _call_pending(
pending: dict[str, LowChunk], running: dict[str, dict]
) -> tuple[dict[str, LowChunk], dict[str, dict], bool]:
still_pending = {}
for tag, pend in pending.items():
if tag not in running:
running, is_pending = self.call_chunk(pend, running, chunks)
if is_pending:
still_pending[tag] = pend
if self.check_failhard(pend, running):
return still_pending, running, True
return still_pending, running, False

if disabled_states is None:
# Check for any disabled states
disabled = {}
Expand All @@ -2476,7 +2490,11 @@ def call_chunks(
else:
disabled = disabled_states
running = {}
pending_chunks = {}
for low in chunks:
pending_chunks, running, failhard = _call_pending(pending_chunks, running)
if failhard:
return running
if "__FAILHARD__" in running:
running.pop("__FAILHARD__")
return running
Expand All @@ -2486,9 +2504,15 @@ def call_chunks(
action = self.check_pause(low)
if action == "kill":
break
running = self.call_chunk(low, running, chunks)
running, pending = self.call_chunk(low, running, chunks)
if pending:
pending_chunks[tag] = low
if self.check_failhard(low, running):
return running
while pending_chunks:
pending_chunks, running, failhard = _call_pending(pending_chunks, running)
if failhard:
return running
while True:
if self.reconcile_procs(running):
break
Expand Down Expand Up @@ -2599,6 +2623,7 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]):
states.
"""
reqs = {}
pending = False
for req_type, chunk in self.dependency_dag.get_dependencies(low):
reqs.setdefault(req_type, []).append(chunk)
fun_stats = set()
Expand All @@ -2617,17 +2642,15 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]):
if run_dict_chunk:
filtered_run_dict[tag] = run_dict_chunk
run_dict = filtered_run_dict

while True:
if self.reconcile_procs(run_dict):
break
time.sleep(0.01)
pending = bool(not self.reconcile_procs(run_dict) and low.get("parallel"))

for chunk in chunks:
tag = _gen_tag(chunk)
if tag not in run_dict:
req_stats.add("unmet")
continue
if pending:
continue
# A state can include a "skip_req" key in the return dict
# with a True value to skip triggering onchanges, watch, or
# other requisites which would result in a only running on a
Expand Down Expand Up @@ -2684,6 +2707,8 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]):

if "unmet" in fun_stats:
status = "unmet"
elif pending:
status = "pending"
elif "fail" in fun_stats:
status = "fail"
elif "skip_req" in fun_stats and (fun_stats & {"onchangesmet", "premet"}):
Expand Down Expand Up @@ -2765,7 +2790,7 @@ def call_chunk(
running: dict[str, dict],
chunks: Sequence[LowChunk],
depth: int = 0,
) -> dict[str, dict]:
) -> tuple[dict[str, dict], bool]:
"""
Execute the chunk if the requisites did not fail
"""
Expand All @@ -2781,8 +2806,13 @@ def call_chunk(

status, reqs = self._check_requisites(low, running)
if status == "unmet":
if self._call_unmet_requisites(low, running, chunks, tag, depth):
return running
running_failhard, pending = self._call_unmet_requisites(
low, running, chunks, tag, depth
)
if running_failhard or pending:
return running, pending
elif status == "pending":
return running, True
elif status == "met":
if low.get("__prereq__"):
self.pre[tag] = self.call(low, chunks, running)
Expand Down Expand Up @@ -2910,7 +2940,7 @@ def call_chunk(
for key in ("__sls__", "__id__", "name"):
running[sub_tag][key] = low.get(key)

return running
return running, False

def _assign_not_run_result_dict(
self,
Expand Down Expand Up @@ -2944,16 +2974,19 @@ def _call_unmet_requisites(
chunks: Sequence[LowChunk],
tag: str,
depth: int,
) -> dict[str, dict]:
) -> tuple[dict[str, dict], bool]:
pending = False
for _, chunk in self.dependency_dag.get_dependencies(low):
# Check to see if the chunk has been run, only run it if
# it has not been run already
ctag = _gen_tag(chunk)
if ctag not in running:
running = self.call_chunk(chunk, running, chunks)
running, pending = self.call_chunk(chunk, running, chunks)
if pending:
return running, pending
if self.check_failhard(chunk, running):
running["__FAILHARD__"] = True
return running
return running, pending
if low.get("__prereq__"):
status, _ = self._check_requisites(low, running)
self.pre[tag] = self.call(low, chunks, running)
Expand All @@ -2975,11 +3008,11 @@ def _call_unmet_requisites(
for key in ("__sls__", "__id__", "name"):
running[tag][key] = low.get(key)
else:
running = self.call_chunk(low, running, chunks, depth)
running, pending = self.call_chunk(low, running, chunks, depth)
if self.check_failhard(low, running):
running["__FAILHARD__"] = True
return running
return {}
return running, pending
return {}, pending

def call_beacons(self, chunks: Iterable[LowChunk], running: dict) -> dict:
"""
Expand Down

0 comments on commit 4c7f0e0

Please sign in to comment.