Source code for libb.thread

from __future__ import annotations

import asyncio
import logging
import sys
import time
from collections import deque
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from functools import partial, wraps
from threading import Lock, Thread
from typing import Any

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Public API of this module; controls what a star-import exposes.
__all__ = [
    'asyncd',
    'call_with_future',
    'RateLimitedExecutor',
    'TaskRequest',
    'TaskResponse',
    'threaded',
]


@dataclass
class TaskRequest:
    """A single unit of work, optionally tagged for later correlation.

    :param item: The payload to be processed.
    :param id: Caller-chosen identifier used to track the request;
        ``None`` when no identifier is supplied.
    """

    item: Any
    id: Any = None
@dataclass
class TaskResponse:
    """Outcome of processing one :class:`TaskRequest`.

    :param result: The value produced by processing.
    :param request: The original request this response belongs to.
    :param exception: The exception that occurred while processing, or
        ``None`` if processing succeeded.
    """

    result: Any
    request: TaskRequest
    # ``None`` is the "no error" marker, so the annotation must admit it.
    exception: Exception | None = None

    @property
    def success(self) -> bool:
        """Tell whether the task completed without raising.

        :returns: ``True`` if no exception was recorded.
        :rtype: bool
        """
        return self.exception is None
def asyncd(func):
    """Decorator to run a synchronous function asynchronously.

    The returned coroutine function hands ``func`` to an executor via
    ``loop.run_in_executor`` and awaits its result.

    The keyword arguments ``loop`` and ``executor`` are consumed by the
    wrapper and are therefore never forwarded to ``func``.

    :param func: Synchronous function to wrap.
    :returns: Async wrapper function. It accepts the same arguments as
        ``func`` plus the keyword-only ``loop`` (event loop to use; defaults
        to the running loop) and ``executor`` (passed to
        ``run_in_executor``; ``None`` selects the loop's default executor).

    .. note:: Based on https://stackoverflow.com/a/50450553
    """
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            # Inside a coroutine a loop is always running; get_running_loop()
            # is the non-deprecated way to obtain it.
            loop = asyncio.get_running_loop()
        # run_in_executor only forwards positional args, so bind everything
        # up front with partial.
        bound = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, bound)
    return run
def call_with_future(fn, future, args, kwargs):
    """Call a function and record its outcome on a future.

    On success the return value is stored with ``future.set_result``; if the
    call raises, the exception is stored with ``future.set_exception``.

    :param fn: Function to call.
    :param future: Future to set the result (or exception) on.
    :param args: Positional arguments for fn.
    :param kwargs: Keyword arguments for fn.
    """
    try:
        result = fn(*args, **kwargs)
        future.set_result(result)
    except BaseException as exc:
        # Catch BaseException, not just Exception: if fn raises e.g.
        # SystemExit the future would otherwise stay pending forever and
        # every caller blocked in future.result() would hang.
        future.set_exception(exc)
