Skip to content

Commit

Permalink
Merge branch 'master' into benc-drain-typo
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Oct 15, 2024
2 parents ec33097 + 2a6bd18 commit 6f97421
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 25 deletions.
10 changes: 4 additions & 6 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,9 @@ def validate_resource_spec(self, resource_specification: dict):
if resource_specification:
raise InvalidResourceSpecification(
set(resource_specification.keys()),
("HTEX does not support the supplied resource_specifications."
("HTEX does not support the supplied resource_specifications. "
"For MPI applications consider using the MPIExecutor. "
"For specifications for core count/memory/walltime, consider using WorkQueueExecutor. ")
"For specifications for core count/memory/walltime, consider using WorkQueueExecutor.")
)
return

Expand Down Expand Up @@ -460,9 +460,7 @@ def _result_queue_worker(self):
except pickle.UnpicklingError:
raise BadMessage("Message received could not be unpickled")

if msg['type'] == 'heartbeat':
continue
elif msg['type'] == 'result':
if msg['type'] == 'result':
try:
tid = msg['task_id']
except Exception:
Expand Down Expand Up @@ -582,7 +580,7 @@ def hold_worker(self, worker_id: str) -> None:
def outstanding(self) -> int:
"""Returns the count of tasks outstanding across the interchange
and managers"""
return self.command_client.run("OUTSTANDING_C")
return len(self.tasks)

@property
def connected_workers(self) -> int:
Expand Down
20 changes: 1 addition & 19 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import pickle
import platform
import queue
import signal
import sys
import threading
import time
Expand Down Expand Up @@ -252,13 +251,7 @@ def _command_server(self) -> NoReturn:
try:
command_req = self.command_channel.recv_pyobj()
logger.debug("Received command request: {}".format(command_req))
if command_req == "OUTSTANDING_C":
outstanding = self.pending_task_queue.qsize()
for manager in self._ready_managers.values():
outstanding += len(manager['tasks'])
reply = outstanding

elif command_req == "CONNECTED_BLOCKS":
if command_req == "CONNECTED_BLOCKS":
reply = self.connected_block_history

elif command_req == "WORKERS":
Expand Down Expand Up @@ -319,16 +312,6 @@ def start(self) -> None:
""" Start the interchange
"""

# If a user workflow has set its own signal handler for sigterm, that
# handler will be inherited by the interchange process because it is
# launched as a multiprocessing fork process.
# That can interfere with the interchange shutdown mechanism, which is
# to receive a SIGTERM and exit immediately.
# See Parsl issue #2343 (Threads and multiprocessing cannot be
# intermingled without deadlocks) which talks about other fork-related
# parent-process-inheritance problems.
signal.signal(signal.SIGTERM, signal.SIG_DFL)

logger.info("Starting main interchange method")

if self.hub_address is not None and self.hub_zmq_port is not None:
Expand Down Expand Up @@ -549,7 +532,6 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
monitoring_radio.send(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug("Manager %r sent heartbeat via results connection", manager_id)
b_messages.append((p_message, r))
else:
logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"])

Expand Down

0 comments on commit 6f97421

Please sign in to comment.