# Mirror of https://github.com/exaloop/codon.git
# 229 lines, 6.7 KiB, Python
import heapq
|
|
from random import random, randrange
|
|
|
|
|
|
@test
def check_invariant(heap):
    # Assert the min-heap property: every element other than the root must
    # compare >= its parent, which lives at index (i - 1) // 2.
    for index, value in enumerate(heap):
        if index == 0:
            continue  # the root has no parent to compare against
        parent_index = (index - 1) // 2
        assert heap[parent_index] <= value
|
|
|
|
|
|
@test
def test_heapify():
    # heapify() must establish the heap invariant for every small size
    # (0 through 29) and for one large input of 20000 random floats.
    sizes = list(range(30))
    sizes.append(20000)
    for count in sizes:
        values = [random() for _ in range(count)]
        heapq.heapify(values)
        check_invariant(values)
|
|
|
|
|
|
@test
def test_naive_nbest():
    # Naive "N-best": push every item and evict the smallest whenever the
    # heap grows past 10 entries, leaving the 10 largest values behind.
    data = [randrange(2000) for _ in range(1000)]
    best = []
    for value in data:
        heapq.heappush(best, value)
        if len(best) > 10:
            heapq.heappop(best)
    # The heap's internal order is not fully sorted, so sort before comparing.
    assert sorted(best) == sorted(data)[-10:]
|
|
|
|
|
|
def heapiter(heap):
    # Generator that drains `heap`, yielding its items smallest-first.
    # The heap is consumed: it is empty once the generator is exhausted.
    while len(heap) > 0:
        smallest = heapq.heappop(heap)
        yield smallest
|
|
|
|
|
|
@test
def test_nbest():
    # Less-naive "N-best" algorithm: seed a min-heap with the first 10 items,
    # then replace the root only when a later item beats the smallest
    # survivor. Much faster (if len(data) is big enough) than sorting all of
    # data. With a max heap instead of a min heap it could go faster still by
    # heapify'ing all of data (linear time) and doing 10 heappops
    # (10 log-time steps).
    data = [randrange(2000) for _ in range(1000)]
    top = data[:10]
    heapq.heapify(top)
    for candidate in data[10:]:
        if candidate <= top[0]:
            continue  # no better than the current smallest survivor
        # This replacement gets rarer the longer we run.
        heapq.heapreplace(top, candidate)
    assert list(heapiter(top)) == sorted(data)[-10:]
|
|
|
|
|
|
@test
def test_nbest_with_pushpop():
    # Same N-best selection as test_nbest, but heappushpop() performs the
    # compare-and-replace step for every remaining item.
    data = [randrange(2000) for _ in range(1000)]
    top = data[:10]
    heapq.heapify(top)
    for candidate in data[10:]:
        heapq.heappushpop(top, candidate)
    expected = sorted(data)[-10:]
    assert list(heapiter(top)) == expected

    # With an empty heap, the pushed item is the one returned.
    empty = List[str]()
    assert heapq.heappushpop(empty, "x") == "x"
|
|
|
|
|
|
@test
def test_heappushpop():
    # Empty heap: the pushed value is returned and the heap stays empty.
    heap = List[int]()
    popped = heapq.heappushpop(heap, 10)
    assert popped == 10 and heap == List[int]()

    # Pushed value below the current root: it is returned, heap unchanged.
    heap = [10]
    popped = heapq.heappushpop(heap, 9)
    assert popped == 9 and heap == [10]

    # Pushed value above the root: the old root is returned and the new
    # value takes its place.
    heap = [10]
    popped = heapq.heappushpop(heap, 11)
    assert popped == 10 and heap == [11]
|
|
|
|
|
|
@test
def test_heappop_max():
    # _heappop_max has an optimization for one-item lists which isn't
    # covered in other tests, so test that case explicitly here: the second
    # pop below runs against a single-element list.
    heap = [3, 2]
    for expected in [3, 2]:
        assert heapq._heappop_max(heap) == expected
|
|
|
|
|
|
@test
def test_heapsort():
    # Exercise everything with repeated heapsort checks: build a heap from
    # random data, pop it dry, and compare the result with sorted().
    for trial in range(100):
        size = randrange(50)
        data = [randrange(25) for _ in range(size)]
        heap = List[int]()
        if trial % 2 == 1:
            # Half of the time, heapify a copy of the data in one step.
            heap = data[:]
            heapq.heapify(heap)
        else:
            # The rest of the time, grow the heap one heappush at a time.
            for value in data:
                heapq.heappush(heap, value)
        drained = [heapq.heappop(heap) for _ in range(size)]
        assert drained == sorted(data)
