diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 338bc57a4..7f8ea42d7 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -346,9 +346,9 @@ def validate_resource_spec(self, resource_specification: dict): if resource_specification: raise InvalidResourceSpecification( set(resource_specification.keys()), - ("HTEX does not support the supplied resource_specifications." + ("HTEX does not support the supplied resource_specifications. " "For MPI applications consider using the MPIExecutor. " - "For specifications for core count/memory/walltime, consider using WorkQueueExecutor. ") + "For specifications for core count/memory/walltime, consider using WorkQueueExecutor.") ) return @@ -460,9 +460,7 @@ def _result_queue_worker(self): except pickle.UnpicklingError: raise BadMessage("Message received could not be unpickled") - if msg['type'] == 'heartbeat': - continue - elif msg['type'] == 'result': + if msg['type'] == 'result': try: tid = msg['task_id'] except Exception: @@ -582,7 +580,7 @@ def hold_worker(self, worker_id: str) -> None: def outstanding(self) -> int: """Returns the count of tasks outstanding across the interchange and managers""" - return self.command_client.run("OUTSTANDING_C") + return len(self.tasks) @property def connected_workers(self) -> int: diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index d61c76fed..b0228b52f 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -6,7 +6,6 @@ import pickle import platform import queue -import signal import sys import threading import time @@ -252,13 +251,7 @@ def _command_server(self) -> NoReturn: try: command_req = self.command_channel.recv_pyobj() logger.debug("Received command request: {}".format(command_req)) - if command_req == "OUTSTANDING_C": - outstanding = self.pending_task_queue.qsize() - for manager in self._ready_managers.values(): - outstanding += len(manager['tasks']) - reply = outstanding - - elif command_req == "CONNECTED_BLOCKS": + if command_req == "CONNECTED_BLOCKS": reply = self.connected_block_history elif command_req == "WORKERS": @@ -319,16 +312,6 @@ def start(self) -> None: """ Start the interchange """ - # If a user workflow has set its own signal handler for sigterm, that - # handler will be inherited by the interchange process because it is - # launched as a multiprocessing fork process. - # That can interfere with the interchange shutdown mechanism, which is - # to receive a SIGTERM and exit immediately. - # See Parsl issue #2343 (Threads and multiprocessing cannot be - # intermingled without deadlocks) which talks about other fork-related - # parent-process-inheritance problems. - signal.signal(signal.SIGTERM, signal.SIG_DFL) - logger.info("Starting main interchange method") if self.hub_address is not None and self.hub_zmq_port is not None: @@ -549,7 +532,6 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': logger.debug("Manager %r sent heartbeat via results connection", manager_id) - b_messages.append((p_message, r)) else: logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"])