def combinations(pool, r: int): """ Return successive r-length combinations of elements in the iterable. combinations(range(4), 3) --> (0,1,2), (0,1,3), (0,2,3), (1,2,3) """ def combinations_helper(pool_list, r): n = len(pool_list) if r > n: return indices = list(range(r)) yield [pool_list[i] for i in indices] while True: b = -1 for i in reversed(range(r)): if indices[i] != i + n - r: b = i break if b == -1: return indices[b] += 1 for j in range(b+1, r): indices[j] = indices[j-1] + 1 yield [pool_list[i] for i in indices] if r < 0: raise ValueError("r must be non-negative") if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"): return combinations_helper(pool, r) else: return combinations_helper([a for a in pool], r) def combinations_with_replacement(pool, r: int): """ Return successive r-length combinations of elements in the iterable allowing individual elements to have successive repeats. """ def combinations_with_replacement_helper(pool_list, r): n = len(pool_list) if not n and r: return indices = [0 for _ in range(r)] yield [pool_list[i] for i in indices] while True: b = -1 for i in reversed(range(r)): if indices[i] != n - 1: b = i break if b == -1: return newval = indices[b] + 1 for j in range(r - b): indices[b+j] = newval yield [pool_list[i] for i in indices] if r < 0: raise ValueError("r must be non-negative") if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"): return combinations_with_replacement_helper(pool, r) else: return combinations_with_replacement_helper([a for a in pool], r) @tuple class islice: """ Make an iterator that returns selected elements from the iterable. """ def __new__[T](iterable: Generator[T], stop: Optional[int]): if stop and ~stop < 0: raise ValueError("Indices for islice() must be None or an integer: 0 <= x <= sys.maxsize.") i = 0 for x in iterable: if stop and i >= ~stop: break yield x i += 1 def __new__[T](iterable: Generator[T], start: Optional[int], stop: Optional[int], step: Optional[int] = None): from sys import maxsize startx = 0 stopx = maxsize stepx = 1 have_stop = False if start: startx = ~start if stop: stopx = ~stop if step: stepx = ~step if startx < 0 or stopx < 0: raise ValueError("Indices for islice() must be None or an integer: 0 <= x <= sys.maxsize.") elif stepx < 0: raise ValueError("Step for islice() must be a positive integer or None.") it = range(startx, stopx, stepx) N = len(it) idx = 0 b = -1 if N == 0: for i, element in zip(range(start), iterable): pass return nexti = it[0] for i, element in enumerate(iterable): if i == nexti: yield element idx += 1 if idx >= N: b = i break nexti = it[idx] if b >= 0: for i, element in zip(range(b + 1, stopx), iterable): pass @inline def count(start = 0, step = 1): """ Return a count object whose ``__next__`` method returns consecutive values. """ n = start while True: yield n n += step @inline def repeat(object, times: Optional[int] = None): """ Make an iterator that returns a given object over and over again. """ if times is None: while True: yield object else: for i in range(~times): yield object @inline def cycle(iterable): """ Cycles repeatedly through an iterable. """ saved = [] for element in iterable: yield element saved.append(element) while saved: for element in saved: yield element @inline def compress(data, selectors): """ Return data elements corresponding to true selector elements. Forms a shorter iterator from selected data elements using the selectors to choose the data elements. """ for d,s in zip(data, selectors): if s: yield d @inline def dropwhile(predicate, iterable): """ Drop items from the iterable while predicate(item) is true. Afterwards, return every element until the iterable is exhausted. """ b = False for x in iterable: if not b and not predicate(x): b = True if b: yield x @inline def takewhile(predicate, iterable): """ Return successive entries from an iterable as long as the predicate evaluates to true for each entry. """ for x in iterable: if predicate(x): yield x else: break @inline def filterfalse(predicate, iterable): """ Return those items of iterable for which function(item) is false. """ for x in iterable: if not predicate(x): yield x def permutations(pool, r: Optional[int] = None): """ Return successive r-length permutations of elements in the iterable. """ def permutations_helper(pool_list, r): n = len(pool_list) rx = ~r if r else n if rx > n: return indices = list(range(n)) cycles = list(range(n, n - rx, -1)) yield [pool_list[i] for i in indices[:rx]] while n: b = -1 for i in reversed(range(rx)): cycles[i] -= 1 if cycles[i] == 0: indices = indices[:i] + indices[i+1:] + indices[i:i+1] cycles[i] = n - i else: b = i j = cycles[i] indices[i], indices[-j] = indices[-j], indices[i] yield [pool_list[i] for i in indices[:rx]] break if b == -1: return if r is not None and ~r < 0: raise ValueError("r must be non-negative") if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"): return permutations_helper(pool, r) else: return permutations_helper([a for a in pool], r) @tuple class accumulate: """ Make an iterator that returns accumulated sums, or accumulated results of other binary functions (specified via the optional func argument). """ @inline def __new__(iterable, func = lambda a, b: a + b, initial = 0): total = initial yield total for element in iterable: total = func(total, element) yield total @inline def __new__(iterable, func = lambda a, b: a + b): total = None for element in iterable: total = element if total is None else func(~total, element) yield ~total @tuple class chain: """ Make an iterator that returns elements from the first iterable until it is exhausted, then proceeds to the next iterable, until all of the iterables are exhausted. """ @inline def __new__(*iterables): for it in iterables: for element in it: yield element @inline def from_iterable(iterables): for it in iterables: for element in it: yield element @inline def starmap(function, iterable): """ Return an iterator whose values are returned from the function evaluated with an argument tuple taken from the given sequence. """ for args in iterable: yield function(*args) # TODO: fix this once Optional[Callable] lands @inline def groupby(iterable, key = Optional[int]()): """ Make an iterator that returns consecutive keys and groups from the iterable. """ currkey = None group = [] for currvalue in iterable: k = currvalue if isinstance(key, Optional) else key(currvalue) if currkey is None: currkey = k if k != ~currkey: yield ~currkey, group currkey = k group = [] group.append(currvalue) if currkey is not None: yield ~currkey, group @tuple class zip_longest: """ Make an iterator that aggregates elements from each of the iterables. If the iterables are of uneven length, missing values are filled-in with fillvalue. Iteration continues until the longest iterable is exhausted. """ @inline def __new__(*args, fillvalue): if staticlen(args) == 2: a = iter(args[0]) b = iter(args[1]) a_done = False b_done = False while not a.done(): a_val = a.next() b_val = fillvalue if not b_done: b_done = b.done() if not b_done: b_val = b.next() yield a_val, b_val if not b_done: while not b.done(): yield fillvalue, b.next() a.destroy() b.destroy() else: iterators = tuple(iter(it) for it in args) num_active = len(iterators) if not num_active: return while True: values = [] for it in iterators: if it.__done__(): # already done values.append(fillvalue) elif it.done(): # resume and check num_active -= 1 if not num_active: return values.append(fillvalue) else: values.append(it.next()) yield values @inline def __new__(*args): def get_next(it): if it.__done__() or it.done(): return None return it.next() iters = tuple(iter(arg) for arg in args) while True: done_count = 0 result = tuple(get_next(it) for it in iters) all_none = True for a in result: if a is not None: all_none = False if all_none: return yield result for it in iters: it.destroy() @tuple class product: """ Cartesian product of input iterables. """ @inline def __new__(*args): if staticlen(args) == 0: yield () else: for a in args[0]: rest = args[1:] for b in product(*rest): yield (a, *b) @inline def __new__(*args, repeat: int): if repeat < 0: raise ValueError("repeat argument cannot be negative") pools = [list(pool) for _ in range(repeat) for pool in args] result = [List[type(pools[0][0])]()] for pool in pools: result = [x+[y] for x in result for y in pool] for prod in result: yield prod def tee[T](iterable: Generator[T], n: int = 2): """ Return n independent iterators from a single iterable. """ from collections import deque it = iter(iterable) deques = [deque[T]() for i in range(n)] def gen(mydeque): while True: if not mydeque: # when the local deque is empty if it.__done__(): return it.__resume__() if it.__done__(): return newval = it.next() for d in deques: # load it to all the deques d.append(newval) yield mydeque.popleft() return [gen(d) for d in deques]