Source code for libb.dicts

from __future__ import annotations

import copy
import inspect
import itertools
import logging
from collections.abc import Mapping
from contextlib import contextmanager
from typing import Any

from trace_dkey import trace

from libb._rust import multikeysort as _multikeysort_impl

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Names bound by a star-import of this module.
# NOTE: ``map`` is this module's own ``zip_longest``-based function; because it
# is listed here, a star-import rebinds the name ``map`` (normally the Python
# builtin) in the importing namespace.
__all__ = [
    'ismapping',
    'invert',
    'mapkeys',
    'mapvals',
    'flatten',
    'unnest',
    'replacekey',
    'replaceattr',
    'cmp',
    'multikeysort',
    'map',
    'get_attrs',
    'trace_key',
    'trace_value',
    'add_branch',
    'merge_dict',
]


def ismapping(something):
    """Tell whether an object implements the mapping (dict-like) interface.

    :param something: Object to check.
    :returns: True when ``something`` is an instance of
        :class:`collections.abc.Mapping`, otherwise False.
    :rtype: bool

    Example::

        >>> ismapping(dict())
        True
        >>> ismapping([('a', 1)])
        False
    """
    is_dict_like = isinstance(something, Mapping)
    return is_dict_like
def invert(dct):
    """Build a new dictionary whose keys are the values of ``dct`` and vice versa.

    When several keys share the same value, the key that comes last in
    iteration order is the one kept.

    :param dict dct: Dictionary to invert (its values must be hashable).
    :returns: New dictionary mapping each original value to its key.
    :rtype: dict

    Example::

        >>> invert({'a': 1, 'b': 2})
        {1: 'a', 2: 'b'}
    """
    swapped = {}
    # Iterate over a snapshot of the items, as the original comprehension did.
    for key, value in list(dct.items()):
        swapped[value] = key
    return swapped
def mapkeys(func, dct):
    """Return a copy of ``dct`` in which every key has been passed through ``func``.

    Values are carried over untouched. If ``func`` maps two keys to the same
    result, the later item (in iteration order) wins.

    :param func: Callable applied to each key.
    :param dict dct: Dictionary to transform.
    :returns: New dictionary with transformed keys.
    :rtype: dict

    Example::

        >>> mapkeys(str.upper, {'a': 1, 'b': 2})
        {'A': 1, 'B': 2}
    """
    renamed = {}
    for key, val in list(dct.items()):
        renamed[func(key)] = val
    return renamed
def mapvals(func, dct):
    """Return a copy of ``dct`` in which every value has been passed through ``func``.

    Keys are carried over untouched.

    :param func: Callable applied to each value.
    :param dict dct: Dictionary to transform.
    :returns: New dictionary with transformed values.
    :rtype: dict

    Example::

        >>> mapvals(lambda x: x * 2, {'a': 1, 'b': 2})
        {'a': 2, 'b': 4}
    """
    converted = {}
    for key, val in list(dct.items()):
        converted[key] = func(val)
    return converted
def flatten(kv, prefix=None):
    """Lazily flatten a nested dictionary into ``(joined_key, value)`` pairs.

    Unlike `more_itertools.flatten
    <https://more-itertools.readthedocs.io/en/stable/api.html#more_itertools.flatten>`_,
    which strips one level of nesting from a list of lists, this works on
    dictionaries: every value that is a ``dict`` is descended into, and the
    keys along the way are converted with ``str`` and joined with underscores
    (``{'a': {'b': 1}}`` produces ``('a_b', 1)``). Empty nested dicts
    contribute nothing.

    :param dict kv: Dictionary to flatten.
    :param list prefix: Internal list of parent keys used by the recursion
        (do not set manually).
    :yields: Tuples of (flattened_key, value).

    Example::

        >>> dict(flatten({'a': {'b': 1}, 'c': 2}))
        {'a_b': 1, 'c': 2}

        >>> data = [
        ...     {'event': 'User Clicked', 'properties': {'user_id': '123', 'page_visited': 'contact_us'}},
        ...     {'event': 'User Clicked', 'properties': {'user_id': '456', 'page_visited': 'homepage'}},
        ...     {'event': 'User Clicked', 'properties': {'user_id': '789', 'page_visited': 'restaurant'}}
        ... ]
        >>> from pandas import DataFrame
        >>> df = DataFrame({k:v for k,v in flatten(kv)} for kv in data)
        >>> list(df)
        ['event', 'properties_user_id', 'properties_page_visited']
        >>> len(df)
        3
    """
    parents = [] if prefix is None else prefix
    # Snapshot the items so the generator is unaffected by later mutation of kv.
    for key, value in list(kv.items()):
        name = str(key)
        if isinstance(value, dict):
            yield from flatten(value, parents + [name])
            continue
        flat_key = '_'.join(parents + [name]) if parents else name
        yield flat_key, value
