mirror of https://github.com/exaloop/codon.git
stdlib/itertools.codon
parent
0b1b6a6450
commit
9c36a5df14
|
@ -1,127 +1,10 @@
|
|||
def combinations(pool, r: int):
|
||||
"""
|
||||
Return successive r-length combinations of elements in the iterable.
|
||||
# (c) 2022 Exaloop Inc. All rights reserved.
|
||||
|
||||
combinations(range(4), 3) --> (0,1,2), (0,1,3), (0,2,3), (1,2,3)
|
||||
"""
|
||||
def combinations_helper(pool_list, r):
|
||||
n = len(pool_list)
|
||||
if r > n:
|
||||
return
|
||||
indices = list(range(r))
|
||||
yield [pool_list[i] for i in indices]
|
||||
while True:
|
||||
b = -1
|
||||
for i in reversed(range(r)):
|
||||
if indices[i] != i + n - r:
|
||||
b = i
|
||||
break
|
||||
if b == -1:
|
||||
return
|
||||
indices[b] += 1
|
||||
for j in range(b+1, r):
|
||||
indices[j] = indices[j-1] + 1
|
||||
yield [pool_list[i] for i in indices]
|
||||
# Infinite iterators
|
||||
|
||||
if r < 0:
|
||||
raise ValueError("r must be non-negative")
|
||||
if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"):
|
||||
return combinations_helper(pool, r)
|
||||
else:
|
||||
return combinations_helper([a for a in pool], r)
|
||||
|
||||
def combinations_with_replacement(pool, r: int):
|
||||
"""
|
||||
Return successive r-length combinations of elements in the iterable
|
||||
allowing individual elements to have successive repeats.
|
||||
"""
|
||||
def combinations_with_replacement_helper(pool_list, r):
|
||||
n = len(pool_list)
|
||||
if not n and r:
|
||||
return
|
||||
indices = [0 for _ in range(r)]
|
||||
yield [pool_list[i] for i in indices]
|
||||
while True:
|
||||
b = -1
|
||||
for i in reversed(range(r)):
|
||||
if indices[i] != n - 1:
|
||||
b = i
|
||||
break
|
||||
if b == -1:
|
||||
return
|
||||
newval = indices[b] + 1
|
||||
for j in range(r - b):
|
||||
indices[b+j] = newval
|
||||
yield [pool_list[i] for i in indices]
|
||||
|
||||
if r < 0:
|
||||
raise ValueError("r must be non-negative")
|
||||
if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"):
|
||||
return combinations_with_replacement_helper(pool, r)
|
||||
else:
|
||||
return combinations_with_replacement_helper([a for a in pool], r)
|
||||
|
||||
@tuple
|
||||
class islice:
|
||||
"""
|
||||
Make an iterator that returns selected elements from the iterable.
|
||||
"""
|
||||
|
||||
def __new__[T](iterable: Generator[T], stop: Optional[int]):
|
||||
if stop and ~stop < 0:
|
||||
raise ValueError("Indices for islice() must be None or an integer: 0 <= x <= sys.maxsize.")
|
||||
i = 0
|
||||
for x in iterable:
|
||||
if stop and i >= ~stop:
|
||||
break
|
||||
yield x
|
||||
i += 1
|
||||
|
||||
def __new__[T](iterable: Generator[T], start: Optional[int], stop: Optional[int], step: Optional[int] = None):
|
||||
from sys import maxsize
|
||||
startx = 0
|
||||
stopx = maxsize
|
||||
stepx = 1
|
||||
have_stop = False
|
||||
|
||||
if start:
|
||||
startx = ~start
|
||||
if stop:
|
||||
stopx = ~stop
|
||||
if step:
|
||||
stepx = ~step
|
||||
|
||||
if startx < 0 or stopx < 0:
|
||||
raise ValueError("Indices for islice() must be None or an integer: 0 <= x <= sys.maxsize.")
|
||||
elif stepx < 0:
|
||||
raise ValueError("Step for islice() must be a positive integer or None.")
|
||||
|
||||
it = range(startx, stopx, stepx)
|
||||
N = len(it)
|
||||
idx = 0
|
||||
b = -1
|
||||
|
||||
if N == 0:
|
||||
for i, element in zip(range(start), iterable):
|
||||
pass
|
||||
return
|
||||
|
||||
nexti = it[0]
|
||||
for i, element in enumerate(iterable):
|
||||
if i == nexti:
|
||||
yield element
|
||||
idx += 1
|
||||
if idx >= N:
|
||||
b = i
|
||||
break
|
||||
nexti = it[idx]
|
||||
|
||||
if b >= 0:
|
||||
for i, element in zip(range(b + 1, stopx), iterable):
|
||||
pass
|
||||
|
||||
@inline
|
||||
def count(start = 0, step = 1):
|
||||
def count(start: T = 0, step: T = 1, T: type) -> Generator[T]:
|
||||
"""
|
||||
Return a count object whose ``__next__`` method returns consecutive values.
|
||||
"""
|
||||
|
@ -130,20 +13,9 @@ def count(start = 0, step = 1):
|
|||
yield n
|
||||
n += step
|
||||
|
||||
@inline
|
||||
def repeat(object, times: Optional[int] = None):
|
||||
"""
|
||||
Make an iterator that returns a given object over and over again.
|
||||
"""
|
||||
if times is None:
|
||||
while True:
|
||||
yield object
|
||||
else:
|
||||
for i in range(~times):
|
||||
yield object
|
||||
|
||||
@inline
|
||||
def cycle(iterable):
|
||||
def cycle(iterable: Generator[T], T: type) -> Generator[T]:
|
||||
"""
|
||||
Cycles repeatedly through an iterable.
|
||||
"""
|
||||
|
@ -153,87 +25,24 @@ def cycle(iterable):
|
|||
saved.append(element)
|
||||
while saved:
|
||||
for element in saved:
|
||||
yield element
|
||||
yield element
|
||||
|
||||
|
||||
@inline
|
||||
def compress(data, selectors):
|
||||
def repeat(object: T, times: Optional[int] = None, T: type) -> Generator[T]:
|
||||
"""
|
||||
Return data elements corresponding to true selector elements.
|
||||
Forms a shorter iterator from selected data elements using the selectors to
|
||||
choose the data elements.
|
||||
Make an iterator that returns a given object over and over again.
|
||||
"""
|
||||
for d,s in zip(data, selectors):
|
||||
if s:
|
||||
yield d
|
||||
|
||||
@inline
|
||||
def dropwhile(predicate, iterable):
|
||||
"""
|
||||
Drop items from the iterable while predicate(item) is true.
|
||||
Afterwards, return every element until the iterable is exhausted.
|
||||
"""
|
||||
b = False
|
||||
for x in iterable:
|
||||
if not b and not predicate(x):
|
||||
b = True
|
||||
if b:
|
||||
yield x
|
||||
|
||||
@inline
|
||||
def takewhile(predicate, iterable):
|
||||
"""
|
||||
Return successive entries from an iterable as long as the predicate evaluates to true for each entry.
|
||||
"""
|
||||
for x in iterable:
|
||||
if predicate(x):
|
||||
yield x
|
||||
else:
|
||||
break
|
||||
|
||||
@inline
|
||||
def filterfalse(predicate, iterable):
|
||||
"""
|
||||
Return those items of iterable for which function(item) is false.
|
||||
"""
|
||||
for x in iterable:
|
||||
if not predicate(x):
|
||||
yield x
|
||||
|
||||
def permutations(pool, r: Optional[int] = None):
|
||||
"""
|
||||
Return successive r-length permutations of elements in the iterable.
|
||||
"""
|
||||
def permutations_helper(pool_list, r):
|
||||
n = len(pool_list)
|
||||
rx = ~r if r else n
|
||||
if rx > n:
|
||||
return
|
||||
|
||||
indices = list(range(n))
|
||||
cycles = list(range(n, n - rx, -1))
|
||||
yield [pool_list[i] for i in indices[:rx]]
|
||||
while n:
|
||||
b = -1
|
||||
for i in reversed(range(rx)):
|
||||
cycles[i] -= 1
|
||||
if cycles[i] == 0:
|
||||
indices = indices[:i] + indices[i+1:] + indices[i:i+1]
|
||||
cycles[i] = n - i
|
||||
else:
|
||||
b = i
|
||||
j = cycles[i]
|
||||
indices[i], indices[-j] = indices[-j], indices[i]
|
||||
yield [pool_list[i] for i in indices[:rx]]
|
||||
break
|
||||
if b == -1:
|
||||
return
|
||||
|
||||
if r is not None and ~r < 0:
|
||||
raise ValueError("r must be non-negative")
|
||||
if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"):
|
||||
return permutations_helper(pool, r)
|
||||
if times is None:
|
||||
while True:
|
||||
yield object
|
||||
else:
|
||||
return permutations_helper([a for a in pool], r)
|
||||
for i in range(times):
|
||||
yield object
|
||||
|
||||
|
||||
# Iterators terminating on the shortest input sequence
|
||||
|
||||
|
||||
@tuple
|
||||
class accumulate:
|
||||
|
@ -241,8 +50,9 @@ class accumulate:
|
|||
Make an iterator that returns accumulated sums, or accumulated results
|
||||
of other binary functions (specified via the optional func argument).
|
||||
"""
|
||||
|
||||
@inline
|
||||
def __new__(iterable, func = lambda a, b: a + b, initial = 0):
|
||||
def __new__(iterable: Generator[T], func=lambda a, b: a + b, initial=0, T: type):
|
||||
total = initial
|
||||
yield total
|
||||
for element in iterable:
|
||||
|
@ -250,18 +60,20 @@ class accumulate:
|
|||
yield total
|
||||
|
||||
@inline
|
||||
def __new__(iterable, func = lambda a, b: a + b):
|
||||
def __new__(iterable: Generator[T], func=lambda a, b: a + b, T: type):
|
||||
total = None
|
||||
for element in iterable:
|
||||
total = element if total is None else func(~total, element)
|
||||
yield ~total
|
||||
|
||||
|
||||
@tuple
|
||||
class chain:
|
||||
"""
|
||||
Make an iterator that returns elements from the first iterable until it is exhausted,
|
||||
then proceeds to the next iterable, until all of the iterables are exhausted.
|
||||
"""
|
||||
|
||||
@inline
|
||||
def __new__(*iterables):
|
||||
for it in iterables:
|
||||
|
@ -274,18 +86,52 @@ class chain:
|
|||
for element in it:
|
||||
yield element
|
||||
|
||||
|
||||
@inline
|
||||
def starmap(function, iterable):
|
||||
def compress(
|
||||
data: Generator[T], selectors: Generator[B], T: type, B: type
|
||||
) -> Generator[T]:
|
||||
"""
|
||||
Return an iterator whose values are returned from the function
|
||||
evaluated with an argument tuple taken from the given sequence.
|
||||
Return data elements corresponding to true selector elements.
|
||||
Forms a shorter iterator from selected data elements using the selectors to
|
||||
choose the data elements.
|
||||
"""
|
||||
for args in iterable:
|
||||
yield function(*args)
|
||||
for d, s in zip(data, selectors):
|
||||
if s:
|
||||
yield d
|
||||
|
||||
|
||||
@inline
|
||||
def dropwhile(
|
||||
predicate: Callable[[T], bool], iterable: Generator[T], T: type
|
||||
) -> Generator[T]:
|
||||
"""
|
||||
Drop items from the iterable while predicate(item) is true.
|
||||
Afterwards, return every element until the iterable is exhausted.
|
||||
"""
|
||||
b = False
|
||||
for x in iterable:
|
||||
if not b and not predicate(x):
|
||||
b = True
|
||||
if b:
|
||||
yield x
|
||||
|
||||
|
||||
@inline
|
||||
def filterfalse(
|
||||
predicate: Callable[[T], bool], iterable: Generator[T], T: type
|
||||
) -> Generator[T]:
|
||||
"""
|
||||
Return those items of iterable for which function(item) is false.
|
||||
"""
|
||||
for x in iterable:
|
||||
if not predicate(x):
|
||||
yield x
|
||||
|
||||
|
||||
# TODO: fix this once Optional[Callable] lands
|
||||
@inline
|
||||
def groupby(iterable, key = Optional[int]()):
|
||||
def groupby(iterable, key=Optional[int]()):
|
||||
"""
|
||||
Make an iterator that returns consecutive keys and groups from the iterable.
|
||||
"""
|
||||
|
@ -305,6 +151,119 @@ def groupby(iterable, key = Optional[int]()):
|
|||
yield ~currkey, group
|
||||
|
||||
|
||||
@tuple
|
||||
class islice:
|
||||
"""
|
||||
Make an iterator that returns selected elements from the iterable.
|
||||
"""
|
||||
|
||||
def __new__(iterable: Generator[T], stop: Optional[int], T: type) -> Generator[T]:
|
||||
if stop and ~stop < 0:
|
||||
raise ValueError(
|
||||
"Indices for islice() must be None or an integer: 0 <= x <= sys.maxsize."
|
||||
)
|
||||
i = 0
|
||||
for x in iterable:
|
||||
if stop and i >= ~stop:
|
||||
break
|
||||
yield x
|
||||
i += 1
|
||||
|
||||
def __new__(
|
||||
iterable: Generator[T],
|
||||
start: Optional[int],
|
||||
stop: Optional[int],
|
||||
step: Optional[int] = None,
|
||||
T: type,
|
||||
) -> Generator[T]:
|
||||
from sys import maxsize
|
||||
|
||||
start: int = 0 if not start else ~start
|
||||
stop: int = maxsize if not stop else ~stop
|
||||
step: int = 1 if not step else ~step
|
||||
have_stop = False
|
||||
|
||||
if start < 0 or stop < 0:
|
||||
raise ValueError(
|
||||
"Indices for islice() must be None or an integer: 0 <= x <= sys.maxsize."
|
||||
)
|
||||
elif step < 0:
|
||||
raise ValueError("Step for islice() must be a positive integer or None.")
|
||||
|
||||
it = range(start, stop, step)
|
||||
N = len(it)
|
||||
idx = 0
|
||||
b = -1
|
||||
|
||||
if N == 0:
|
||||
for i, element in zip(range(start), iterable):
|
||||
pass
|
||||
return
|
||||
|
||||
nexti = it[0]
|
||||
for i, element in enumerate(iterable):
|
||||
if i == nexti:
|
||||
yield element
|
||||
idx += 1
|
||||
if idx >= N:
|
||||
b = i
|
||||
break
|
||||
nexti = it[idx]
|
||||
|
||||
if b >= 0:
|
||||
for i, element in zip(range(b + 1, stop), iterable):
|
||||
pass
|
||||
|
||||
|
||||
@inline
|
||||
def starmap(function, iterable):
|
||||
"""
|
||||
Return an iterator whose values are returned from the function
|
||||
evaluated with an argument tuple taken from the given sequence.
|
||||
"""
|
||||
for args in iterable:
|
||||
yield function(*args)
|
||||
|
||||
|
||||
@inline
|
||||
def takewhile(
|
||||
predicate: Callable[[T], bool], iterable: Generator[T], T: type
|
||||
) -> Generator[T]:
|
||||
"""
|
||||
Return successive entries from an iterable as long as the predicate evaluates to true for each entry.
|
||||
"""
|
||||
for x in iterable:
|
||||
if predicate(x):
|
||||
yield x
|
||||
else:
|
||||
break
|
||||
|
||||
|
||||
def tee(iterable: Generator[T], n: int = 2, T: type) -> List[Generator[T]]:
|
||||
"""
|
||||
Return n independent iterators from a single iterable.
|
||||
"""
|
||||
from collections import deque
|
||||
|
||||
it = iter(iterable)
|
||||
deques = [deque[T]() for i in range(n)]
|
||||
|
||||
def gen(mydeque: deque[T], T: type) -> Generator[T]:
|
||||
while True:
|
||||
if not mydeque: # when the local deque is empty
|
||||
if it.__done__():
|
||||
return
|
||||
it.__resume__()
|
||||
if it.__done__():
|
||||
return
|
||||
newval = it.next()
|
||||
for d in deques: # load it to all the deques
|
||||
d.append(newval)
|
||||
yield mydeque.popleft()
|
||||
|
||||
return [gen(d) for d in deques]
|
||||
|
||||
|
||||
@tuple
|
||||
class zip_longest:
|
||||
"""
|
||||
|
@ -313,11 +272,12 @@ class zip_longest:
|
|||
with fillvalue. Iteration continues until the longest iterable is
|
||||
exhausted.
|
||||
"""
|
||||
|
||||
@inline
|
||||
def __new__(*args, fillvalue):
|
||||
if staticlen(args) == 2:
|
||||
a = iter(args[0])
|
||||
b = iter(args[1])
|
||||
def __new__(*iterables, fillvalue):
|
||||
if staticlen(iterables) == 2:
|
||||
a = iter(iterables[0])
|
||||
b = iter(iterables[1])
|
||||
a_done = False
|
||||
b_done = False
|
||||
|
||||
|
@ -326,7 +286,8 @@ class zip_longest:
|
|||
b_val = fillvalue
|
||||
if not b_done:
|
||||
b_done = b.done()
|
||||
if not b_done: b_val = b.next()
|
||||
if not b_done:
|
||||
b_val = b.next()
|
||||
yield a_val, b_val
|
||||
|
||||
if not b_done:
|
||||
|
@ -336,7 +297,7 @@ class zip_longest:
|
|||
a.destroy()
|
||||
b.destroy()
|
||||
else:
|
||||
iterators = tuple(iter(it) for it in args)
|
||||
iterators = tuple(iter(it) for it in iterables)
|
||||
num_active = len(iterators)
|
||||
if not num_active:
|
||||
return
|
||||
|
@ -375,11 +336,129 @@ class zip_longest:
|
|||
for it in iters:
|
||||
it.destroy()
|
||||
|
||||
|
||||
# Combinatoric iterators
|
||||
|
||||
|
||||
def combinations(pool: Generator[T], r: int, T: type) -> Generator[List[T]]:
|
||||
"""
|
||||
Return successive r-length combinations of elements in the iterable.
|
||||
|
||||
combinations(range(4), 3) --> (0,1,2), (0,1,3), (0,2,3), (1,2,3)
|
||||
"""
|
||||
|
||||
def combinations_helper(pool: List[T], r: int, T: type) -> Generator[List[T]]:
|
||||
n = len(pool)
|
||||
if r > n:
|
||||
return
|
||||
indices = list(range(r))
|
||||
yield [pool[i] for i in indices]
|
||||
while True:
|
||||
b = -1
|
||||
for i in reversed(range(r)):
|
||||
if indices[i] != i + n - r:
|
||||
b = i
|
||||
break
|
||||
if b == -1:
|
||||
return
|
||||
indices[b] += 1
|
||||
for j in range(b + 1, r):
|
||||
indices[j] = indices[j - 1] + 1
|
||||
yield [pool[i] for i in indices]
|
||||
|
||||
if r < 0:
|
||||
raise ValueError("r must be non-negative")
|
||||
if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"):
|
||||
return combinations_helper(pool, r)
|
||||
else:
|
||||
return combinations_helper([a for a in pool], r)
|
||||
|
||||
|
||||
def combinations_with_replacement(
|
||||
pool: Generator[T], r: int, T: type
|
||||
) -> Generator[List[T]]:
|
||||
"""
|
||||
Return successive r-length combinations of elements in the iterable
|
||||
allowing individual elements to have successive repeats.
|
||||
"""
|
||||
|
||||
def combinations_with_replacement_helper(
|
||||
pool: List[T], r: int, T: type
|
||||
) -> Generator[List[T]]:
|
||||
n = len(pool)
|
||||
if not n and r:
|
||||
return
|
||||
indices = [0 for _ in range(r)]
|
||||
yield [pool[i] for i in indices]
|
||||
while True:
|
||||
b = -1
|
||||
for i in reversed(range(r)):
|
||||
if indices[i] != n - 1:
|
||||
b = i
|
||||
break
|
||||
if b == -1:
|
||||
return
|
||||
newval = indices[b] + 1
|
||||
for j in range(r - b):
|
||||
indices[b + j] = newval
|
||||
yield [pool[i] for i in indices]
|
||||
|
||||
if r < 0:
|
||||
raise ValueError("r must be non-negative")
|
||||
if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"):
|
||||
return combinations_with_replacement_helper(pool, r)
|
||||
else:
|
||||
return combinations_with_replacement_helper([a for a in pool], r)
|
||||
|
||||
|
||||
def permutations(
|
||||
pool: Generator[T], r: Optional[int] = None, T: type
|
||||
) -> Generator[List[T]]:
|
||||
"""
|
||||
Return successive r-length permutations of elements in the iterable.
|
||||
"""
|
||||
|
||||
def permutations_helper(
|
||||
pool: List[T], r: Optional[int], T: type
|
||||
) -> Generator[List[T]]:
|
||||
n = len(pool)
|
||||
r: int = ~r if r else n
|
||||
if r > n:
|
||||
return
|
||||
|
||||
indices = list(range(n))
|
||||
cycles = list(range(n, n - r, -1))
|
||||
yield [pool[i] for i in indices[:r]]
|
||||
while n:
|
||||
b = -1
|
||||
for i in reversed(range(r)):
|
||||
cycles[i] -= 1
|
||||
if cycles[i] == 0:
|
||||
indices = indices[:i] + indices[i + 1 :] + indices[i : i + 1]
|
||||
cycles[i] = n - i
|
||||
else:
|
||||
b = i
|
||||
j = cycles[i]
|
||||
indices[i], indices[-j] = indices[-j], indices[i]
|
||||
yield [pool[i] for i in indices[:r]]
|
||||
break
|
||||
if b == -1:
|
||||
return
|
||||
|
||||
if r is not None and ~r < 0:
|
||||
raise ValueError("r must be non-negative")
|
||||
if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"):
|
||||
return permutations_helper(pool, r)
|
||||
else:
|
||||
return permutations_helper([a for a in pool], r)
|
||||
|
||||
|
||||
@tuple
|
||||
class product:
|
||||
"""
|
||||
Cartesian product of input iterables.
|
||||
"""
|
||||
|
||||
@inline
|
||||
def __new__(*args):
|
||||
if staticlen(args) == 0:
|
||||
|
@ -397,27 +476,6 @@ class product:
|
|||
pools = [list(pool) for _ in range(repeat) for pool in args]
|
||||
result = [List[type(pools[0][0])]()]
|
||||
for pool in pools:
|
||||
result = [x+[y] for x in result for y in pool]
|
||||
result = [x + [y] for x in result for y in pool]
|
||||
for prod in result:
|
||||
yield prod
|
||||
|
||||
def tee[T](iterable: Generator[T], n: int = 2):
|
||||
"""
|
||||
Return n independent iterators from a single iterable.
|
||||
"""
|
||||
from collections import deque
|
||||
it = iter(iterable)
|
||||
deques = [deque[T]() for i in range(n)]
|
||||
def gen(mydeque):
|
||||
while True:
|
||||
if not mydeque: # when the local deque is empty
|
||||
if it.__done__():
|
||||
return
|
||||
it.__resume__()
|
||||
if it.__done__():
|
||||
return
|
||||
newval = it.next()
|
||||
for d in deques: # load it to all the deques
|
||||
d.append(newval)
|
||||
yield mydeque.popleft()
|
||||
return [gen(d) for d in deques]
|
||||
|
|
Loading…
Reference in New Issue