Skip to content

Commit

Permalink
add abort_limit
Browse files Browse the repository at this point in the history
  • Loading branch information
hanjinliu committed Sep 3, 2023
1 parent 912a673 commit ac7b440
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions magicclass/fields/_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ def connect_async(
func: Callable[_P, Any | None],
*,
timeout: float = 0.0,
abort_limit: float = float("inf"),
ignore_errors: bool = False,
) -> thread_worker[_P]:
...
Expand All @@ -483,11 +484,19 @@ def connect_async(
func: Literal[None],
*,
timeout: float = 0.0,
abort_limit: float = float("inf"),
ignore_errors: bool = False,
) -> Callable[[Callable[_P, Any | None]], thread_worker[_P]]:
...

def connect_async(self, func=None, *, timeout=0.0, ignore_errors=False):
def connect_async(
self,
func=None,
*,
timeout=0.0,
abort_limit=float("inf"),
ignore_errors=False,
):
"""
Connect a callback function to be called asynchronously.
Expand All @@ -513,6 +522,9 @@ def connect_async(self, func=None, *, timeout=0.0, ignore_errors=False):
Timeout second. All the callback called within this time will be
aborted. In other words, callback will only be called every
``timeout`` seconds.
abort_limit : float, default is inf
If the callback is aborted more than ``abort_limit`` seconds, it
will not be aborted even after `timeout` seconds for the next call.
ignore_errors : bool, default is False
If true, error will be ignored so that it does not disturb users.
"""
Expand All @@ -527,17 +539,26 @@ def _wrapper(fn):
_running: WorkerBase | None = None # the running worker
_last_run = 0.0 # last time the worker was started
_last_run_id = 0.0 # last starting time (= worker ID) that ended
_abort_begin = -1 # last time the worker started being aborted

@wraps(fn)
def _func(self: MagicTemplate, *args, **kwargs):
nonlocal _running, _last_run, _last_run_id
nonlocal _running, _last_run, _last_run_id, _abort_begin
with threading.Lock():
_this_id = default_timer()
f = _afunc.__get__(self)
if timeout >= _this_id - _last_run:
aborted_many_times = (
_abort_begin > 0 and _this_id - _abort_begin > abort_limit
)
if timeout >= _this_id - _last_run and not aborted_many_times:
if _abort_begin < 0:
_abort_begin = _this_id
if _running is not None:
_running.quit()
_running = f._worker()
elif aborted_many_times:
_abort_begin = -1

_last_run = default_timer()
_last_run_copy = _last_run
# call the original function
Expand All @@ -554,20 +575,23 @@ def _func(self: MagicTemplate, *args, **kwargs):
if (
_last_run_copy < _last_run
or _this_id < _last_run_id
):
) and not aborted_many_times:
# if j-th call turned out to be out of date,
# don't run anymore.
return thread_worker.callback()
else:
yield next_value
except Exception as exc:
if _running is not None:
_running.quit()
raise exc
if _this_id < _last_run_id:
if _this_id < _last_run_id and not aborted_many_times:
# if j-th call finished after i-th call (i < j),
# don't run returned callback.
return thread_worker.callback()
with threading.Lock():
_last_run_id = _this_id
# _abort_begin = -1
yield thread_worker.callback() # empty callback
return out

Expand Down

0 comments on commit ac7b440

Please sign in to comment.