def unnest(d, keys=None):
    """Turn a nested dictionary into a flat list of path tuples.

    Each tuple holds the keys leading to a leaf followed by the leaf value.
    Only values that are ``dict`` instances are descended into; an empty
    nested dict therefore produces no tuple.

    :param dict d: Dictionary to unnest.
    :param list keys: Internal accumulator of parent keys (do not set manually).
    :returns: List of tuples representing paths to leaf values.
    :rtype: list

    Example::

        >>> unnest({'a': {'b': 1}, 'c': 2})
        [('a', 'b', 1), ('c', 2)]
    """
    path = [] if keys is None else keys
    rows = []
    for name, child in d.items():
        if not isinstance(child, dict):
            rows.append(tuple(path + [name, child]))
        else:
            rows += unnest(child, path + [name])
    return rows
@contextmanager
def replacekey(d, key, newval):
    """Context manager for temporarily patching a dictionary value.

    The previous state is restored when the ``with`` block exits, including
    when it exits because of an exception.

    :param dict d: Dictionary to patch.
    :param key: Key to temporarily replace.
    :param newval: Temporary value to set.

    Basic Usage::

        >>> f = dict(x=13)
        >>> with replacekey(f, 'x', 'pho'):
        ...     f['x']
        'pho'
        >>> f['x']
        13

    If the dict does not have the key set before, we return to that state::

        >>> import os, sys
        >>> rand_key = str(int.from_bytes(os.urandom(10), sys.byteorder))
        >>> with replacekey(os.environ, rand_key, '22'):
        ...     os.environ[rand_key]=='22'
        True
        >>> rand_key in os.environ
        False

    The original value also comes back if the body raises::

        >>> try:
        ...     with replacekey(f, 'x', 'pho'):
        ...         raise ValueError('boom')
        ... except ValueError:
        ...     pass
        >>> f['x']
        13
    """
    wasset = key in d
    oldval = d.get(key)
    d[key] = newval
    try:
        yield
    finally:
        # Restore in ``finally`` so an exception in the body cannot leave the
        # dictionary permanently patched.
        if wasset:
            d[key] = oldval
        else:
            try:
                del d[key]
            except KeyError:
                # The body already removed the key; the target state
                # ("key absent") is already reached.
                pass
@contextmanager
def replaceattr(obj, attrname, newval):
    """Context manager for temporarily monkey patching an object attribute.

    The previous state is restored when the ``with`` block exits, including
    when it exits because of an exception.

    :param obj: Object to patch.
    :param str attrname: Attribute name to temporarily replace.
    :param newval: Temporary value to set.

    Basic Usage::

        >>> class Foo: pass
        >>> f = Foo()
        >>> f.x = 13
        >>> with replaceattr(f, 'x', 'pho'):
        ...     f.x
        'pho'
        >>> f.x
        13

    If the obj did not have the attr set, we remove it::

        >>> with replaceattr(f, 'y', 'boo'):
        ...     f.y=='boo'
        True
        >>> hasattr(f, 'y')
        False

    The original value also comes back if the body raises::

        >>> try:
        ...     with replaceattr(f, 'x', 'pho'):
        ...         raise ValueError('boom')
        ... except ValueError:
        ...     pass
        >>> f.x
        13
    """
    wasset = hasattr(obj, attrname)
    oldval = getattr(obj, attrname, None)
    setattr(obj, attrname, newval)
    try:
        yield
    finally:
        # Restore in ``finally`` so an exception in the body cannot leave the
        # object permanently patched.
        if wasset:
            setattr(obj, attrname, oldval)
        else:
            try:
                delattr(obj, attrname)
            except AttributeError:
                # The body already removed the attribute; the target state
                # ("attribute absent") is already reached.
                pass
