1
0
mirror of https://github.com/exaloop/codon.git synced 2025-06-03 15:03:52 +08:00
codon/stdlib/collections.codon
Ibrahim Numanagić 5de233a64e
Dynamic Polymorphism (#58)
* Use Static[] for static inheritance

* Support .seq extension

* Fix #36

* Polymorphic typechecking; vtables [wip]

* v-table dispatch [wip]

* vtable routing [wip; bug]

* vtable routing [MVP]

* Fix texts

* Add union type support

* Update FAQs

* Clarify

* Add BSL license

* Add makeUnion

* Add IR UnionType

* Update union representation in LLVM

* Update README

* Update README.md

* Update README

* Update README.md

* Add benchmarks

* Add more benchmarks and README

* Add primes benchmark

* Update benchmarks

* Fix cpp

* Clean up list

* Update faq.md

* Add binary trees benchmark

* Add fannkuch benchmark

* Fix paths

* Add PyPy

* Abort on fail

* More benchmarks

* Add cpp word_count

* Update set_partition cpp

* Add nbody cpp

* Add TAQ cpp; fix word_count timing

* Update CODEOWNERS

* Update README

* Update README.md

* Update CODEOWNERS

* Fix bench script

* Update binary_trees.cpp

* Update taq.cpp

* Fix primes benchmark

* Add mandelbrot benchmark

* Fix OpenMP init

* Add Module::unsafeGetUnionType

* UnionType [wip] [skip ci]

* Integrate IR unions and Union

* UnionType refactor [skip ci]

* Update README.md

* Update docs

* UnionType [wip] [skip ci]

* UnionType and automatic unions

* Add Slack

* Update faq.md

* Refactor types

* New error reporting [wip]

* New error reporting [wip]

* peglib updates [wip] [skip_ci]

* Fix parsing issues

* Fix parsing issues

* Fix error reporting issues

* Make sure random module matches Python

* Update releases.md

* Fix tests

* Fix #59

* Fix #57

* Fix #50

* Fix #49

* Fix #26; Fix #51; Fix #47; Fix #49

* Fix collection extension methods

* Fix #62

* Handle *args/**kwargs with Callable[]; Fix #43

* Fix #43

* Fix Ptr.__sub__; Fix polymorphism issues

* Add typeinfo

* clang-format

* Upgrade fmtlib to v9; Use CPM for fmtlib; format spec support; __format__ support

* Use CPM for semver and toml++

* Remove extension check

* Revamp str methods

* Update str.zfill

* Fix thunk crashes [wip] [skip_ci]

* Fix str.__reversed__

* Fix count_with_max

* Fix vtable memory allocation issues

* Add poly AST tests

* Use PDQsort when stability does not matter

* Fix dotted imports; Fix  issues

* Fix kwargs passing to Python

* Fix #61

* Fix #37

* Add isinstance support for unions; Union methods return Union type if different

* clang-format

* Nicely format error tracebacks

* Fix build issues; clang-format

* Fix OpenMP init

* Fix OpenMP init

* Update README.md

* Fix tests

* Update license [skip ci]

* Update license [ci skip]

* Add copyright header to all source files

* Fix super(); Fix error recovery in ClassStmt

* Clean up whitespace [ci skip]

* Use Python 3.9 on CI

* Print info in random test

* Fix single unions

* Update random_test.codon

* Fix polymorphic thunk instantiation

* Fix random test

* Add operator.attrgetter and operator.methodcaller

* Add code documentation

* Update documentation

* Update README.md

* Fix tests

* Fix random init

Co-authored-by: A. R. Shajii <ars@ars.me>
2022-12-04 19:45:21 -05:00

432 lines
12 KiB
Python

# Copyright (C) 2022 Exaloop Inc. <https://exaloop.io>
from internal.types.optional import unwrap
class deque:
    # Double-ended queue stored in a circular buffer.
    #
    # Indices are wrapped with `& (len(self._arr) - 1)`, which is only correct
    # when the buffer length is a power of two. Every buffer allocated inside
    # this class (16 slots, the rounded-up `maxlen`, or a doubling) satisfies
    # that. Between operations the buffer is never completely full, so
    # `_head == _tail` always means "empty".
    _arr: Array[T]  # backing circular buffer
    _head: int  # index of the leftmost element
    _tail: int  # index one past the rightmost element
    _maxlen: int  # maximum number of elements; negative means unbounded
    T: type

    def __init__(self, arr: Array[T], head: int, tail: int, maxlen: int):
        """Adopt an existing buffer and index state unchanged.

        Nothing is validated here; the index arithmetic elsewhere in the class
        assumes `len(arr)` is a power of two.
        """
        self._arr = arr
        self._head = head
        self._tail = tail
        self._maxlen = maxlen

    def __init__(self):
        """Create an empty, unbounded deque with a 16-slot buffer."""
        self._arr = Array[T](16)
        self._head = 0
        self._tail = 0
        self._maxlen = -1

    def __init__(self, maxlen: int):
        """Create an empty deque limited to `maxlen` elements.

        The initial capacity is the smallest power of two that is at least
        `maxlen` (1 when `maxlen` is 1 or less). A negative `maxlen` yields an
        unbounded deque.
        """
        cap = 1
        while cap < maxlen:
            cap *= 2
        self._arr = Array[T](cap)
        self._head = 0
        self._tail = 0
        self._maxlen = maxlen

    def __init__(self, it: Generator[T]):
        """Create an unbounded deque holding the items of `it` in order."""
        self._arr = Array[T](16)
        self._head = 0
        self._tail = 0
        self._maxlen = -1
        for i in it:
            self.append(i)

    @property
    def maxlen(self) -> int:
        """Maximum length of the deque; negative when unbounded."""
        return self._maxlen

    def _double_cap(self):
        """Move the contents into a buffer twice as large.

        Must only be called when the buffer is full: all `n` slots are copied,
        starting at `_head`, and the new state is `_head = 0`, `_tail = n`.
        """
        p = self._head
        n = len(self._arr)
        r = n - p  # number of slots from head to the end of the old buffer
        new_cap = n * 2
        new_arr = Array[T](new_cap)
        for i in range(r):
            new_arr[i] = self._arr[p + i]
        for i in range(p):
            new_arr[i + r] = self._arr[i]
        self._arr = new_arr
        self._head = 0
        self._tail = n

    def _check_not_empty(self):
        """Raise IndexError if the deque has no elements."""
        if not self:
            raise IndexError("pop from an empty deque")

    def __bool__(self) -> bool:
        """Return True when the deque holds at least one element."""
        return self._head != self._tail

    def __len__(self) -> int:
        """Return the number of stored elements."""
        return (self._tail - self._head) & (len(self._arr) - 1)

    def appendleft(self, x: T):
        """Insert `x` at the left end, dropping the rightmost item if over `maxlen`."""
        self._head = (self._head - 1) & (len(self._arr) - 1)
        self._arr[self._head] = x
        # Right after an insertion the deque cannot be empty, so equal indices
        # mean the buffer is full. Grow before checking maxlen: while the
        # buffer is full len(self) wraps around to 0 and the check would be
        # skipped, leaving the deque longer than maxlen.
        if self._head == self._tail:
            self._double_cap()
        # Trimming last also keeps a deque emptied by maxlen == 0 from being
        # mistaken for a full one.
        if self._maxlen >= 0 and len(self) > self._maxlen:
            self.pop()

    def append(self, x: T):
        """Insert `x` at the right end, dropping the leftmost item if over `maxlen`."""
        self._arr[self._tail] = x
        self._tail = (self._tail + 1) & (len(self._arr) - 1)
        # Right after an insertion the deque cannot be empty, so equal indices
        # mean the buffer is full. Grow before checking maxlen: while the
        # buffer is full len(self) wraps around to 0 and the check would be
        # skipped, leaving the deque longer than maxlen.
        if self._head == self._tail:
            self._double_cap()
        # Trimming last also keeps a deque emptied by maxlen == 0 from being
        # mistaken for a full one.
        if self._maxlen >= 0 and len(self) > self._maxlen:
            self.popleft()

    def popleft(self) -> T:
        """Remove and return the leftmost element.

        Raises IndexError if the deque is empty.
        """
        self._check_not_empty()
        res = self._arr[self._head]
        self._head = (self._head + 1) & (len(self._arr) - 1)
        return res

    def pop(self) -> T:
        """Remove and return the rightmost element.

        Raises IndexError if the deque is empty.
        """
        self._check_not_empty()
        self._tail = (self._tail - 1) & (len(self._arr) - 1)
        return self._arr[self._tail]

    def clear(self):
        """Remove all elements by resetting both indices; the buffer is kept."""
        self._head = 0
        self._tail = 0

    def __iter__(self) -> Generator[T]:
        """Yield the elements from left to right."""
        i = self._head
        while i != self._tail:
            yield self._arr[i]
            i = (i + 1) & (len(self._arr) - 1)

    def __contains__(self, x: T) -> bool:
        """Return True if some element compares equal to `x`."""
        for i in self:
            if i == x:
                return True
        return False

    def __deepcopy__(self) -> deque[T]:
        """Return a deque with the same `maxlen` holding deep copies of the elements."""
        # Start from the maxlen constructor so the bound is carried over,
        # matching what __copy__ does.
        d = deque[T](self._maxlen)
        for i in self:
            d.append(i.__deepcopy__())
        return d

    def __copy__(self) -> deque[T]:
        """Return a deque with a copied buffer and the same indices and `maxlen`."""
        return deque[T](self._arr.__copy__(), self._head, self._tail, self._maxlen)

    def copy(self) -> deque[T]:
        """Same as `__copy__`."""
        return self.__copy__()

    def __repr__(self) -> str:
        """Return `deque([...])` with the elements listed left to right."""
        return f"deque({repr(List[T](iter(self)))})"

    def _idx_check(self, idx: int, msg: str):
        """Raise IndexError(msg) unless `0 <= idx < len(self)`."""
        if self._head == self._tail or idx >= len(self) or idx < 0:
            raise IndexError(msg)

    @property
    def left(self) -> T:
        """Leftmost element, without removing it.

        Raises IndexError if the deque is empty.
        """
        self._idx_check(0, "list index out of range")
        return self._arr[self._head]

    def __getitem__(self, idx: int) -> T:
        """Return the element at position `idx`, counted from the left.

        Negative indices count from the right. Raises IndexError when the
        index is out of range.
        """
        if idx < 0:
            idx += len(self)
        self._idx_check(idx, "list index out of range")
        if self._head <= self._tail:
            # Contents are contiguous in the buffer.
            return self._arr[self._head + idx]
        elif self._head + idx < len(self._arr):
            # Contents wrap, but the requested slot is before the wrap point.
            return self._arr[self._head + idx]
        else:
            # Requested slot lies in the wrapped part at the buffer's start.
            idx -= len(self._arr) - self._head
            assert 0 <= idx < self._tail
            return self._arr[idx]
@tuple
class _CounterItem:
    # An (element, count) pair whose comparison operators look at `count`
    # only; `element` never takes part in ordering or equality.
    element: T
    count: int
    T: type

    def __eq__(self, other: _CounterItem[T]) -> bool:
        """True when both items carry the same count."""
        return other.count == self.count

    def __ne__(self, other: _CounterItem[T]) -> bool:
        """True when the counts differ."""
        return other.count != self.count

    def __lt__(self, other: _CounterItem[T]) -> bool:
        """True when this count is smaller than the other count."""
        return other.count > self.count

    def __gt__(self, other: _CounterItem[T]) -> bool:
        """True when this count is larger than the other count."""
        return other.count < self.count

    def __le__(self, other: _CounterItem[T]) -> bool:
        """True when this count does not exceed the other count."""
        return other.count >= self.count

    def __ge__(self, other: _CounterItem[T]) -> bool:
        """True when this count is at least the other count."""
        return other.count <= self.count
class Counter(Static[Dict[T, int]]):
    # Multiset mapping each element to an integer count, built on Dict[T, int].
    # Looking up a key that is not stored yields 0 instead of raising.
    T: type

    def __init__(self, elements: Generator[T]):
        """Create a counter by counting every item yielded by `elements`."""
        self._init()
        self.update(elements)

    def __init__(self, other: Counter[T]):
        """Create a counter initialised from another counter."""
        self._init_from(other)

    def __init__(self, other: Dict[T, int]):
        """Create a counter initialised from an element-to-count dictionary."""
        self._init_from(other)

    def elements(self) -> Generator[T]:
        """Yield each key as many times as its count.

        Keys whose count is below one yield nothing.
        """
        for k, v in self.items():
            for i in range(v):
                yield k

    def most_common(self, n: Optional[int] = None) -> List[Tuple[T, int]]:
        """Return (element, count) entries ordered from highest to lowest count.

        With `n` omitted every entry is returned. With `n <= 0` the result is
        empty. Otherwise at most `n` entries with the largest counts are
        returned.
        """
        if len(self) == 0:
            return List[_CounterItem](capacity=0)
        if n is None:
            # Entries are stored as _CounterItem so sorting compares by count.
            v = List[_CounterItem](capacity=len(self))
            for t in self.items():
                v.append(t)
            v.sort(reverse=True)
            return v
        else:
            from heapq import heapify, heapreplace
            n: int = n
            if n == 1:
                # A single pass tracking the maximum is enough.
                top: Optional[_CounterItem] = None
                for t in self.items():
                    if top is None or t[1] > top.count:
                        top = t
                return [unwrap(top)]
            if n <= 0:
                return List[_CounterItem](capacity=0)
            # Keep the n best entries seen so far; once n entries have been
            # collected they are heapified so result[0] is the smallest.
            result = List[_CounterItem](capacity=n)
            for t in self.items():
                if len(result) < n:
                    result.append(t)
                    if len(result) == n:
                        heapify(result)
                else:
                    if result[0] < t:
                        heapreplace(result, t)
            result.sort(reverse=True)
            return result

    def subtract(self, elements: Generator[T]):
        """Decrease the count of every item yielded by `elements` by one."""
        for a in elements:
            self.increment(a, -1)

    def subtract(self, other: Counter[T]):
        """Subtract the counts of `other`; results may be zero or negative."""
        for k, v in other.items():
            self.increment(k, -v)

    def subtract(self, other: Dict[T, int]):
        """Subtract the counts given by an element-to-count dictionary."""
        for k, v in other.items():
            self.increment(k, -v)

    def update(self, elements: Generator[T]):
        """Increment the count of every item yielded by `elements`."""
        for a in elements:
            self.increment(a)

    def update(self, other: Counter[T]):
        """Add the counts of `other` to this counter."""
        for k, v in other.items():
            self.increment(k, by=v)

    def update(self, other: Dict[T, int]):
        """Add the counts given by an element-to-count dictionary."""
        for k, v in other.items():
            self.increment(k, by=v)

    def update(self):
        """Do nothing; accepts a call without arguments."""
        pass

    def total(self) -> int:
        """Return the sum of all counts."""
        m = 0
        for v in self.values():
            m += v
        return m

    def __getitem__(self, key: T) -> int:
        """Return the count of `key`, or 0 when it is not stored."""
        return self.get(key, 0)

    def __delitem__(self, key: T):
        """Remove `key` if present; deleting a missing key is a no-op."""
        x = self._kh_get(key)
        if x != self._kh_end():
            self._kh_del(x)

    def __eq__(self, other: Counter[T]) -> bool:
        """Return True when both counters store the same keys with equal counts."""
        if self.__len__() != other.__len__():
            return False
        for k, v in self.items():
            if k not in other or other[k] != v:
                return False
        return True

    def __ne__(self, other: Counter[T]) -> bool:
        """Return the negation of `__eq__`."""
        return not (self == other)

    def __copy__(self) -> Counter[T]:
        """Return a new counter initialised from this one."""
        return Counter[T](self)

    def __iadd__(self, other: Counter[T]) -> Counter[T]:
        """Add the counts of `other` in place, then drop counts <= 0."""
        for k, v in other.items():
            self.increment(k, by=v)
        self._del_non_positives()
        return self

    def __isub__(self, other: Counter[T]) -> Counter[T]:
        """Subtract the counts of `other` in place, then drop counts <= 0."""
        for k, v in other.items():
            self.increment(k, by=-v)
        self._del_non_positives()
        return self

    def __iand__(self, other: Counter[T]) -> Counter[T]:
        """Intersect in place: keep the minimum of the two counts per key.

        Keys that are missing from either counter, and keys whose resulting
        count is not positive, are removed.
        """
        for k, v in other.items():
            self[k] = min(self.get(k, 0), v)
        # A key missing from `other` counts as zero there, so the minimum is
        # zero and the key must not survive. Collect such keys first so the
        # table is not modified while it is being scanned.
        absent = [k for k in self.keys() if k not in other]
        for k in absent:
            del self[k]
        self._del_non_positives()
        return self

    def __ior__(self, other: Counter[T]) -> Counter[T]:
        """Union in place: keep the maximum of the two counts per key.

        Entries with counts <= 0 are dropped both before and after merging.
        """
        self._del_non_positives()
        for k, v in other.items():
            self[k] = max(self.get(k, 0), v)
        self._del_non_positives()
        return self

    def __pos__(self) -> Counter[T]:
        """Return a new counter holding only the entries with positive counts."""
        result = Counter[T]()
        # Presumably pre-sizes the new table to match this one.
        result.resize(self._n_buckets)
        for k, v in self.items():
            if v > 0:
                result[k] = v
        return result

    def __neg__(self) -> Counter[T]:
        """Return a new counter holding the negated counts of negative entries."""
        result = Counter[T]()
        # Presumably pre-sizes the new table to match this one.
        result.resize(self._n_buckets)
        for k, v in self.items():
            if v < 0:
                result[k] = -v
        return result

    def __add__(self, other: Counter[T]) -> Counter[T]:
        """Return a copy of this counter with `+=` applied to it."""
        result = self.__copy__()
        result += other
        return result

    def __sub__(self, other: Counter[T]) -> Counter[T]:
        """Return a copy of this counter with `-=` applied to it."""
        result = self.__copy__()
        result -= other
        return result

    def __and__(self, other: Counter[T]) -> Counter[T]:
        """Return a copy of this counter with `&=` applied to it."""
        result = self.__copy__()
        result &= other
        return result

    def __or__(self, other: Counter[T]) -> Counter[T]:
        """Return a copy of this counter with `|=` applied to it."""
        result = self.__copy__()
        result |= other
        return result

    def __repr__(self):
        """Return `Counter(...)` wrapping the base dictionary's representation."""
        return f"Counter({super().__repr__()})"

    def __dict_do_op_throws__(self, key: T, other: Z, op: F, F: type, Z: type):
        """Forward to `__dict_do_op__` with 0 as the value for a missing key."""
        self.__dict_do_op__(key, other, 0, op)

    def _del_non_positives(self):
        """Delete every entry whose count is zero or negative."""
        for k, v in self.items():
            if v <= 0:
                del self[k]
# Extends Dict with a constructor that accepts a Counter.
@extend
class Dict:
    def __init__(self: Dict[K, int], other: Counter[K]):
        # Initialise this Dict[K, int] from `other` through Dict's own
        # `_init_from`.
        self._init_from(other)
class defaultdict(Static[Dict[K,V]]):
    # Dictionary that fills in missing keys on lookup by calling
    # `default_factory` and storing the value it returns.
    default_factory: S
    K: type
    V: type
    S: TypeVar[Callable[[], V]]

    def __init__(self: defaultdict[K, VV, Function[[], V]], VV: TypeVar[V]):
        """Create an empty mapping whose factory calls `VV()`."""
        super().__init__()
        self.default_factory = lambda: VV()

    def __init__(self, f: S):
        """Create an empty mapping that uses `f` as its factory."""
        super().__init__()
        self.default_factory = f

    def __init__(self: defaultdict[K, VV, Function[[], V]], VV: TypeVar[V], other: Dict[K, V]):
        """Create a mapping initialised from `other` whose factory calls `VV()`."""
        super().__init__(other)
        self.default_factory = lambda: VV()

    def __init__(self, f: S, other: Dict[K, V]):
        """Create a mapping initialised from `other` that uses `f` as its factory."""
        super().__init__(other)
        self.default_factory = f

    def __missing__(self, key: K):
        """Store a freshly produced default under `key` and return it."""
        fresh = self.default_factory()
        self.__setitem__(key, fresh)
        return fresh

    def __getitem__(self, key: K) -> V:
        """Return the value for `key`, creating it from the factory if absent."""
        if key in self:
            return super().__getitem__(key)
        return self.__missing__(key)

    def __dict_do_op_throws__(self, key: K, other: Z, op: F, F: type, Z: type):
        """Replace the value at `key` with `op(value, other)`.

        A missing key is first populated through `__missing__`.
        """
        slot = self._kh_get(key)
        if slot == self._kh_end():
            self.__missing__(key)
            # Look the key up again now that it has been inserted.
            slot = self._kh_get(key)
        self._vals[slot] = op(self._vals[slot], other)

    def copy(self):
        """Return a new defaultdict with the same factory, initialised from this one."""
        clone = defaultdict[K,V,S](self.default_factory)
        clone._init_from(self)
        return clone

    def __copy__(self):
        """Same as `copy`."""
        return self.copy()

    def __deepcopy__(self):
        """Return a new defaultdict whose keys and values are deep copies."""
        clone = defaultdict[K,V,S](self.default_factory)
        for key, value in self.items():
            clone[key.__deepcopy__()] = value.__deepcopy__()
        return clone

    def __eq__(self, other: defaultdict[K,V,S]) -> bool:
        """Return True when both mappings hold the same keys with equal values."""
        if self.__len__() != other.__len__():
            return False
        for key, value in self.items():
            if key not in other:
                return False
            if other[key] != value:
                return False
        return True

    def __ne__(self, other: defaultdict[K,V,S]) -> bool:
        """Return the negation of `__eq__`."""
        return not (self == other)

    def __repr__(self):
        """Return a representation naming the value type and the stored items."""
        return f"defaultdict(<default factory of '{V.__name__}'>, {super().__repr__()})"
# Extends Dict with a constructor that accepts a defaultdict.
@extend
class Dict:
    def __init__(self: Dict[K, V], other: defaultdict[K, V, S], S: type):
        # Initialise this Dict[K, V] from `other` through Dict's own
        # `_init_from`; `S` is the factory type of `other`.
        self._init_from(other)
# Stub with an empty body. The "internal" marker suggests the real behaviour
# is supplied elsewhere (presumably by the compiler) - TODO confirm.
def namedtuple(name: Static[str], args): # internal
    pass