Skip to content

Commit

Permalink
Rename self.blocks and self.block_mapping for human clarity (#3308)
Browse files Browse the repository at this point in the history
This PR makes it more obvious what they are maps for, and that they
are/should be inverses of each other.
  • Loading branch information
benclifford authored Mar 29, 2024
1 parent 675df3c commit bfff592
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 18 deletions.
10 changes: 5 additions & 5 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ def create_monitoring_info(self, status):
d['status'] = s.status_name
d['timestamp'] = datetime.datetime.now()
d['executor_label'] = self.label
d['job_id'] = self.blocks.get(bid, None)
d['job_id'] = self.blocks_to_job_id.get(bid, None)
d['block_id'] = bid
msg.append(d)
return msg
Expand Down Expand Up @@ -767,14 +767,14 @@ class BlockInfo:

# Now kill via provider
# Potential issue with multiple threads trying to remove the same blocks
to_kill = [self.blocks[bid] for bid in block_ids_to_kill if bid in self.blocks]
to_kill = [self.blocks_to_job_id[bid] for bid in block_ids_to_kill if bid in self.blocks_to_job_id]

r = self.provider.cancel(to_kill)
job_ids = self._filter_scale_in_ids(to_kill, r)

# to_kill block_ids are fetched from self.blocks
# If a block_id is in self.block, it must exist in self.block_mapping
block_ids_killed = [self.block_mapping[jid] for jid in job_ids]
# to_kill block_ids are fetched from self.blocks_to_job_id
# If a block_id is in self.blocks_to_job_id, it must exist in self.job_ids_to_block
block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]

return block_ids_killed

Expand Down
12 changes: 6 additions & 6 deletions parsl/executors/status_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def __init__(self, *,
self._block_id_counter = AtomicIDCounter()

self._tasks = {} # type: Dict[object, Future]
self.blocks = {} # type: Dict[str, str]
self.block_mapping = {} # type: Dict[str, str]
self.blocks_to_job_id = {} # type: Dict[str, str]
self.job_ids_to_block = {} # type: Dict[str, str]

def _make_status_dict(self, block_ids: List[str], status_list: List[JobStatus]) -> Dict[str, JobStatus]:
"""Given a list of block ids and a list of corresponding status strings,
Expand Down Expand Up @@ -194,8 +194,8 @@ def scale_out(self, blocks: int = 1) -> List[str]:
logger.info(f"Allocated block ID {block_id}")
try:
job_id = self._launch_block(block_id)
self.blocks[block_id] = job_id
self.block_mapping[job_id] = block_id
self.blocks_to_job_id[block_id] = job_id
self.job_ids_to_block[job_id] = block_id
block_ids.append(block_id)
except Exception as ex:
self._fail_job_async(block_id,
Expand Down Expand Up @@ -232,10 +232,10 @@ def _get_block_and_job_ids(self) -> Tuple[List[str], List[Any]]:
# Not using self.blocks.keys() and self.blocks.values() simultaneously
# The dictionary may be changed during invoking this function
# As scale_in and scale_out are invoked in multiple threads
block_ids = list(self.blocks.keys())
block_ids = list(self.blocks_to_job_id.keys())
job_ids = [] # types: List[Any]
for bid in block_ids:
job_ids.append(self.blocks[bid])
job_ids.append(self.blocks_to_job_id[bid])
return block_ids, job_ids

@abstractproperty
Expand Down
6 changes: 3 additions & 3 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,8 @@ def scale_in(self, count):
"""Scale in method. Cancel a given number of blocks
"""
# Obtain list of blocks to kill
to_kill = list(self.blocks.keys())[:count]
kill_ids = [self.blocks[block] for block in to_kill]
to_kill = list(self.blocks_to_job_id.keys())[:count]
kill_ids = [self.blocks_to_job_id[block] for block in to_kill]

# Cancel the blocks provisioned
if self.provider:
Expand All @@ -625,7 +625,7 @@ def shutdown(self, *args, **kwargs):
self._should_stop.set()

# Remove the workers that are still going
kill_ids = [self.blocks[block] for block in self.blocks.keys()]
kill_ids = [self.blocks_to_job_id[block] for block in self.blocks_to_job_id.keys()]
if self.provider:
logger.debug("Cancelling blocks")
self.provider.cancel(kill_ids)
Expand Down
6 changes: 3 additions & 3 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,8 @@ def scale_in(self, count):
"""Scale in method.
"""
# Obtain list of blocks to kill
to_kill = list(self.blocks.keys())[:count]
kill_ids = [self.blocks[block] for block in to_kill]
to_kill = list(self.blocks_to_job_id.keys())[:count]
kill_ids = [self.blocks_to_job_id[block] for block in to_kill]

# Cancel the blocks provisioned
if self.provider:
Expand All @@ -720,7 +720,7 @@ def shutdown(self, *args, **kwargs):
self.should_stop.value = True

# Remove the workers that are still going
kill_ids = [self.blocks[block] for block in self.blocks.keys()]
kill_ids = [self.blocks_to_job_id[block] for block in self.blocks_to_job_id.keys()]
if self.provider:
logger.debug("Cancelling blocks")
self.provider.cancel(kill_ids)
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/site_tests/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def test_provider():
logger.info("Job in terminal state")

_, current_jobs = executor._get_block_and_job_ids()
# PR 1952 stopped removing scale_in blocks from self.blocks
# PR 1952 stopped removing scale_in blocks from self.blocks_to_job_id
# A new PR will handle removing blocks from self.block
# this includes failed/completed/canceled blocks
assert len(current_jobs) == 1, "Expected current_jobs == 1"
Expand Down

0 comments on commit bfff592

Please sign in to comment.