def cmp(left, right):
    """Python 2 style cmp function with null value handling.

    ``None`` sorts before everything else. When both arguments are iterable
    containers, a container holding a ``None`` element sorts before one that
    does not, and two containers that both hold ``None`` compare equal.
    Anything else is compared with the ordinary ``<`` / ``>`` operators.

    :param left: First value to compare.
    :param right: Second value to compare.
    :returns: -1 if left < right, 0 if equal, 1 if left > right.
    :rtype: int
    :raises TypeError: If the operands are not ``None`` and do not support
        ordering against each other.

    Example::

        >>> cmp(None, 2)
        -1
        >>> cmp(2, None)
        1
        >>> cmp(-1, 2)
        -1
        >>> cmp(2, -1)
        1
        >>> cmp(1, 1)
        0
        >>> cmp((1, None), (1, 2))
        -1
    """
    def _three_way(a, b):
        return (a > b) - (a < b)

    try:
        # Both operands must be iterable for the container rules to apply;
        # ``iter`` raises TypeError otherwise and we fall back to the scalar
        # rules below.
        iter(left)
        iter(right)
        # Scan each container once. ``None in 'some string'`` also raises
        # TypeError, which sends strings to the scalar comparison.
        left_has_null = None in left
        right_has_null = None in right
        if left_has_null and right_has_null:
            return 0
        if left_has_null:
            return -1
        if right_has_null:
            return 1
        return _three_way(left, right)
    except TypeError:
        pass

    if left is None:
        return 0 if right is None else -1
    if right is None:
        return 1
    return _three_way(left, right)
def multikeysort(items: list[dict], columns, inplace=False):
    """Order a list of dictionaries by one or more keys.

    Works like SQL ``ORDER BY``: a bare column name sorts ascending, a name
    prefixed with ``-`` sorts descending. The sorting itself is delegated to
    the compiled ``libb._rust.multikeysort`` implementation.

    :param list items: List of dictionaries to sort.
    :param columns: List of column names to sort by (prefix with ``-`` for
        descending).
    :param bool inplace: If True, sort in place; otherwise return new sorted list.
    :returns: Sorted list if inplace=False, otherwise None.

    Basic Usage::

        >>> ds = [
        ...     {'category': 'c1', 'total': 96.0},
        ...     {'category': 'c2', 'total': 96.0},
        ...     {'category': 'c3', 'total': 80.0},
        ...     {'category': 'c4', 'total': None},
        ...     {'category': 'c5', 'total': 80.0},
        ... ]
        >>> asc = multikeysort(ds, ['total', 'category'])
        >>> total = [_['total'] for _ in asc]
        >>> assert all([cmp(total[i], total[i+1]) in (0,-1,)
        ...             for i in range(len(total)-1)])

    Missing Columns are Ignored::

        >>> us = multikeysort(ds, ['missing',])
        >>> assert us[0]['total'] == 96.0
        >>> assert us[1]['total'] == 96.0
        >>> assert us[2]['total'] == 80.0
        >>> assert us[3]['total'] == None
        >>> assert us[4]['total'] == 80.0

    None Columns are Handled::

        >>> us = multikeysort(ds, None)
        >>> assert us[0]['total'] == 96.0
        >>> assert us[1]['total'] == 96.0
        >>> assert us[2]['total'] == 80.0
        >>> assert us[3]['total'] == None
        >>> assert us[4]['total'] == 80.0

    Descending Order with Inplace::

        >>> multikeysort(ds, ['-total', 'category'], inplace=True) # desc
        >>> total = [_['total'] for _ in ds]
        >>> assert all([cmp(total[i], total[i+1]) in (0, 1,)
        ...             for i in range(len(total)-1)])
    """
    # Arguments are forwarded positionally, exactly as received.
    outcome = _multikeysort_impl(items, columns, inplace)
    return outcome
def map(func, *iterables):
    """Python 2 flavoured ``map`` that runs to the end of the longest iterable.

    Shorter iterables are padded with ``None`` (via
    :func:`itertools.zip_longest`) so that iteration continues until the
    longest argument is exhausted.

    :param func: Function to apply (or None for tuple aggregation).
    :param iterables: Iterables to map over.
    :returns: Iterator of mapped results; when ``func`` is None, an iterator
        of the padded tuples themselves.

    Example::

        >>> def foo(a, b):
        ...     if b is not None:
        ...         return a - b
        ...     return -a
        >>> list(map(foo, range(5), [3,2,1]))
        [-3, -1, 1, -3, -4]
    """
    padded = itertools.zip_longest(*iterables)
    return padded if func is None else itertools.starmap(func, padded)