class RateLimitedExecutor:
    """Thread pool executor with rate limiting and Request/Response API.

    Provides clean request/response API where every response includes the
    original request for easy result tracking and exception handling.

    Rate limiting is applied when a task is *submitted* (the submitting
    thread blocks), not when the task starts running in a worker.

    Basic Usage::

        with RateLimitedExecutor(max_workers=10, max_per_second=5, show_progress=True) as executor:
            responses = executor.execute_items(process_fn, items, desc='Processing')
            for response in responses:
                if response.success:
                    print(f"Item {response.request.id}: {response.result}")
                else:
                    print(f"Item {response.request.id} failed: {response.exception}")

    Advanced Usage with Custom IDs::

        requests = [TaskRequest(item=x, id=f'custom_{i}') for i, x in enumerate(items)]
        responses = executor.execute(process_fn, requests, desc='Processing')
        result_map = {r.request.id: r.result for r in responses if r.success}
    """

    def __init__(self, max_workers: int | None = None, max_per_second: float = float('inf'),
                 show_progress: bool = False):
        """Initialize rate-limited executor.

        :param int max_workers: Maximum concurrent threads (``None`` lets
            ``ThreadPoolExecutor`` pick its default).
        :param float max_per_second: Maximum calls per second; the default
            ``float('inf')`` disables rate limiting.
        :param bool show_progress: Display progress bar during execution.
        :raises ValueError: If ``max_per_second`` is not a positive number.
        """
        # Written as "not > 0" so that NaN is rejected as well.
        if not max_per_second > 0:
            raise ValueError(f'max_per_second must be positive, got {max_per_second!r}')
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self.max_per_second = max_per_second
        self.show_progress = show_progress
        # Monotonic timestamps of recent submissions, oldest first.
        self._call_times = deque()
        self._lock = Lock()
        logger.info(f'Initialized RateLimitedExecutor: {max_per_second} req/sec, {max_workers} workers')

    def execute(self, fn: Callable, requests: list[TaskRequest], desc: str = 'Processing',
                unit: str = 'item') -> list[TaskResponse]:
        """Execute function on all requests and return responses in order.

        An exception raised by ``fn`` is captured on the corresponding
        response rather than propagated.

        :param fn: Function that takes request.item and returns a result.
        :param list requests: List of TaskRequest objects to process.
        :param str desc: Description for progress bar.
        :param str unit: Unit name for progress bar.
        :returns: List of TaskResponse objects in same order as requests.
        :rtype: list[TaskResponse]
        """
        start_time = time.monotonic()

        # Map each future back to its position so results can be re-ordered.
        futures = {}
        for i, request in enumerate(requests):
            future = self.submit(fn, request.item)
            futures[future] = (i, request)

        responses = [None] * len(requests)
        future_iter = as_completed(futures)
        if self.show_progress:
            # Imported lazily so tqdm is only required when a progress bar
            # is actually requested.
            from tqdm import tqdm
            bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]'
            future_iter = tqdm(future_iter, total=len(futures), desc=desc, unit=unit,
                               bar_format=bar_format)

        for future in future_iter:
            index, request = futures[future]
            try:
                result = future.result()
                responses[index] = TaskResponse(result=result, request=request)
            except Exception as exc:
                responses[index] = TaskResponse(result=None, request=request, exception=exc)

        elapsed = time.monotonic() - start_time
        success_count = sum(1 for r in responses if r.success)
        logger.debug(f'Executed {len(requests)} tasks in {elapsed:.2f}s: {success_count} '
                     f'succeeded, {len(requests) - success_count} failed')
        return responses

    def execute_items(self, fn: Callable, items: list, desc: str = 'Processing',
                      unit: str = 'item') -> list[TaskResponse]:
        """Execute function on items with auto-generated request IDs.

        :param fn: Function that takes an item and returns a result.
        :param list items: List of items to process.
        :param str desc: Description for progress bar.
        :param str unit: Unit name for progress bar.
        :returns: List of TaskResponse objects with request.id = index.
        :rtype: list[TaskResponse]
        """
        requests = [TaskRequest(item=item, id=i) for i, item in enumerate(items)]
        return self.execute(fn, requests, desc=desc, unit=unit)

    def _enforce_rate_limit(self) -> None:
        """Block until rate limit allows another call.

        Uses a sliding window: at most ``burst`` submissions are allowed in
        any ``window`` seconds, where ``burst / window == max_per_second``.
        For integer rates this is ``max_per_second`` calls per one second;
        fractional rates stretch or shrink the window so the configured rate
        is never exceeded (e.g. 0.5 -> one call per two seconds).
        """
        rate = self.max_per_second
        if rate == float('inf'):
            return

        burst = max(1, int(rate))
        window = burst / rate

        # The lock is held while sleeping on purpose: concurrent submitters
        # must queue up behind the one that is waiting for a free slot.
        with self._lock:
            times = self._call_times
            now = time.monotonic()
            self._drop_older_than(now - window)
            if len(times) >= burst:
                # Wait until the oldest call leaves the window; the extra
                # 10 ms guards against waking up marginally too early.
                sleep_time = window - (now - times[0]) + 0.01
                if sleep_time > 0:
                    time.sleep(sleep_time)
                now = time.monotonic()
                self._drop_older_than(now - window)
            times.append(now)

    def _drop_older_than(self, cutoff_time: float) -> None:
        """Discard recorded call times that are not newer than ``cutoff_time``."""
        times = self._call_times
        # Timestamps are appended in increasing order, so pruning from the
        # left is sufficient.
        while times and times[0] <= cutoff_time:
            times.popleft()

    def submit(self, fn, *args, **kwargs) -> Future:
        """Submit a callable to be executed with rate limiting.

        May block the calling thread until the rate limit permits another
        submission.

        :param fn: Callable to execute.
        :param args: Positional arguments.
        :param kwargs: Keyword arguments.
        :returns: Future representing the result.
        :rtype: Future
        """
        self._enforce_rate_limit()
        return self._executor.submit(fn, *args, **kwargs)

    def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None:
        """Shutdown the executor.

        :param bool wait: Wait for pending futures to complete.
        :param bool cancel_futures: Cancel pending futures.
        """
        self._executor.shutdown(wait=wait, cancel_futures=cancel_futures)

    def __enter__(self) -> Self:
        """Return the executor itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Shut down (waiting for pending work) and never suppress exceptions."""
        self.shutdown(wait=True)
        return False
def threaded(fn):
    """Decorator to run function in a separate thread.

    Each call starts a new ``Thread`` and immediately returns a Future that
    can be used to get the result (or the exception raised by ``fn``).

    :param fn: Function to run in a background thread.
    :returns: Wrapper that starts the thread and returns a ``Future``.

    .. note:: Based on https://stackoverflow.com/a/19846691

    Example::

        >>> class MyClass:
        ...     @threaded
        ...     def get_my_value(self):
        ...         return 1
        >>> my_obj = MyClass()
        >>> fut = my_obj.get_my_value()  # this will run in a separate thread
        >>> fut.result()  # will block until result is computed
        1
    """
    # Preserve fn's name and docstring on the wrapper, as asyncd does.
    @wraps(fn)
    def wrapper(*args, **kwargs):
        future = Future()
        Thread(target=call_with_future, args=(fn, future, args, kwargs)).start()
        return future
    return wrapper
# Run the docstring examples when the module is executed as a script.
if __name__ == '__main__':
    import doctest

    doctest.testmod()