Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[master] Don't block when evaluating requisites of parallel states #66956

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/59959.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed requisites by parallel states on parallel states being evaluated synchronously (blocking state execution for other parallel states)
2 changes: 1 addition & 1 deletion salt/modules/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,7 @@ def sls_id(id_, mods, test=None, queue=None, state_events=None, **kwargs):
ret = {}
for chunk in chunks:
if chunk.get("__id__", "") == id_:
ret.update(st_.state.call_chunk(chunk, {}, chunks))
ret.update(st_.state.call_chunk(chunk, {}, chunks)[0])

_set_retcode(ret, highstate=highstate)
# Work around Windows multiprocessing bug, set __opts__['test'] back to
Expand Down
68 changes: 51 additions & 17 deletions salt/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2468,6 +2468,20 @@ def call_chunks(
"""
Iterate over a list of chunks and call them, checking for requires.
"""

def _call_pending(
pending: dict[str, LowChunk], running: dict[str, dict]
) -> tuple[dict[str, LowChunk], dict[str, dict], bool]:
still_pending = {}
for tag, pend in pending.items():
if tag not in running:
running, is_pending = self.call_chunk(pend, running, chunks)
if is_pending:
still_pending[tag] = pend
if self.check_failhard(pend, running):
return still_pending, running, True
return still_pending, running, False

if disabled_states is None:
# Check for any disabled states
disabled = {}
Expand All @@ -2476,7 +2490,11 @@ def call_chunks(
else:
disabled = disabled_states
running = {}
pending_chunks = {}
for low in chunks:
pending_chunks, running, failhard = _call_pending(pending_chunks, running)
if failhard:
return running
if "__FAILHARD__" in running:
running.pop("__FAILHARD__")
return running
Expand All @@ -2486,14 +2504,21 @@ def call_chunks(
action = self.check_pause(low)
if action == "kill":
break
running = self.call_chunk(low, running, chunks)
running, pending = self.call_chunk(low, running, chunks)
if pending:
pending_chunks[tag] = low
if self.check_failhard(low, running):
return running
while pending_chunks:
pending_chunks, running, failhard = _call_pending(pending_chunks, running)
if failhard:
return running
time.sleep(0.01)
while True:
if self.reconcile_procs(running):
break
time.sleep(0.01)
ret = dict(list(disabled.items()) + list(running.items()))
ret = {**disabled, **running}
return ret

def check_failhard(self, low: LowChunk, running: dict[str, dict]):
Expand Down Expand Up @@ -2599,6 +2624,7 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]):
states.
"""
reqs = {}
pending = False
for req_type, chunk in self.dependency_dag.get_dependencies(low):
reqs.setdefault(req_type, []).append(chunk)
fun_stats = set()
Expand All @@ -2617,17 +2643,15 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]):
if run_dict_chunk:
filtered_run_dict[tag] = run_dict_chunk
run_dict = filtered_run_dict

while True:
if self.reconcile_procs(run_dict):
break
time.sleep(0.01)
pending = bool(not self.reconcile_procs(run_dict) and low.get("parallel"))

for chunk in chunks:
tag = _gen_tag(chunk)
if tag not in run_dict:
req_stats.add("unmet")
continue
if pending:
continue
# A state can include a "skip_req" key in the return dict
# with a True value to skip triggering onchanges, watch, or
# other requisites which would result in a only running on a
Expand Down Expand Up @@ -2684,6 +2708,8 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]):

if "unmet" in fun_stats:
status = "unmet"
elif pending:
status = "pending"
elif "fail" in fun_stats:
status = "fail"
elif "skip_req" in fun_stats and (fun_stats & {"onchangesmet", "premet"}):
Expand Down Expand Up @@ -2765,7 +2791,7 @@ def call_chunk(
running: dict[str, dict],
chunks: Sequence[LowChunk],
depth: int = 0,
) -> dict[str, dict]:
) -> tuple[dict[str, dict], bool]:
"""
Execute the chunk if the requisites did not fail
"""
Expand All @@ -2781,8 +2807,13 @@ def call_chunk(

status, reqs = self._check_requisites(low, running)
if status == "unmet":
if self._call_unmet_requisites(low, running, chunks, tag, depth):
return running
running_failhard, pending = self._call_unmet_requisites(
low, running, chunks, tag, depth
)
if running_failhard or pending:
return running, pending
elif status == "pending":
return running, True
elif status == "met":
if low.get("__prereq__"):
self.pre[tag] = self.call(low, chunks, running)
Expand Down Expand Up @@ -2910,7 +2941,7 @@ def call_chunk(
for key in ("__sls__", "__id__", "name"):
running[sub_tag][key] = low.get(key)

return running
return running, False

def _assign_not_run_result_dict(
self,
Expand Down Expand Up @@ -2944,16 +2975,19 @@ def _call_unmet_requisites(
chunks: Sequence[LowChunk],
tag: str,
depth: int,
) -> dict[str, dict]:
) -> tuple[dict[str, dict], bool]:
pending = False
for _, chunk in self.dependency_dag.get_dependencies(low):
# Check to see if the chunk has been run, only run it if
# it has not been run already
ctag = _gen_tag(chunk)
if ctag not in running:
running = self.call_chunk(chunk, running, chunks)
running, pending = self.call_chunk(chunk, running, chunks)
if pending:
return running, pending
if self.check_failhard(chunk, running):
running["__FAILHARD__"] = True
return running
return running, pending
if low.get("__prereq__"):
status, _ = self._check_requisites(low, running)
self.pre[tag] = self.call(low, chunks, running)
Expand All @@ -2975,11 +3009,11 @@ def _call_unmet_requisites(
for key in ("__sls__", "__id__", "name"):
running[tag][key] = low.get(key)
else:
running = self.call_chunk(low, running, chunks, depth)
running, pending = self.call_chunk(low, running, chunks, depth)
if self.check_failhard(low, running):
running["__FAILHARD__"] = True
return running
return {}
return running, pending
return {}, pending

def call_beacons(self, chunks: Iterable[LowChunk], running: dict) -> dict:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import time

import pytest
Expand Down Expand Up @@ -537,6 +538,69 @@ def test_parallel_state_with_requires(state, state_tree):
assert "__parallel__" in ret[_id]


@pytest.mark.skip_on_windows
def test_parallel_state_with_requires_on_parallel(state, state_tree):
"""
Parallel states requiring other parallel states should not block
state execution while waiting on their requisites.
Issue #59959
"""
sls_contents = """
service_a:
cmd.run:
- name: sleep 2
- parallel: True
service_b1:
cmd.run:
- name: sleep 5
- parallel: True
- require:
- service_a
service_b2:
cmd.run:
- name: 'true'
- parallel: True
- require:
- service_b1
service_c:
cmd.run:
- name: 'true'
- parallel: True
- require:
- service_a
"""

with pytest.helpers.temp_file("requisite_parallel.sls", sls_contents, state_tree):
ret = state.sls(
"requisite_parallel",
__pub_jid="1", # Because these run in parallel we need a fake JID)
)
start_b1 = datetime.datetime.combine(
datetime.date.today(),
datetime.time.fromisoformat(
ret["cmd_|-service_b1_|-sleep 5_|-run"]["start_time"]
),
)
start_c = datetime.datetime.combine(
datetime.date.today(),
datetime.time.fromisoformat(
ret["cmd_|-service_c_|-true_|-run"]["start_time"]
),
)
start_diff = start_c - start_b1
# Expected order:
# a > (b1, c) > b2
# When b2 blocks while waiting for b1, c has to wait for b1 as well.
# c should approximately start at the same time as b1 though.
assert start_diff < datetime.timedelta(seconds=5) # b1 sleeps for 5 seconds
for state_ret in ret.raw.values():
assert "__parallel__" in state_ret


def test_issue_59922_conflict_in_name_and_id_for_require_in(state, state_tree):
"""
Make sure that state_type is always honored while compiling down require_in to
Expand Down
2 changes: 1 addition & 1 deletion tests/pytests/unit/modules/state/test_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def call_chunk(data, data1, data2):
"""
Mock call_chunk method
"""
return {"": "ABC"}
return {"": "ABC"}, False

@staticmethod
def call_chunks(data):
Expand Down
2 changes: 1 addition & 1 deletion tests/pytests/unit/state/test_state_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ def test_call_chunk_sub_state_run(minion_opts):
with patch("salt.state.State.call", return_value=mock_call_return):
minion_opts["disabled_requisites"] = ["require"]
state_obj = salt.state.State(minion_opts)
ret = state_obj.call_chunk(low_data, {}, [])
ret, _ = state_obj.call_chunk(low_data, {}, [])
sub_state = ret.get(expected_sub_state_tag)
assert sub_state
assert sub_state["__run_num__"] == 1
Expand Down
Loading