stdlib/heapq.codon

pull/13/head
Ishak Numanagić 2022-01-24 06:13:10 +01:00
parent 8612416424
commit 513d607df0
1 changed files with 42 additions and 27 deletions

View File

@ -1,9 +1,11 @@
# (c) 2022 Exaloop Inc. All rights reserved.
# TODO: heapq.merge
# 'heap' is a heap at all indices >= startpos, except possibly for pos. pos
# is the index of a leaf with a possibly out-of-order value. Restore the
# heap invariant.
def _siftdown[T](heap: List[T], startpos: int, pos: int):
def _siftdown(heap: List[T], startpos: int, pos: int, T: type) -> void:
newitem = heap[pos]
# Follow the path to the root, moving parents down until finding a place
# newitem fits.
@ -17,12 +19,13 @@ def _siftdown[T](heap: List[T], startpos: int, pos: int):
break
heap[pos] = newitem
def _siftup[T](heap: List[T], pos: int):
def _siftup(heap: List[T], pos: int, T: type) -> void:
endpos = len(heap)
startpos = pos
newitem = heap[pos]
# Bubble up the smaller child until hitting a leaf.
childpos = 2*pos + 1 # leftmost child position
childpos = 2 * pos + 1 # leftmost child position
while childpos < endpos:
# Set childpos to index of smaller child.
rightpos = childpos + 1
@ -31,14 +34,15 @@ def _siftup[T](heap: List[T], pos: int):
# Move the smaller child up.
heap[pos] = heap[childpos]
pos = childpos
childpos = 2*pos + 1
childpos = 2 * pos + 1
# The leaf at pos is empty now. Put newitem there, and bubble it up
# to its final resting place (by sifting its parents down).
heap[pos] = newitem
_siftdown(heap, startpos, pos)
def _siftdown_max[T](heap: List[T], startpos: int, pos: int):
'Maxheap variant of _siftdown'
def _siftdown_max(heap: List[T], startpos: int, pos: int, T: type) -> void:
"Maxheap variant of _siftdown"
newitem = heap[pos]
# Follow the path to the root, moving parents down until finding a place
# newitem fits.
@ -52,13 +56,14 @@ def _siftdown_max[T](heap: List[T], startpos: int, pos: int):
break
heap[pos] = newitem
def _siftup_max[T](heap: List[T], pos: int):
'Maxheap variant of _siftup'
def _siftup_max(heap: List[T], pos: int, T: type) -> void:
"Maxheap variant of _siftup"
endpos = len(heap)
startpos = pos
newitem = heap[pos]
# Bubble up the larger child until hitting a leaf.
childpos = 2*pos + 1 # leftmost child position
childpos = 2 * pos + 1 # leftmost child position
while childpos < endpos:
# Set childpos to index of larger child.
rightpos = childpos + 1
@ -67,20 +72,22 @@ def _siftup_max[T](heap: List[T], pos: int):
# Move the larger child up.
heap[pos] = heap[childpos]
pos = childpos
childpos = 2*pos + 1
childpos = 2 * pos + 1
# The leaf at pos is empty now. Put newitem there, and bubble it up
# to its final resting place (by sifting its parents down).
heap[pos] = newitem
_siftdown_max(heap, startpos, pos)
def heappush[T](heap: List[T], item: T):
def heappush(heap: List[T], item: T, T: type) -> void:
"""Push item onto heap, maintaining the heap invariant."""
heap.append(item)
_siftdown(heap, 0, len(heap)-1)
_siftdown(heap, 0, len(heap) - 1)
def heappop[T](heap: List[T]):
def heappop(heap: List[T], T: type) -> T:
"""Pop the smallest item off the heap, maintaining the heap invariant."""
lastelt = heap.pop() # raises appropriate IndexError if heap is empty
lastelt = heap.pop() # raises appropriate IndexError if heap is empty
if heap:
returnitem = heap[0]
heap[0] = lastelt
@ -88,7 +95,8 @@ def heappop[T](heap: List[T]):
return returnitem
return lastelt
def heapreplace[T](heap: List[T], item: T):
def heapreplace(heap: List[T], item: T, T: type) -> T:
"""
Pop and return the current smallest value, and add the new item.
This is more efficient than heappop() followed by heappush(), and can be
@ -97,19 +105,21 @@ def heapreplace[T](heap: List[T], item: T):
this routine unless written as part of a conditional replacement:
``if item > heap[0]: item = heapreplace(heap, item)``.
"""
returnitem = heap[0] # raises appropriate IndexError if heap is empty
returnitem = heap[0] # raises appropriate IndexError if heap is empty
heap[0] = item
_siftup(heap, 0)
return returnitem
def heappushpop[T](heap: List[T], item: T):
def heappushpop(heap: List[T], item: T, T: type) -> T:
"""Fast version of a heappush followed by a heappop."""
if heap and heap[0] < item:
item, heap[0] = heap[0], item
_siftup(heap, 0)
return item
def heapify[T](x: List[T]):
def heapify(x: List[T], T: type) -> void:
"""Transform list into a heap, in-place, in $O(len(x))$ time."""
n = len(x)
# Transform bottom-up. The largest index there's any point to looking at
@ -117,12 +127,13 @@ def heapify[T](x: List[T]):
# or i < (n-1)/2. If n is even = 2*j, this is (2*j-1)/2 = j-1/2 so
# j-1 is the largest, which is n//2 - 1. If n is odd = 2*j+1, this is
# (2*j+1-1)/2 = j so j-1 is the largest, and that's again n//2-1.
for i in reversed(range(n//2)):
for i in reversed(range(n // 2)):
_siftup(x, i)
def _heappop_max[T](heap: List[T]):
def _heappop_max(heap: List[T], T: type) -> T:
"""Maxheap version of a heappop."""
lastelt = heap.pop() # raises appropriate IndexError if heap is empty
lastelt = heap.pop() # raises appropriate IndexError if heap is empty
if heap:
returnitem = heap[0]
heap[0] = lastelt
@ -130,20 +141,23 @@ def _heappop_max[T](heap: List[T]):
return returnitem
return lastelt
def _heapreplace_max[T](heap: List[T], item: T):
def _heapreplace_max(heap: List[T], item: T, T: type) -> T:
"""Maxheap version of a heappop followed by a heappush."""
returnitem = heap[0] # raises appropriate IndexError if heap is empty
returnitem = heap[0] # raises appropriate IndexError if heap is empty
heap[0] = item
_siftup_max(heap, 0)
return returnitem
def _heapify_max[T](x: List[T]):
def _heapify_max(x: List[T], T: type) -> void:
"""Transform list into a maxheap, in-place, in O(len(x)) time."""
n = len(x)
for i in reversed(range(n//2)):
for i in reversed(range(n // 2)):
_siftup_max(x, i)
def nsmallest[T](n: int, iterable: Generator[T], key = Optional[int]()):
def nsmallest(n: int, iterable: Generator[T], key=Optional[int](), T: type) -> List[T]:
"""Find the n smallest elements in a dataset.
Equivalent to: sorted(iterable, key=key)[:n]
"""
@ -217,7 +231,8 @@ def nsmallest[T](n: int, iterable: Generator[T], key = Optional[int]()):
result.sort()
return [elem for k, order, elem in result]
def nlargest[T](n: int, iterable: Generator[T], key = Optional[int]()):
def nlargest(n: int, iterable: Generator[T], key=Optional[int](), T: type) -> List[T]:
"""Find the n largest elements in a dataset.
Equivalent to: sorted(iterable, key=key, reverse=True)[:n]
"""