from __future__ import annotations
import itertools
import logging
from collections.abc import Iterable, Sequence
import more_itertools as _more_itertools
from libb._libb import backfill, backfill_iterdict, collapse, same_order
# Module-level logger; align_iterdict writes its debug trace through it.
logger = logging.getLogger(__name__)
# Names exported by a star-import of this module. It mixes three sources:
# aliases of more_itertools helpers, functions imported from libb._libb, and
# functions defined in this file. negate_permute is defined in this file but
# is not listed here.
__all__ = [
'chunked',
'chunked_even',
'collapse',
'compact',
'grouper',
'hashby',
'infinite_iterator',
'iscollection',
'isiterable',
'issequence',
'partition',
'peel',
'roundrobin',
'rpeel',
'unique',
'unique_iter',
'same_order',
'coalesce',
'getitem',
'backfill',
'backfill_iterdict',
'align_iterdict',
]
# The names below are plain aliases of more_itertools functions, re-exported
# under this module's namespace; no behavior is added here.
#: Split iterable into chunks of length n. See :func:`more_itertools.chunked`.
chunked = _more_itertools.chunked
#: Split iterable into chunks of roughly length n whose sizes differ by at most one. See :func:`more_itertools.chunked_even`.
chunked_even = _more_itertools.chunked_even
#: Collect data into fixed-length chunks. See :func:`more_itertools.grouper`.
grouper = _more_itertools.grouper
#: Partition items into those where pred is False/True. See :func:`more_itertools.partition`.
partition = _more_itertools.partition
#: Interleave items from multiple iterables. See :func:`more_itertools.roundrobin`.
roundrobin = _more_itertools.roundrobin
#: Yield unique elements, preserving order. See :func:`more_itertools.unique_everseen`.
unique_iter = _more_itertools.unique_everseen
[docs]
def isiterable(obj):
    """Tell whether ``obj`` can be iterated over, treating ``str`` as a scalar.

    Only ``str`` is singled out; every other
    :class:`collections.abc.Iterable` (``bytes`` included) counts.

    :param obj: Object to inspect.
    :returns: ``True`` for any iterable that is not a ``str``.
    :rtype: bool

    Example::

        >>> isiterable([])
        True
        >>> isiterable(tuple())
        True
        >>> isiterable(object())
        False
        >>> isiterable('foo')
        False

    Note: DataFrames and arrays are iterable::

        >>> import pandas as pd
        >>> isiterable(pd.DataFrame([['foo', 1]], columns=['key', 'val']))
        True
        >>> import numpy as np
        >>> isiterable(np.array([1,2,3]))
        True
    """
    # Strings are iterable in Python, but callers want them handled as atoms.
    if isinstance(obj, str):
        return False
    return isinstance(obj, Iterable)
[docs]
def issequence(obj):
    """Tell whether ``obj`` is a sequence, treating ``str`` as a scalar.

    The check is made against :class:`collections.abc.Sequence`, so only
    types that subclass it or are registered with it qualify.

    :param obj: Object to inspect.
    :returns: ``True`` for any sequence that is not a ``str``.
    :rtype: bool

    Example::

        >>> issequence([])
        True
        >>> issequence(tuple())
        True
        >>> issequence('foo')
        False
        >>> issequence(object())
        False

    Note: DataFrames and arrays are NOT sequences::

        >>> import pandas as pd
        >>> issequence(pd.DataFrame([['foo', 1]], columns=['key', 'val']))
        False
        >>> import numpy as np
        >>> issequence(np.array([1,2,3]))
        False
    """
    # ``str`` is itself a Sequence, so it has to be ruled out explicitly.
    if isinstance(obj, str):
        return False
    return isinstance(obj, Sequence)
[docs]
def iscollection(obj):
    """Tell whether ``obj`` is a collection, i.e. iterable but not a string.

    Delegates the iterability test to :func:`isiterable`.

    :param obj: Object to inspect.
    :returns: ``True`` when ``obj`` is a non-string iterable.
    :rtype: bool

    Example::

        >>> iscollection(object())
        False
        >>> iscollection(range(10))
        True
        >>> iscollection('hello')
        False
    """
    if isinstance(obj, str):
        return False
    return isiterable(obj)
[docs]
def unique(iterable, key=None):
    """Drop repeated elements, keeping the first occurrence of each in order.

    Unlike `more_itertools.unique
    <https://more-itertools.readthedocs.io/en/stable/api.html#more_itertools.unique>`_,
    the result follows the original insertion order instead of being sorted.
    The work is done by ``unique_iter`` (an alias of
    :func:`more_itertools.unique_everseen`); this wrapper only materialises
    its output as a list.

    :param iterable: Iterable to deduplicate.
    :param key: Optional function computing the value used to decide
        whether two elements are duplicates.
    :returns: List of the unique elements.
    :rtype: list

    Basic Usage::

        >>> unique([9,0,2,1,0])
        [9, 0, 2, 1]

    With Key Function::

        >>> unique(['Foo', 'foo', 'bar'], key=lambda s: s.lower())
        ['Foo', 'bar']

    Unhashable Items (use hashing keys for better performance)::

        >>> unique(([1, 2],[2, 3],[1, 2]), key=tuple)
        [[1, 2], [2, 3]]
        >>> unique(({1,2,3},{4,5,6},{1,2,3}), key=frozenset)
        [{1, 2, 3}, {4, 5, 6}]
        >>> unique(({'a':1,'b':2},{'a':3,'b':4},{'a':1,'b':2}), key=lambda x: frozenset(x.items()))
        [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
    """
    return [*unique_iter(iterable, key)]
