Source code for libb.sync

from __future__ import annotations

import datetime
import functools
import math
import signal
import threading
import time
import warnings
from collections.abc import Callable
from datetime import timedelta, timezone
from typing import Any, TypeVar, cast

# Public names exported by a star-import of this module. ``Debouncer`` and
# ``VoidFunction`` are defined further down but are not listed here.
__all__ = [
    'syncd',
    'NonBlockingDelay',
    'delay',
    'debounce',
    'wait_until',
    'timeout',
]


def syncd(lock):
    """Decorator to synchronize functions with a shared lock.

    Each call to the decorated function acquires ``lock`` first and releases
    it afterwards, including when the function raises. The wrapper keeps the
    decorated function's name, docstring and other metadata.

    :param lock: Lock object exposing ``acquire()`` and ``release()``
        (for example ``threading.Lock``), held during function execution.
    :returns: Decorator function.

    Example::

        >>> import threading
        >>> lock = threading.Lock()
        >>> @syncd(lock)
        ... def safe_increment(counter):
        ...     return counter + 1
        >>> safe_increment(0)
        1
    """
    def wrap(f):
        # Preserve __name__/__doc__/__wrapped__ so introspection, logging and
        # documentation tools still see the decorated function
        @functools.wraps(f)
        def new_function(*args, **kw):
            lock.acquire()
            try:
                return f(*args, **kw)
            finally:
                # Always hand the lock back, even if ``f`` raised
                lock.release()
        return new_function
    return wrap
class NonBlockingDelay:
    """Pollable delay: arm it with :meth:`delay`, then check :meth:`timeout`."""

    def __init__(self):
        # Start with a zero-length delay anchored at clock value zero
        self._timestamp = 0.0
        self._delay = 0.0

    def timeout(self):
        """Report whether the armed delay has run out.

        :returns: True once strictly more than the armed duration has passed
            since :meth:`delay` was called.
        :rtype: bool
        """
        elapsed = time.monotonic() - self._timestamp
        return elapsed > self._delay

    def delay(self, delay: float) -> None:
        """Arm the delay, measuring from now.

        :param float delay: Delay duration in seconds.
        """
        self._timestamp, self._delay = time.monotonic(), delay
def delay(seconds: float) -> None:
    """Hold the caller for ``seconds`` by spinning on the monotonic clock.

    The loop polls continuously (busy-wait) instead of sleeping.

    .. deprecated::
        Use time.sleep() for efficient blocking delays.
        This function is kept for backward compatibility.

    :param float seconds: How long to spin, in seconds.
    """
    warnings.warn(
        'delay() is deprecated, use time.sleep() instead',
        DeprecationWarning,
        stacklevel=2,
    )
    started = time.monotonic()
    # Leave the loop only once strictly more than ``seconds`` have elapsed
    while not (time.monotonic() - started > seconds):
        pass
class Debouncer:
    """Debounce handler used by :func:`debounce` decorator.

    Calling the instance schedules ``func`` to run on a timer thread after
    ``wait`` seconds; a further call inside that window replaces the pending
    one. A pending call runs at most once, whether it is triggered by the
    timer or by :meth:`flush`.

    :param func: Function to debounce.
    :param float wait: Seconds to wait before executing.
    """

    def __init__(self, func: Callable[..., Any], wait: float):
        # Copy name/docstring from ``func`` first, so attributes carried over
        # from the function cannot overwrite the state assigned below
        functools.update_wrapper(self, func)
        self.func = func
        self.wait = wait
        self._timer: threading.Timer | None = None
        self._lock = threading.Lock()
        self._last_args = None
        self._last_kwargs = None
        # Identity of the currently scheduled call; lets a timer that was
        # superseded after it started firing recognise that it is stale
        self._token = None

    def __call__(self, *args, **kwargs) -> None:
        """Schedule ``func(*args, **kwargs)``, replacing any pending call."""
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
            self._last_args = args
            self._last_kwargs = kwargs
            token = object()
            self._token = token
            self._timer = threading.Timer(self.wait, self._fire, (token,))
            self._timer.start()

    def _take_pending(self):
        """Clear the pending call and return it; the caller must hold the lock.

        :returns: ``(args, kwargs)`` of the pending call, or None if there is none.
        """
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        self._token = None
        if self._last_args is None:
            return None
        pending = (self._last_args, self._last_kwargs)
        self._last_args = None
        self._last_kwargs = None
        return pending

    def _fire(self, token) -> None:
        """Timer callback: run the pending call unless it was superseded."""
        with self._lock:
            if token is not self._token:
                # Rescheduled, flushed or cancelled after this timer woke up
                return
            pending = self._take_pending()
        if pending is not None:
            args, kwargs = pending
            self.func(*args, **kwargs)

    def flush(self) -> None:
        """Execute any pending call immediately."""
        with self._lock:
            pending = self._take_pending()
        # Run outside the lock so ``func`` may call this debouncer again
        # without deadlocking on the non-reentrant lock
        if pending is not None:
            args, kwargs = pending
            self.func(*args, **kwargs)

    def cancel(self) -> None:
        """Cancel any pending call without executing it."""
        with self._lock:
            self._take_pending()


