Source code for libb.sync

import datetime
import functools
import math
import signal
import threading
import time
import warnings
from collections.abc import Callable
from datetime import timedelta, timezone
from typing import Any, TypeVar, cast

# Names exported by a star-import of this module.
# ``Debouncer`` (the helper class behind ``debounce``) is not listed here.
__all__ = [
    'syncd',
    'NonBlockingDelay',
    'delay',
    'debounce',
    'wait_until',
    'timeout',
]


def syncd(lock):
    """Decorator to synchronize functions with a shared lock.

    The lock is acquired before every call of the decorated function and
    released afterwards, also when the function raises. The decorated
    function keeps its ``__name__``, ``__doc__`` and other metadata.

    :param lock: Threading lock to acquire during function execution.
        Any object providing ``acquire()`` and ``release()`` works.
    :returns: Decorator function.

    Example::

        >>> import threading
        >>> lock = threading.Lock()
        >>> @syncd(lock)
        ... def safe_increment(counter):
        ...     return counter + 1
        >>> safe_increment(0)
        1
    """
    def wrap(f):
        # Copy name/docstring/__wrapped__ so introspection and doc tools
        # still see the original function rather than ``new_function``.
        @functools.wraps(f)
        def new_function(*args, **kw):
            lock.acquire()
            try:
                return f(*args, **kw)
            finally:
                # Always give the lock back, even if ``f`` raised.
                lock.release()
        return new_function
    return wrap
class NonBlockingDelay:
    """Pollable delay: start it with :meth:`delay`, query it with :meth:`timeout`."""

    def __init__(self):
        # Duration of the current delay and the monotonic clock reading
        # taken when it was started; both are 0.0 until delay() is called.
        self._delay = 0.0
        self._timestamp = 0.0

    def timeout(self):
        """Report whether the delay started by :meth:`delay` is over.

        :returns: True once strictly more than the requested number of
            seconds has passed on the monotonic clock.
        :rtype: bool
        """
        elapsed = time.monotonic() - self._timestamp
        return elapsed > self._delay

    def delay(self, delay: float) -> None:
        """Begin a new delay without blocking the caller.

        :param float delay: Length of the delay in seconds.
        """
        self._delay = delay
        self._timestamp = time.monotonic()
def delay(seconds: float) -> None:
    """Spin (busy-wait) until ``seconds`` seconds have passed.

    A :class:`DeprecationWarning` is emitted on every call.

    .. deprecated::
        Use time.sleep() for efficient blocking delays.
        This function is kept for backward compatibility.

    :param float seconds: How long to wait, in seconds.
    """
    warnings.warn(
        'delay() is deprecated, use time.sleep() instead',
        DeprecationWarning,
        stacklevel=2,
    )
    timer = NonBlockingDelay()
    timer.delay(seconds)
    # Poll the timer in a tight loop until it reports that time is up.
    while True:
        if timer.timeout():
            break
class Debouncer:
    """Debounce handler used by :func:`debounce` decorator.

    Every call (re)starts a timer of ``wait`` seconds. When the timer
    expires, ``func`` is executed once with the arguments of the most recent
    call. A call is *pending* from the moment it is made until it has been
    executed, flushed or cancelled.

    :param func: Function to run once calls have stopped for ``wait`` seconds.
    :param float wait: Quiet period in seconds.
    """

    def __init__(self, func: Callable[..., Any], wait: float):
        self.func = func
        self.wait = wait
        self._timer: threading.Timer | None = None
        self._lock = threading.Lock()
        # Arguments of the pending call; ``_last_args is None`` means that
        # nothing is pending (a call without arguments stores ``()``).
        self._last_args = None
        self._last_kwargs = None
        # Incremented on every call so that a timer which was superseded
        # too late to be cancelled can recognise itself as stale.
        self._generation = 0

    def __call__(self, *args, **kwargs) -> None:
        """Schedule ``func(*args, **kwargs)``, replacing any pending call."""
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
            self._generation += 1
            self._last_args = args
            self._last_kwargs = kwargs
            self._timer = threading.Timer(self.wait, self._fire, (self._generation,))
            self._timer.start()

    def _take_pending(self):
        """Stop the timer and detach the pending call (caller holds the lock).

        :returns: ``(args, kwargs)`` of the pending call, or ``None`` when
            nothing is pending.
        """
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if self._last_args is None:
            return None
        pending = (self._last_args, self._last_kwargs)
        self._last_args = None
        self._last_kwargs = None
        return pending

    def _fire(self, generation: int) -> None:
        """Timer callback: run the pending call unless it was superseded."""
        with self._lock:
            if generation != self._generation:
                # A newer call arrived after this timer's wait had elapsed.
                return
            pending = self._take_pending()
        if pending is not None:
            args, kwargs = pending
            # Run outside the lock so ``func`` may use the debouncer itself.
            self.func(*args, **kwargs)

    def flush(self) -> None:
        """Execute any pending call immediately.

        Does nothing when no call is pending, in particular when the timer
        has already executed the last call.
        """
        with self._lock:
            pending = self._take_pending()
        if pending is not None:
            args, kwargs = pending
            # Run outside the lock so ``func`` may use the debouncer itself.
            self.func(*args, **kwargs)

    def cancel(self) -> None:
        """Cancel any pending call without executing it."""
        with self._lock:
            self._take_pending()