def get_attrs(klazz):
    """List the data attributes of a class, leaving out routines and dunders.

    Members are collected with :func:`inspect.getmembers`, so they come back
    sorted by name. Names that both start and end with a double underscore
    are dropped; single-underscore names are kept.

    :param type klazz: Class to inspect.
    :returns: List of (name, value) tuples for class attributes.
    :rtype: list

    Example::

        >>> class MyClass(object):
        ...     a = '12'
        ...     b = '34'
        ...     def myfunc(self):
        ...         return self.a
        >>> get_attrs(MyClass)
        [('a', '12'), ('b', '34')]
    """
    def _is_data(member):
        return not inspect.isroutine(member)

    kept = []
    for pair in inspect.getmembers(klazz, _is_data):
        name = pair[0]
        is_dunder = name.startswith('__') and name.endswith('__')
        if not is_dunder:
            kept.append(pair)
    return kept
def trace_key(d, attrname) -> list[list]:
    """Locate every occurrence of a key inside a nested dictionary.

    The search is delegated to ``trace_dkey.trace``; an empty (falsy) result
    is reported as an error instead of being returned.

    :param dict d: Dictionary to search.
    :param str attrname: Key name to find.
    :returns: List of paths (as lists) to the key.
    :rtype: list[list]
    :raises AttributeError: If key is not found.

    Basic Usage::

        >>> l=dict(a=dict(b=dict(c=dict(d=dict(e=dict(f=1))))))
        >>> trace_key(l,'f')
        [['a', 'b', 'c', 'd', 'e', 'f']]

    Multiple Locations::

        >>> l=dict(a=dict(b=dict(c=dict(d=dict(e=dict(f=1))))), f=2)
        >>> trace_key(l,'f')
        [['a', 'b', 'c', 'd', 'e', 'f'], ['f']]

    With Missing Key::

        >>> trace_key(l, 'g')
        Traceback (most recent call last):
        ...
        AttributeError: g
    """
    paths = trace(d, attrname)
    if paths:
        return paths
    raise AttributeError(attrname)
def trace_value(d, attrname) -> list:
    """Collect the value stored at every location of a key in a nested dictionary.

    The paths come from :func:`trace_key`; each one is then walked from the
    root of ``d`` by successive item lookups.

    :param dict d: Dictionary to search.
    :param str attrname: Key name to find.
    :returns: List of values found at each key location.
    :rtype: list
    :raises AttributeError: If key is not found.

    Basic Usage::

        >>> l=dict(a=dict(b=dict(c=dict(d=dict(e=dict(f=1))))))
        >>> trace_value(l, 'f')
        [1]

    Multiple Locations::

        >>> l=dict(a=dict(b=dict(c=dict(d=dict(e=dict(f=1))))), f=2)
        >>> trace_value(l,'f')
        [1, 2]

    With Missing Key::

        >>> trace_value(l, 'g')
        Traceback (most recent call last):
        ...
        AttributeError: g
    """
    found = []
    for path in trace_key(d, attrname):
        if not path:
            # An empty path has nothing to walk; it contributes None.
            found.append(None)
            continue
        node = d
        for step in path:
            node = node[step]
        found.append(node)
    return found
def add_branch(tree, vector, value):
    """Store ``value`` in a nested dict at the path described by ``vector``.

    Walks (and, where needed, creates) one sub-dictionary per element of
    ``vector`` except the last, which becomes the key holding ``value``.
    ``tree`` is modified in place and also returned. Recursive!

    :param dict tree: The data structure to insert the vector into.
    :param list vector: A list of values representing the path to the leaf node.
    :param value: The object to be inserted at the leaf.
    :returns: The dict with the value placed at the path specified.
    :rtype: dict

    .. note::
        Algorithm from https://stackoverflow.com/a/47276490

    Algorithm:
        - If we're at the leaf, add it as key/value to the tree
        - Else: If the subtree doesn't exist, create it.
        - Recurse with the subtree and the left shifted vector.
        - Return the tree.

    Useful for parsing ini files with dot-delimited keys::

        [app]
        site1.ftp.host = hostname
        site1.ftp.username = username
        site1.database.hostname = db_host

    Example 1::

        >>> tree = {'a': 'apple'}
        >>> vector = ['b', 'c', 'd']
        >>> value = 'dog'
        >>> tree = add_branch(tree, vector, value)
        >>> unnest(tree)
        [('a', 'apple'), ('b', 'c', 'd', 'dog')]

    Example 2::

        >>> vector2 = ['b', 'c', 'e']
        >>> value2 = 'egg'
        >>> tree = add_branch(tree, vector2, value2)
        >>> unnest(tree)
        [('a', 'apple'), ('b', 'c', 'd', 'dog'), ('b', 'c', 'e', 'egg')]
    """
    head = vector[0]
    if len(vector) == 1:
        # Leaf: this is where the value lives.
        tree[head] = value
    else:
        # Reuse the existing subtree if there is one, otherwise start a new one.
        subtree = tree.get(head, {})
        tree[head] = add_branch(subtree, vector[1:], value)
    return tree