|
|
|
|
|
|
"""
|
|
def test_merge(self):
|
|
inputs = []
|
|
for i in range(randrange(25)):
|
|
row = []
|
|
for j in range(randrange(100)):
|
|
tup = choice('ABC'), randrange(-500, 500)
|
|
row.append(tup)
|
|
inputs.append(row)
|
|
|
|
for key in [None, itemgetter(0), itemgetter(1), itemgetter(1, 0)]:
|
|
for reverse in [False, True]:
|
|
seqs = []
|
|
for seq in inputs:
|
|
seqs.append(sorted(seq, key=key, reverse=reverse))
|
|
self.assertEqual(sorted(chain(*inputs), key=key, reverse=reverse),
|
|
list(heapq.merge(*seqs, key=key, reverse=reverse)))
|
|
self.assertEqual(list(heapq.merge()), [])
|
|
|
|
def test_empty_merges(self):
|
|
# Merging two empty lists (with or without a key) should produce
|
|
# another empty list.
|
|
self.assertEqual(list(heapq.merge([], [])), [])
|
|
self.assertEqual(list(heapq.merge([], [], key=lambda: 6)), [])
|
|
|
|
def test_merge_does_not_suppress_index_error(self):
|
|
# Issue 19018: Heapq.merge suppresses IndexError from user generator
|
|
def iterable():
|
|
s = list(range(10))
|
|
for i in range(20):
|
|
yield s[i] # IndexError when i >= 10
|
|
with self.assertRaises(IndexError):
|
|
list(heapq.merge(iterable(), iterable()))
|
|
|
|
def test_merge_stability(self):
|
|
class Int(int):
|
|
pass
|
|
inputs = [[], [], [], []]
|
|
for i in range(20000):
|
|
stream = randrange(4)
|
|
x = randrange(500)
|
|
obj = Int(x)
|
|
obj.pair = (x, stream)
|
|
inputs[stream].append(obj)
|
|
for stream in inputs:
|
|
stream.sort()
|
|
result = [i.pair for i in heapq.merge(*inputs)]
|
|
self.assertEqual(result, sorted(result))
|
|
"""
|
|
|
|
|
|
def mykey(x: Tuple[int, int]):
    # Sort key used by the nsmallest/nlargest tests: scramble both tuple
    # fields by multiplying by 547 and reducing modulo 2000.
    first, second = x
    return ((first * 547) % 2000, (second * 547) % 2000)
|
|
|
|
|
|
@test
def test_nsmallest():
    # nsmallest(n, data) must match the first n items of sorted(data), both
    # with the natural ordering and with mykey as the sort key.
    # Each item is (random value, index), so all items are distinct.
    data = [(randrange(2000), i) for i in range(1000)]
    # The reference orderings do not depend on n, so sort once up front.
    ascending = sorted(data)
    ascending_by_key = sorted(data, key=mykey)
    # Only n = 10 is exercised here (test_nlargest covers a wider range).
    for n in (10,):
        limit = min(n, len(data))
        assert list(heapq.nsmallest(n, data)) == ascending[:limit]
        assert list(heapq.nsmallest(n, data, key=mykey)) == ascending_by_key[:limit]
|
|
|
|
|
|
@test
def test_nlargest():
    # nlargest(n, data) must match the first n items of sorted(data) reversed,
    # both with the natural ordering and with mykey as the sort key.
    # Each item is (random value, index), so all items are distinct.
    data = [(randrange(2000), i) for i in range(1000)]
    # The reference orderings do not depend on n, so build them once instead
    # of re-sorting and re-reversing on every iteration.
    descending = sorted(data)[::-1]
    descending_by_key = sorted(data, key=mykey)[::-1]
    # Sizes cover empty, tiny, mid-range, exactly len(data), and more than
    # len(data) requests.
    for n in (0, 1, 2, 10, 100, 400, 999, 1000, 1100):
        limit = min(n, len(data))
        assert list(heapq.nlargest(n, data)) == descending[:limit]
        assert list(heapq.nlargest(n, data, key=mykey)) == descending_by_key[:limit]
|
|
|
|
|
|
@test
def test_comparison_operator():
    # LT defines only __lt__, and defines it inverted, so heapsorting
    # LT-wrapped values must produce the data in descending order.
    def hsort(data: List[float], T: type):
        # Wrap each value in T, heapify, then pop everything back out and
        # unwrap via the .x field.
        wrapped = [T(value) for value in data]
        heapq.heapify(wrapped)
        return [heapq.heappop(wrapped).x for _ in range(len(data))]

    class LT:
        x: float

        def __init__(self, x: float):
            self.x = x

        def __lt__(self, other: LT):
            # Deliberately reversed: "less than" means a larger x.
            return self.x > other.x

    data = [random() for _ in range(100)]
    target = sorted(data)[::-1]
    assert hsort(data, LT) == target
|
|
|
|
|
|
@test
def test_merge():
    # Ascending merge of sorted int lists, one of which is empty.
    ascending = list(heapq.merge([1, 3, 5, 7], [0, 2, 4, 8], [5, 10, 15, 20], List[int](), [25]))
    assert [0, 1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25] == ascending

    # Merge by length: equal-length strings keep first-input-first order
    # ('dog' from the first list precedes 'cat' from the second).
    by_length = list(heapq.merge(['dog', 'horse'], ['cat', 'fish', 'kangaroo'], key=len))
    assert ['dog', 'cat', 'fish', 'horse', 'kangaroo'] == by_length

    # Descending merge of reverse-sorted int lists.
    descending = list(heapq.merge([7, 5, 3, 1], [8, 4, 2, 0], [20, 15, 10, 5], List[int](), [25], reverse=True))
    assert [25, 20, 15, 10, 8, 7, 5, 5, 4, 3, 2, 1, 0] == descending

    # Descending merge by length.
    by_length_desc = list(heapq.merge(['horse', 'dog'], ['kangaroo', 'fish', 'cat'], key=len, reverse=True))
    assert ['kangaroo', 'horse', 'fish', 'dog', 'cat'] == by_length_desc
|
|
|
|
|
|
# Run the test functions in sequence when this file is executed.
test_heapify()
test_naive_nbest()
test_nsmallest()
test_nlargest()
test_nbest()
test_nbest_with_pushpop()
test_heappushpop()
test_heappop_max()
test_heapsort()
# NOTE(review): test_comparison_operator is defined in this file but its call
# is left disabled; the reason is not recorded here.
# test_comparison_operator()
test_merge()
|