From bfff592e4dc3dc132a00a76a93f0391eebe64596 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 29 Mar 2024 07:12:18 -0500 Subject: [PATCH] Rename self.blocks and self.block_mapping for human clarity (#3308) This PR makes it more obvious what they are maps for, and that they are/should be inverses of each other. --- parsl/executors/high_throughput/executor.py | 10 +++++----- parsl/executors/status_handling.py | 12 ++++++------ parsl/executors/taskvine/executor.py | 6 +++--- parsl/executors/workqueue/executor.py | 6 +++--- parsl/tests/site_tests/test_provider.py | 2 +- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 164781792a..e7dce07dba 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -688,7 +688,7 @@ def create_monitoring_info(self, status): d['status'] = s.status_name d['timestamp'] = datetime.datetime.now() d['executor_label'] = self.label - d['job_id'] = self.blocks.get(bid, None) + d['job_id'] = self.blocks_to_job_id.get(bid, None) d['block_id'] = bid msg.append(d) return msg @@ -767,14 +767,14 @@ class BlockInfo: # Now kill via provider # Potential issue with multiple threads trying to remove the same blocks - to_kill = [self.blocks[bid] for bid in block_ids_to_kill if bid in self.blocks] + to_kill = [self.blocks_to_job_id[bid] for bid in block_ids_to_kill if bid in self.blocks_to_job_id] r = self.provider.cancel(to_kill) job_ids = self._filter_scale_in_ids(to_kill, r) - # to_kill block_ids are fetched from self.blocks - # If a block_id is in self.block, it must exist in self.block_mapping - block_ids_killed = [self.block_mapping[jid] for jid in job_ids] + # to_kill block_ids are fetched from self.blocks_to_job_id + # If a block_id is in self.blocks_to_job_id, it must exist in self.job_ids_to_block + block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] return block_ids_killed diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 3608cd56df..1f51f3086b 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -68,8 +68,8 @@ def __init__(self, *, self._block_id_counter = AtomicIDCounter() self._tasks = {} # type: Dict[object, Future] - self.blocks = {} # type: Dict[str, str] - self.block_mapping = {} # type: Dict[str, str] + self.blocks_to_job_id = {} # type: Dict[str, str] + self.job_ids_to_block = {} # type: Dict[str, str] def _make_status_dict(self, block_ids: List[str], status_list: List[JobStatus]) -> Dict[str, JobStatus]: """Given a list of block ids and a list of corresponding status strings, @@ -194,8 +194,8 @@ def scale_out(self, blocks: int = 1) -> List[str]: logger.info(f"Allocated block ID {block_id}") try: job_id = self._launch_block(block_id) - self.blocks[block_id] = job_id - self.block_mapping[job_id] = block_id + self.blocks_to_job_id[block_id] = job_id + self.job_ids_to_block[job_id] = block_id block_ids.append(block_id) except Exception as ex: self._fail_job_async(block_id, @@ -232,10 +232,10 @@ def _get_block_and_job_ids(self) -> Tuple[List[str], List[Any]]: # Not using self.blocks.keys() and self.blocks.values() simultaneously # The dictionary may be changed during invoking this function # As scale_in and scale_out are invoked in multiple threads - block_ids = list(self.blocks.keys()) + block_ids = list(self.blocks_to_job_id.keys()) job_ids = [] # types: List[Any] for bid in block_ids: - job_ids.append(self.blocks[bid]) + job_ids.append(self.blocks_to_job_id[bid]) return block_ids, job_ids @abstractproperty diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index cf07cdd763..ee9a95d3cd 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -600,8 +600,8 @@ def scale_in(self, count): """Scale in method. Cancel a given number of blocks """ # Obtain list of blocks to kill - to_kill = list(self.blocks.keys())[:count] - kill_ids = [self.blocks[block] for block in to_kill] + to_kill = list(self.blocks_to_job_id.keys())[:count] + kill_ids = [self.blocks_to_job_id[block] for block in to_kill] # Cancel the blocks provisioned if self.provider: @@ -625,7 +625,7 @@ def shutdown(self, *args, **kwargs): self._should_stop.set() # Remove the workers that are still going - kill_ids = [self.blocks[block] for block in self.blocks.keys()] + kill_ids = [self.blocks_to_job_id[block] for block in self.blocks_to_job_id.keys()] if self.provider: logger.debug("Cancelling blocks") self.provider.cancel(kill_ids) diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index 571a4650ac..6cf55ba28c 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -695,8 +695,8 @@ def scale_in(self, count): """Scale in method. """ # Obtain list of blocks to kill - to_kill = list(self.blocks.keys())[:count] - kill_ids = [self.blocks[block] for block in to_kill] + to_kill = list(self.blocks_to_job_id.keys())[:count] + kill_ids = [self.blocks_to_job_id[block] for block in to_kill] # Cancel the blocks provisioned if self.provider: @@ -720,7 +720,7 @@ def shutdown(self, *args, **kwargs): self.should_stop.value = True # Remove the workers that are still going - kill_ids = [self.blocks[block] for block in self.blocks.keys()] + kill_ids = [self.blocks_to_job_id[block] for block in self.blocks_to_job_id.keys()] if self.provider: logger.debug("Cancelling blocks") self.provider.cancel(kill_ids) diff --git a/parsl/tests/site_tests/test_provider.py b/parsl/tests/site_tests/test_provider.py index 64cb8d8226..eef0420583 100644 --- a/parsl/tests/site_tests/test_provider.py +++ b/parsl/tests/site_tests/test_provider.py @@ -58,7 +58,7 @@ def test_provider(): logger.info("Job in terminal state") _, current_jobs = executor._get_block_and_job_ids() - # PR 1952 stoped removing scale_in blocks from self.blocks + # PR 1952 stoped removing scale_in blocks from self.blocks_to_job_id # A new PR will handle removing blocks from self.block # this includes failed/completed/canceled blocks assert len(current_jobs) == 1, "Expected current_jobs == 1"