# Type variable for functions returning ``None``, used to annotate ``debounce``.
VoidFunction = TypeVar('VoidFunction', bound=Callable[..., None])
def debounce(wait: float):
    """Decorator that delays a function until calls have stopped.

    Each call postpones execution by ``wait`` seconds; a further call within
    that period cancels the earlier one.

    :param float wait: Seconds to wait before executing. With a value of
        zero or less the function is returned undecorated.
    :returns: Decorator function.
    """
    def decorator(func: VoidFunction) -> VoidFunction:
        # A non-positive wait means "no debouncing": hand the function back as is.
        return func if wait <= 0 else cast(VoidFunction, Debouncer(func, wait))
    return decorator
[docs] def wait_until( hour: int, minute: int = 0, second: int = 0, tz: datetime.tzinfo | None = timezone.utc, time_unit: str = 'milliseconds' ) -> int: """Calculate time to wait until specified hour/minute/second. :param int hour: Hour (0-23). :param int minute: Minute (0-59). :param int second: Second (0-59). :param tz: Timezone (default: UTC). :param str time_unit: Return unit ('seconds' or 'milliseconds'). :returns: Time to wait in specified unit. :rtype: int :raises ValueError: If hour/minute/second out of range. """ assert time_unit in {'seconds', 'milliseconds'} if not (0 <= hour <= 23): raise ValueError(f'hour must be between 0 and 23, got {hour}') if not (0 <= minute <= 59): raise ValueError(f'minute must be between 0 and 59, got {minute}') if not (0 <= second <= 59): raise ValueError(f'second must be between 0 and 59, got {second}') this = datetime.datetime.now(tz=tz) then = datetime.datetime(this.year, this.month, this.day, hour, minute, second, tzinfo=tz) if this >= then: then += timedelta(days=1) return math.ceil((then - this).total_seconds()) * (1000 if time_unit == 'milliseconds' else 1)
class timeout:
    """Context manager for timing out potentially hanging code.

    On entry a SIGALRM handler is installed and the real-time interval timer
    is armed; when it expires inside the ``with`` body the handler raises
    :class:`TimeoutError`. On exit the timer is disarmed and the previous
    handler is restored. If a timer was already running on entry (for
    example an enclosing ``timeout``), it is re-armed on exit with the time
    it has left, or fires right away if its deadline has already passed.

    :param seconds: Timeout in seconds (default: 100). Fractional values
        are accepted; ``0`` arms no timer.
    :param str error_message: Message for timeout error.
    :raises TimeoutError: Inside the ``with`` body when the time is up.
        ``TimeoutError`` is a subclass of :class:`OSError`.

    .. warning:: Uses SIGALRM and only works on Unix/Linux systems.
    """

    def __init__(self, seconds: float = 100, error_message: str = 'Timeout!!'):
        self.seconds = seconds
        self.error_message = error_message
        # Handler that was installed before __enter__ (None if unknown).
        self._previous_handler = None
        # (monotonic deadline, interval) of a timer that was already running
        # when this context was entered, or None.
        self._outer_timer = None

    def handle_timeout(self, signum, frame):
        """SIGALRM handler: abort the guarded code with ``TimeoutError``."""
        raise TimeoutError(self.error_message)

    def __enter__(self):
        previous = signal.signal(signal.SIGALRM, self.handle_timeout)
        try:
            # setitimer accepts fractional seconds and returns what was left
            # on a timer that was already running.
            delay, interval = signal.setitimer(signal.ITIMER_REAL, self.seconds)
        except BaseException:
            # __exit__ is not called when __enter__ raises, so undo the
            # handler change here before propagating the error.
            if previous is not None:
                signal.signal(signal.SIGALRM, previous)
            raise
        self._previous_handler = previous
        self._outer_timer = (time.monotonic() + delay, interval) if delay > 0 else None
        return self

    def __exit__(self, type, value, traceback):
        signal.setitimer(signal.ITIMER_REAL, 0)
        outer, self._outer_timer = self._outer_timer, None
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
            if outer is not None:
                deadline, interval = outer
                # A delay of zero would disarm the timer, so a deadline that
                # has already passed is delivered after a minimal delay.
                remaining = max(deadline - time.monotonic(), 1e-6)
                signal.setitimer(signal.ITIMER_REAL, remaining, interval)
        # Never swallow exceptions raised inside the with body.
        return False