Rename PollItem to reflect its current role as a facade on executor (#3307)

This should not change behaviour, but should make the behaviour of this
class a bit more understandable for human readers.

Historically, PollItem was two classes, PollItem and ExecutorStatus. Those
two classes were merged, leaving only PollItem, and the combined class gained
more facade-like behaviour rather than remaining only a status cache.
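
To make the facade role concrete, here is a minimal sketch. It is illustrative only: FakeExecutor and ExecutorFacadeSketch are invented stand-ins for this example, not the real Parsl classes; the actual facade is PolledExecutorFacade in parsl/jobs/job_status_poller.py, which wraps a BlockProviderExecutor and exposes status, scale_out and scale_in on its behalf.

# Illustrative sketch only (not the real Parsl class): a facade that wraps an
# executor, caches the last polled status, and forwards scaling calls.
from typing import Any, Dict


class FakeExecutor:
    """Stand-in for a BlockProviderExecutor, just for this example."""
    def __init__(self, label: str) -> None:
        self.label = label

    def status(self) -> Dict[str, Any]:
        return {"block-0": "RUNNING"}

    def scale_out(self, n: int) -> None:
        print(f"{self.label}: scaling out {n} block(s)")

    def scale_in(self, n: int) -> None:
        print(f"{self.label}: scaling in {n} block(s)")


class ExecutorFacadeSketch:
    """Caches status and forwards scaling calls to the wrapped executor."""
    def __init__(self, executor: FakeExecutor) -> None:
        self.executor = executor
        self.first = True          # has the initial scale-out happened yet?
        self._status: Dict[str, Any] = {}

    def poll(self) -> None:
        # refresh the cached status from the underlying executor
        self._status = self.executor.status()

    @property
    def status(self) -> Dict[str, Any]:
        return self._status

    def scale_out(self, n: int) -> None:
        self.executor.scale_out(n)

    def scale_in(self, n: int) -> None:
        self.executor.scale_in(n)


if __name__ == "__main__":
    facade = ExecutorFacadeSketch(FakeExecutor("htex_local"))
    facade.poll()
    print(facade.status)                 # cached status, e.g. {'block-0': 'RUNNING'}
    facade.scale_in(len(facade.status))  # mirrors the cleanup pattern in dflow.py below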
benclifford authored Mar 29, 2024
1 parent 6210703 commit 675df3c
Showing 3 changed files with 36 additions and 36 deletions.
12 changes: 6 additions & 6 deletions parsl/dataflow/dflow.py
@@ -1216,19 +1216,19 @@ def cleanup(self) -> None:

         logger.info("Scaling in and shutting down executors")

-        for pi in self.job_status_poller._poll_items:
-            if not pi.executor.bad_state_is_set:
-                logger.info(f"Scaling in executor {pi.executor.label}")
+        for ef in self.job_status_poller._executor_facades:
+            if not ef.executor.bad_state_is_set:
+                logger.info(f"Scaling in executor {ef.executor.label}")

                 # this code needs to be at least as many blocks as need
                 # cancelling, but it is safe to be more, as the scaling
                 # code will cope with being asked to cancel more blocks
                 # than exist.
-                block_count = len(pi.status)
-                pi.scale_in(block_count)
+                block_count = len(ef.status)
+                ef.scale_in(block_count)

             else:  # and bad_state_is_set
-                logger.warning(f"Not scaling in executor {pi.executor.label} because it is in bad state")
+                logger.warning(f"Not scaling in executor {ef.executor.label} because it is in bad state")

         for executor in self.executors.values():
             logger.info(f"Shutting down executor {executor.label}")
14 changes: 7 additions & 7 deletions parsl/jobs/job_status_poller.py
@@ -16,7 +16,7 @@
 logger = logging.getLogger(__name__)


-class PollItem:
+class PolledExecutorFacade:
     def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None):
         self._executor = executor
         self._dfk = dfk
@@ -110,29 +110,29 @@ class JobStatusPoller(Timer):
     def __init__(self, *, strategy: Optional[str], max_idletime: float,
                  strategy_period: Union[float, int],
                  dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None) -> None:
-        self._poll_items = []  # type: List[PollItem]
+        self._executor_facades = []  # type: List[PolledExecutorFacade]
         self.dfk = dfk
         self._strategy = Strategy(strategy=strategy,
                                   max_idletime=max_idletime)
         super().__init__(self.poll, interval=strategy_period, name="JobStatusPoller")

     def poll(self) -> None:
         self._update_state()
-        self._run_error_handlers(self._poll_items)
-        self._strategy.strategize(self._poll_items)
+        self._run_error_handlers(self._executor_facades)
+        self._strategy.strategize(self._executor_facades)

-    def _run_error_handlers(self, status: List[PollItem]) -> None:
+    def _run_error_handlers(self, status: List[PolledExecutorFacade]) -> None:
         for es in status:
             es.executor.handle_errors(es.status)

     def _update_state(self) -> None:
         now = time.time()
-        for item in self._poll_items:
+        for item in self._executor_facades:
             item.poll(now)

     def add_executors(self, executors: Sequence[BlockProviderExecutor]) -> None:
         for executor in executors:
             if executor.status_polling_interval > 0:
                 logger.debug("Adding executor {}".format(executor.label))
-                self._poll_items.append(PollItem(executor, self.dfk))
+                self._executor_facades.append(PolledExecutorFacade(executor, self.dfk))
         self._strategy.add_executors(executors)
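
For orientation, the poll cycle in this file (refresh cached state, run error handlers, then strategize over the facades) can be sketched in a self-contained form. FacadeStub and PollerSketch below are invented names for illustration; the real classes are PolledExecutorFacade and JobStatusPoller, and the real strategize step is the Strategy code in the next file.

# Simplified, self-contained sketch of the poll cycle shown above.
import time
from typing import Any, Dict, List


class FacadeStub:
    """Minimal stand-in for PolledExecutorFacade, for illustration only."""
    def __init__(self, label: str) -> None:
        self.label = label
        self.status: Dict[str, Any] = {}

    def poll(self, now: float) -> None:
        # in the real facade this refreshes the cached job status
        self.status = {"block-0": "RUNNING", "polled_at": now}


class PollerSketch:
    def __init__(self, facades: List[FacadeStub]) -> None:
        self._executor_facades = facades

    def poll(self) -> None:
        self._update_state()                            # refresh each facade's cached status
        self._run_error_handlers(self._executor_facades)
        self._strategize(self._executor_facades)

    def _update_state(self) -> None:
        now = time.time()
        for facade in self._executor_facades:
            facade.poll(now)

    def _run_error_handlers(self, facades: List[FacadeStub]) -> None:
        for facade in facades:
            print(f"{facade.label}: checking {len(facade.status)} status entries for errors")

    def _strategize(self, facades: List[FacadeStub]) -> None:
        for facade in facades:
            print(f"{facade.label}: deciding whether to scale out or in")


if __name__ == "__main__":
    PollerSketch([FacadeStub("htex_local")]).poll()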
46 changes: 23 additions & 23 deletions parsl/jobs/strategy.py
@@ -146,22 +146,22 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
         for executor in executors:
             self.executors[executor.label] = {'idle_since': None}

-    def _strategy_init_only(self, status_list: List[jsp.PollItem]) -> None:
+    def _strategy_init_only(self, executor_facades: List[jsp.PolledExecutorFacade]) -> None:
         """Scale up to init_blocks at the start, then nothing more.
         """
-        for exec_status in status_list:
-            if exec_status.first:
-                executor = exec_status.executor
+        for ef in executor_facades:
+            if ef.first:
+                executor = ef.executor
                 logger.debug(f"strategy_init_only: scaling out {executor.provider.init_blocks} initial blocks for {executor.label}")
-                exec_status.scale_out(executor.provider.init_blocks)
-                exec_status.first = False
+                ef.scale_out(executor.provider.init_blocks)
+                ef.first = False
             else:
                 logger.debug("strategy_init_only: doing nothing")

-    def _strategy_simple(self, status_list: List[jsp.PollItem]) -> None:
-        self._general_strategy(status_list, strategy_type='simple')
+    def _strategy_simple(self, executor_facades: List[jsp.PolledExecutorFacade]) -> None:
+        self._general_strategy(executor_facades, strategy_type='simple')

-    def _strategy_htex_auto_scale(self, status_list: List[jsp.PollItem]) -> None:
+    def _strategy_htex_auto_scale(self, executor_facades: List[jsp.PolledExecutorFacade]) -> None:
         """HTEX specific auto scaling strategy

         This strategy works only for HTEX. This strategy will scale out by
@@ -176,30 +176,30 @@ def _strategy_htex_auto_scale(self, status_list: List[jsp.PollItem]) -> None:
         expected to scale in effectively only when # of workers, or tasks executing
         per block is close to 1.
         """
-        self._general_strategy(status_list, strategy_type='htex')
+        self._general_strategy(executor_facades, strategy_type='htex')

     @wrap_with_logs
-    def _general_strategy(self, status_list, *, strategy_type):
-        logger.debug(f"general strategy starting with strategy_type {strategy_type} for {len(status_list)} executors")
+    def _general_strategy(self, executor_facades, *, strategy_type):
+        logger.debug(f"general strategy starting with strategy_type {strategy_type} for {len(executor_facades)} executors")

-        for exec_status in status_list:
-            executor = exec_status.executor
+        for ef in executor_facades:
+            executor = ef.executor
             label = executor.label
             if not isinstance(executor, BlockProviderExecutor):
                 logger.debug(f"Not strategizing for executor {label} because scaling not enabled")
                 continue
             logger.debug(f"Strategizing for executor {label}")

-            if exec_status.first:
-                executor = exec_status.executor
+            if ef.first:
+                executor = ef.executor
                 logger.debug(f"Scaling out {executor.provider.init_blocks} initial blocks for {label}")
-                exec_status.scale_out(executor.provider.init_blocks)
-                exec_status.first = False
+                ef.scale_out(executor.provider.init_blocks)
+                ef.first = False

             # Tasks that are either pending completion
             active_tasks = executor.outstanding

-            status = exec_status.status
+            status = ef.status

             # FIXME we need to handle case where provider does not define these
             # FIXME probably more of this logic should be moved to the provider
@@ -255,7 +255,7 @@ def _general_strategy(self, status_list, *, strategy_type):
                         # We have resources idle for the max duration,
                         # we have to scale_in now.
                         logger.debug(f"Idle time has reached {self.max_idletime}s for executor {label}; scaling in")
-                        exec_status.scale_in(active_blocks - min_blocks)
+                        ef.scale_in(active_blocks - min_blocks)

                     else:
                         logger.debug(
Expand All @@ -278,7 +278,7 @@ def _general_strategy(self, status_list, *, strategy_type):
excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
excess_blocks = min(excess_blocks, max_blocks - active_blocks)
logger.debug(f"Requesting {excess_blocks} more blocks")
exec_status.scale_out(excess_blocks)
ef.scale_out(excess_blocks)

elif active_slots == 0 and active_tasks > 0:
logger.debug("Strategy case 4a: No active slots but some active tasks - could scale out by a single block")
@@ -287,7 +287,7 @@ def _general_strategy(self, status_list, *, strategy_type):
                 if active_blocks < max_blocks:
                     logger.debug("Requesting single block")

-                    exec_status.scale_out(1)
+                    ef.scale_out(1)
                 else:
                     logger.debug("Not requesting single block, because at maxblocks already")

@@ -303,7 +303,7 @@ def _general_strategy(self, status_list, *, strategy_type):
                     excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
                     excess_blocks = min(excess_blocks, active_blocks - min_blocks)
                     logger.debug(f"Requesting scaling in by {excess_blocks} blocks with idle time {self.max_idletime}s")
-                    exec_status.scale_in(excess_blocks, max_idletime=self.max_idletime)
+                    ef.scale_in(excess_blocks, max_idletime=self.max_idletime)
                 else:
                     logger.error("This strategy does not support scaling in except for HighThroughputExecutor - taking no action")
             else:
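
The block arithmetic used by _general_strategy above can be checked with a small standalone calculation. The helper name and the example numbers below are invented for illustration, and the derivation of excess_slots itself lies outside the lines shown in this diff; only the ceil-and-cap step mirrors the code.

import math


def blocks_to_add(excess_slots: int, tasks_per_node: int, nodes_per_block: int,
                  active_blocks: int, max_blocks: int) -> int:
    """Mirror of the scale-out arithmetic visible in the diff above."""
    slots_per_block = tasks_per_node * nodes_per_block
    excess_blocks = math.ceil(float(excess_slots) / slots_per_block)
    # never request more growth than max_blocks allows
    return min(excess_blocks, max_blocks - active_blocks)


# Example (invented numbers): 30 excess slots, 4 tasks per node, 2 nodes per
# block -> 8 slots per block -> ceil(30 / 8) = 4 blocks, capped at 3 by max_blocks.
print(blocks_to_add(excess_slots=30, tasks_per_node=4, nodes_per_block=2,
                    active_blocks=5, max_blocks=8))  # prints 3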