[docs]
def compact(iterable):
    """Keep only the truthy items of an iterable.

    :param iterable: Iterable to filter.
    :returns: Tuple holding the truthy items in their original order.
    :rtype: tuple

    .. warning::
        Everything falsy is discarded, not just ``None``: zero, empty
        strings and empty containers are removed as well.

    Example::

        >>> compact([0,2,3,4,None,5])
        (2, 3, 4, 5)
    """
    # filter(None, ...) keeps exactly the items whose truth value is True.
    return tuple(filter(None, iterable))
[docs]
def hashby(iterable, keyfunc):
    """Index the items of an iterable by a computed key.

    When several items produce the same key, the one seen last is the one
    kept in the result.

    :param iterable: Items to index.
    :param keyfunc: Function returning the (hashable) key for an item.
    :returns: Dictionary mapping each key to its item.
    :rtype: dict

    Example::

        >>> items = [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
        >>> hashby(items, lambda x: x['id'])
        {1: {'id': 1, 'name': 'a'}, 2: {'id': 2, 'name': 'b'}}
    """
    index = {}
    for element in iterable:
        index[keyfunc(element)] = element
    return index
def negate_permute(*items):
    """Yield every permutation of the items together with their negations.

    Each item ``a`` contributes the two values ``-a`` and ``a`` (in that
    order) to a pool; all orderings of that pool are then produced.

    :param items: Values supporting unary minus.
    :yields: Tuples, each one ordering of the ``2 * len(items)`` pooled values.

    Example::

        >>> next(negate_permute(1, 2))
        (-1, 1, -2, 2)
        >>> next(negate_permute(-float('inf'), 0))
        (inf, -inf, 0, 0)
    """
    signed_pool = []
    for value in items:
        signed_pool.extend((-value, value))
    yield from itertools.permutations(signed_pool)
[docs]
def infinite_iterator(iterable):
    """Create an iterator that cycles endlessly through a sequence.

    The sequence is indexed on every step rather than copied, so it must
    support ``len()`` and integer indexing, and later changes to it are
    reflected by the iterator. Each iterator keeps its own position, so
    several of them can be used side by side without disturbing each other.

    :param iterable: Sequence to cycle through.
    :returns: Generator that yields the items in order, wrapping around
        forever. If the sequence is (or becomes) empty the generator simply
        finishes.

    Example::

        >>> ii = infinite_iterator([1,2,3,4,5])
        >>> [next(ii) for i in range(9)]
        [1, 2, 3, 4, 5, 1, 2, 3, 4]
    """
    def _cycle():
        # The position is local to this generator; it used to live in a
        # module-level global, which made independent iterators share (and
        # reset) each other's position.
        position = 0
        while True:
            size = len(iterable)
            if size == 0:
                # Nothing to yield; avoids a modulo-by-zero error.
                return
            yield iterable[position % size]
            position += 1

    return _cycle()
# collapse is now implemented in Rust
# See libb._libb for the implementation
[docs]
def peel(str_or_iter):
    """Walk an iterable and yield an ``(item, alias)`` entry for each element.

    Elements that are already a tuple or list are yielded unchanged (the
    very same object); any other element is paired with itself.

    :param str_or_iter: Iterable of plain items and/or tuple/list entries.
    :yields: The tuple/list element as-is, or ``(item, item)`` otherwise.

    Example::

        >>> list(peel(["a", ("", "b"), "c"]))
        [('a', 'a'), ('', 'b'), ('c', 'c')]
    """
    for item in str_or_iter:
        if isinstance(item, (tuple, list)):
            yield item
        else:
            yield item, item
[docs]
def rpeel(str_or_iter):
    """Walk an iterable and yield the alias of each element.

    For a tuple or list element the alias is its last member; any other
    element is its own alias.

    :param str_or_iter: Iterable of plain items and/or tuple/list entries.
    :yields: ``element[-1]`` for tuple/list elements, else the element.
    :raises IndexError: When a tuple/list element is empty.

    Example::

        >>> list(rpeel(["a", ("", "b"), "c"]))
        ['a', 'b', 'c']
    """
    for item in str_or_iter:
        if isinstance(item, (tuple, list)):
            yield item[-1]
        else:
            yield item
