diff --git a/changelog/59959.fixed.md b/changelog/59959.fixed.md new file mode 100644 index 00000000000..4e98e5f7e5d --- /dev/null +++ b/changelog/59959.fixed.md @@ -0,0 +1 @@ +Fixed requisites by parallel states on parallel states being evaluated synchronously (blocking state execution for other parallel states) diff --git a/salt/state.py b/salt/state.py index 9865e602597..2ec3cd91a0b 100644 --- a/salt/state.py +++ b/salt/state.py @@ -2468,6 +2468,20 @@ def call_chunks( """ Iterate over a list of chunks and call them, checking for requires. """ + + def _call_pending( + pending: dict[str, LowChunk], running: dict[str, dict] + ) -> tuple[dict[str, LowChunk], dict[str, dict], bool]: + still_pending = {} + for tag, pend in pending.items(): + if tag not in running: + running, is_pending = self.call_chunk(pend, running, chunks) + if is_pending: + still_pending[tag] = pend + if self.check_failhard(pend, running): + return still_pending, running, True + return still_pending, running, False + if disabled_states is None: # Check for any disabled states disabled = {} @@ -2476,7 +2490,11 @@ def call_chunks( else: disabled = disabled_states running = {} + pending_chunks = {} for low in chunks: + pending_chunks, running, failhard = _call_pending(pending_chunks, running) + if failhard: + return running if "__FAILHARD__" in running: running.pop("__FAILHARD__") return running @@ -2486,9 +2504,15 @@ def call_chunks( action = self.check_pause(low) if action == "kill": break - running = self.call_chunk(low, running, chunks) + running, pending = self.call_chunk(low, running, chunks) + if pending: + pending_chunks[tag] = low if self.check_failhard(low, running): return running + while pending_chunks: + pending_chunks, running, failhard = _call_pending(pending_chunks, running) + if failhard: + return running while True: if self.reconcile_procs(running): break @@ -2599,6 +2623,7 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]): states. """ reqs = {} + pending = False for req_type, chunk in self.dependency_dag.get_dependencies(low): reqs.setdefault(req_type, []).append(chunk) fun_stats = set() @@ -2617,17 +2642,15 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]): if run_dict_chunk: filtered_run_dict[tag] = run_dict_chunk run_dict = filtered_run_dict - - while True: - if self.reconcile_procs(run_dict): - break - time.sleep(0.01) + pending = bool(not self.reconcile_procs(run_dict) and low.get("parallel")) for chunk in chunks: tag = _gen_tag(chunk) if tag not in run_dict: req_stats.add("unmet") continue + if pending: + continue # A state can include a "skip_req" key in the return dict # with a True value to skip triggering onchanges, watch, or # other requisites which would result in a only running on a @@ -2684,6 +2707,8 @@ def _check_requisites(self, low: LowChunk, running: dict[str, dict[str, Any]]): if "unmet" in fun_stats: status = "unmet" + elif pending: + status = "pending" elif "fail" in fun_stats: status = "fail" elif "skip_req" in fun_stats and (fun_stats & {"onchangesmet", "premet"}): @@ -2765,7 +2790,7 @@ def call_chunk( running: dict[str, dict], chunks: Sequence[LowChunk], depth: int = 0, - ) -> dict[str, dict]: + ) -> tuple[dict[str, dict], bool]: """ Execute the chunk if the requisites did not fail """ @@ -2781,8 +2806,13 @@ def call_chunk( status, reqs = self._check_requisites(low, running) if status == "unmet": - if self._call_unmet_requisites(low, running, chunks, tag, depth): - return running + running_failhard, pending = self._call_unmet_requisites( + low, running, chunks, tag, depth + ) + if running_failhard or pending: + return running, pending + elif status == "pending": + return running, True elif status == "met": if low.get("__prereq__"): self.pre[tag] = self.call(low, chunks, running) @@ -2910,7 +2940,7 @@ def call_chunk( for key in ("__sls__", "__id__", "name"): running[sub_tag][key] = low.get(key) - return running + return running, False def _assign_not_run_result_dict( self, @@ -2944,16 +2974,19 @@ def _call_unmet_requisites( chunks: Sequence[LowChunk], tag: str, depth: int, - ) -> dict[str, dict]: + ) -> tuple[dict[str, dict], bool]: + pending = False for _, chunk in self.dependency_dag.get_dependencies(low): # Check to see if the chunk has been run, only run it if # it has not been run already ctag = _gen_tag(chunk) if ctag not in running: - running = self.call_chunk(chunk, running, chunks) + running, pending = self.call_chunk(chunk, running, chunks) + if pending: + return running, pending if self.check_failhard(chunk, running): running["__FAILHARD__"] = True - return running + return running, pending if low.get("__prereq__"): status, _ = self._check_requisites(low, running) self.pre[tag] = self.call(low, chunks, running) @@ -2975,11 +3008,11 @@ def _call_unmet_requisites( for key in ("__sls__", "__id__", "name"): running[tag][key] = low.get(key) else: - running = self.call_chunk(low, running, chunks, depth) + running, pending = self.call_chunk(low, running, chunks, depth) if self.check_failhard(low, running): running["__FAILHARD__"] = True - return running - return {} + return running, pending + return {}, pending def call_beacons(self, chunks: Iterable[LowChunk], running: dict) -> dict: """