VoidFunction = TypeVar('VoidFunction', bound=Callable[..., None])
def debounce(wait: float):
    """Decorator to debounce function calls.

    The decorated function runs ``wait`` seconds after the most recent call;
    calling it again inside that window replaces the pending call.

    :param float wait: Seconds to wait before executing. With a value of zero
        or less the function is returned undecorated.
    :returns: Decorator function.
    """
    def decorator(func: VoidFunction) -> VoidFunction:
        # Nothing to debounce for a non-positive wait: hand back the original
        return func if wait <= 0 else cast(VoidFunction, Debouncer(func, wait))
    return decorator
[docs] def wait_until( hour: int, minute: int = 0, second: int = 0, tz: datetime.tzinfo | None = timezone.utc, time_unit: str = 'milliseconds' ) -> int: """Calculate time to wait until specified hour/minute/second. :param int hour: Hour (0-23). :param int minute: Minute (0-59). :param int second: Second (0-59). :param tz: Timezone (default: UTC). :param str time_unit: Return unit ('seconds' or 'milliseconds'). :returns: Time to wait in specified unit. :rtype: int :raises ValueError: If hour/minute/second out of range. """ assert time_unit in {'seconds', 'milliseconds'} if not (0 <= hour <= 23): raise ValueError(f'hour must be between 0 and 23, got {hour}') if not (0 <= minute <= 59): raise ValueError(f'minute must be between 0 and 59, got {minute}') if not (0 <= second <= 59): raise ValueError(f'second must be between 0 and 59, got {second}') this = datetime.datetime.now(tz=tz) then = datetime.datetime(this.year, this.month, this.day, hour, minute, second, tzinfo=tz) if this >= then: then += timedelta(days=1) return math.ceil((then - this).total_seconds()) * (1000 if time_unit == 'milliseconds' else 1)
class timeout:
    """Context manager for timing out potentially hanging code.

    On entry a SIGALRM handler is installed and an alarm is armed; if the
    alarm goes off before the block finishes, :class:`TimeoutError` (a
    subclass of :class:`OSError`) is raised inside the block. On exit the
    alarm is cancelled and the previous handler is put back. An alarm that
    was already pending on entry (for example from an enclosing ``timeout``)
    is re-armed with the time it has left.

    :param int seconds: Timeout in seconds (default: 100).
    :param str error_message: Message for timeout error.

    .. warning::
        Uses SIGALRM and only works on Unix/Linux systems.
    """

    def __init__(self, seconds: int = 100, error_message: str = 'Timeout!!'):
        self.seconds = seconds
        self.error_message = error_message
        self._previous_handler = None
        # Seconds left on an alarm that was pending on entry (0 = none)
        self._previous_alarm = 0
        # Monotonic clock reading taken on entry
        self._entered_at = 0.0

    def handle_timeout(self, signum, frame):
        """SIGALRM handler: abort the guarded block.

        :raises TimeoutError: Always, carrying ``error_message``.
        """
        raise TimeoutError(self.error_message)

    def __enter__(self):
        self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
        # signal.alarm() returns the seconds left on any alarm it replaces
        self._previous_alarm = signal.alarm(self.seconds)
        self._entered_at = time.monotonic()
        return self

    def __exit__(self, type, value, traceback):
        signal.alarm(0)
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
            # Re-arm only when the previous handler is back in place, so the
            # restored alarm can never fire this instance's handler
            if self._previous_alarm:
                elapsed = time.monotonic() - self._entered_at
                # alarm(0) would cancel rather than fire, so an outer deadline
                # that has already passed is rescheduled for one second out
                signal.alarm(max(1, math.ceil(self._previous_alarm - elapsed)))
        # Never suppress exceptions raised inside the block
        return False