# same_order is now implemented in Rust
# See libb._libb for the implementation
[docs]
def coalesce(*args):
    """Return the first argument that is not ``None``.

    Only ``None`` is skipped; other falsy values such as ``0`` or ``''``
    are returned when they come first.

    :param args: Candidate values, checked left to right.
    :returns: The first non-``None`` argument, or ``None`` if there is none.

    Example::

        >>> coalesce(None, None, 1, 2)
        1
        >>> coalesce(None, None) is None
        True
        >>> coalesce(0, 1, 2)
        0
    """
    for candidate in args:
        if candidate is not None:
            return candidate
    return None
[docs]
def getitem(sequence, index, default=None):
    """Index into a sequence, falling back to a default when out of range.

    Only :class:`IndexError` is absorbed; any other exception raised by the
    lookup propagates to the caller.

    :param sequence: Object supporting ``sequence[index]``.
    :param index: Position to read (negative positions are allowed).
    :param default: Value returned when the position is out of range.
    :returns: ``sequence[index]`` or ``default``.

    Example::

        >>> getitem([1, 2, 3], 1)
        2
        >>> getitem([1, 2, 3], 10) is None
        True
        >>> getitem([1, 2, 3], -1)
        3
        >>> getitem([1, 2, 3], -100) is None
        True
    """
    try:
        found = sequence[index]
    except IndexError:
        found = default
    return found
# backfill is now implemented in Rust
# See libb._libb for the implementation
# backfill_iterdict is now implemented in Rust
# See libb._libb for the implementation
[docs]
def align_iterdict(iterdict_a, iterdict_b, **kw):
    """Pair up the entries of two sorted lists of dicts ('iterdicts').

    Both inputs are walked in step. Whichever side lags behind the other by
    more than the tolerance is advanced; when the two current entries are
    within the tolerance of each other they are yielded as a pair and both
    sides move on. Anything that cannot be aligned is DROPPED, and the walk
    ends as soon as either input is exhausted.

    :param iterdict_a: Iterable of dicts, sorted on its alignment attribute.
    :param iterdict_b: Iterable of dicts, sorted on its alignment attribute.
    :param kw: Optional settings:

        * ``a`` -- key read from the dicts of ``iterdict_a`` (default ``'date'``).
        * ``b`` -- key read from the dicts of ``iterdict_b`` (default ``'date'``).
        * ``tolerance`` -- largest absolute ``diff`` still considered
          aligned (default ``0``).
        * ``diff`` -- callable ``diff(value_a, value_b)`` returning a signed
          distance, negative when A is behind B (default
          ``lambda x, y: (x - y).days``).
    :yields: ``(dict_a, dict_b)`` tuples of aligned entries.

    Example::

        >>> list(zip(*align_iterdict(
        ...     [{'a': 1}, {'a': 2}, {'a': 5}],
        ...     [{'b': 5}],
        ...     a='a',
        ...     b='b',
        ...     diff=lambda x, y: x - y,
        ... )))
        [({'a': 5},), ({'b': 5},)]
        >>> list(zip(*align_iterdict(
        ...     [{'b': 5}],
        ...     [{'a': 1}, {'a': 2}, {'a': 5}],
        ...     a='b',
        ...     b='a',
        ...     diff=lambda x, y: x - y
        ... )))
        [({'b': 5},), ({'a': 5},)]
    """
    attr_a = kw.get('a', 'date')
    attr_b = kw.get('b', 'date')
    tolerance = kw.get('tolerance', 0)
    diff = kw.get('diff', lambda x, y: (x - y).days)
    gen_a, gen_b = iter(iterdict_a), iter(iterdict_b)
    this_a, this_b = None, None
    while True:
        # Advance A when nothing usable has been read yet (None, or a falsy
        # empty dict) or when A trails B by more than the tolerance. The
        # bound is -tolerance: comparing against +tolerance would skip
        # entries that are already within tolerance of B.
        if not this_a or diff(this_a.get(attr_a), this_b.get(attr_b)) < -tolerance:
            try:
                this_a = next(gen_a)
            except StopIteration:
                break
            logger.debug(f'Advanced A to {this_a.get(attr_a)}')
        # Advance B when nothing usable has been read yet or when A is ahead
        # of B by more than the tolerance.
        if not this_b or diff(this_a.get(attr_a), this_b.get(attr_b)) > tolerance:
            try:
                this_b = next(gen_b)
            except StopIteration:
                break
            logger.debug(f'Advanced B to {this_b.get(attr_b)}')
        if abs(diff(this_a.get(attr_a), this_b.get(attr_b))) <= tolerance:
            logger.debug(f'Aligned iters to A {this_a.get(attr_a)} B {this_b.get(attr_b)}')
            yield this_a, this_b
            # Both entries are consumed by the match; stop once either
            # side has nothing left to offer.
            try:
                this_a, this_b = next(gen_a), next(gen_b)
            except StopIteration:
                break
if __name__ == '__main__':
    # Run this module's docstring examples when executed as a script.
    import doctest

    # Named flags instead of the bare bitmask 4 | 8 | 32 (same value).
    doctest.testmod(
        optionflags=(
            doctest.NORMALIZE_WHITESPACE
            | doctest.ELLIPSIS
            | doctest.IGNORE_EXCEPTION_DETAIL
        ),
    )