# Define a type for dictionaries that can contain nested dictionaries
DictType = dict[str, Any]


def merge_dict(old: DictType, new: DictType, inplace: bool = True) -> DictType | None:
    """Deep-merge ``new`` into ``old``, descending into nested mappings.

    For every key in ``new`` the first matching rule is applied:

    1. both values are mappings - merge them recursively;
    2. the value in ``old`` is missing or ``None`` - take the value from ``new``;
    3. both values are iterable (and the new one is not a string) - concatenate
       them with ``+``, falling back to ``list(old) + list(new)`` when ``+``
       raises ``TypeError`` or ``ValueError``;
    4. otherwise - overwrite with the value from ``new``.

    .. note::
        Values taken from ``new`` are stored by reference, not copied.

    :param dict old: The dictionary to merge into (will be modified if inplace=True).
    :param dict new: The dictionary to merge from (not modified by this call).
    :param bool inplace: If True, modifies old in place; if False, works on a
        deep copy of old and returns it.
    :returns: If inplace=False, returns the merged dictionary. Otherwise, returns None.

    Basic Nested Merge::

        >>> l1 = {'a': {'b': 1, 'c': 2}, 'b': 2}
        >>> l2 = {'a': {'a': 9}, 'c': 3}
        >>> merge_dict(l1, l2, inplace=False)
        {'a': {'b': 1, 'c': 2, 'a': 9}, 'b': 2, 'c': 3}
        >>> l1=={'a': {'b': 1, 'c': 2}, 'b': 2}
        True
        >>> l2=={'a': {'a': 9}, 'c': 3}
        True

    Multilevel Merging::

        >>> xx = {'a': {'b': 1, 'c': 2}, 'b': 2}
        >>> nice = {'a': {'a': 9}, 'c': 3}
        >>> merge_dict(xx, nice)
        >>> 'a' in xx['a']
        True
        >>> 'c' in xx
        True

    Values Get Overwritten::

        >>> warn = {'a': {'c': 9}, 'b': 3}
        >>> merge_dict(xx, warn)
        >>> xx['a']['c']
        9
        >>> xx['b']
        3

    Merges Iterables (preserving types when possible)::

        >>> l1 = {'a': {'c': [5, 2]}, 'b': 1}
        >>> l2 = {'a': {'c': [1, 2]}, 'b': 3}
        >>> merge_dict(l1, l2)
        >>> len(l1['a']['c'])
        4
        >>> l1['b']
        3

    Handles Type Mismatches (converts to lists)::

        >>> l1 = {'a': {'c': [5, 2]}, 'b': 1}
        >>> l3 = {'a': {'c': (1, 2,)}, 'b': 3}
        >>> merge_dict(l1, l3)
        >>> len(l1['a']['c'])
        4
        >>> isinstance(l1['a']['c'], list)
        True

    Handles None Values::

        >>> l1 = {'a': {'c': None}, 'b': 1}
        >>> l2 = {'a': {'c': [1, 2]}, 'b': 3}
        >>> merge_dict(l1, l2)
        >>> l1['a']['c']
        [1, 2]
    """
    from libb.iter import isiterable

    target = old if inplace else copy.deepcopy(old)

    for key, incoming in new.items():
        current = target.get(key)

        if ismapping(current) and ismapping(incoming):
            # Rule 1: nested mappings are merged in place, level by level.
            merge_dict(current, incoming, inplace=True)
        elif current is None:
            # Rule 2: nothing (or None) on the target side - adopt the new value.
            target[key] = incoming
        elif isiterable(current) and isiterable(incoming) and not isinstance(incoming, str):
            # Rule 3: concatenate; mismatched types fall back to plain lists.
            try:
                target[key] = current + incoming
            except (TypeError, ValueError):
                target[key] = list(current) + list(incoming)
        else:
            # Rule 4: plain overwrite.
            target[key] = incoming

    return None if inplace else target
if __name__ == '__main__':
    # Run the docstring examples of this module when it is executed directly.
    import doctest

    # Named flags replace the former literal ``4 | 8 | 32``; the values are
    # unchanged.
    doctest.testmod(
        optionflags=(
            doctest.NORMALIZE_WHITESPACE  # 4
            | doctest.ELLIPSIS  # 8
            | doctest.IGNORE_EXCEPTION_DETAIL  # 32
        )
    )