faiss/tests/test_contrib.py

427 lines
13 KiB
Python
Raw Normal View History

# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
2020-08-04 04:15:02 +08:00
import faiss
import unittest
import numpy as np
import platform
2020-08-04 04:15:02 +08:00
from faiss.contrib import datasets
from faiss.contrib import inspect_tools
from faiss.contrib import evaluation
from faiss.contrib import ivf_tools
from common_faiss_tests import get_dataset_2
try:
from faiss.contrib.exhaustive_search import \
knn_ground_truth, knn, range_ground_truth, \
range_search_max_results, exponential_query_iterator
except:
pass # Submodule import broken in python 2.
2020-08-04 04:15:02 +08:00
@unittest.skipIf(platform.python_version_tuple()[0] < '3', \
'Submodule import broken in python 2.')
2020-08-04 04:15:02 +08:00
class TestComputeGT(unittest.TestCase):
def test_compute_GT(self):
d = 64
xt, xb, xq = get_dataset_2(d, 0, 10000, 100)
index = faiss.IndexFlatL2(d)
index.add(xb)
Dref, Iref = index.search(xq, 10)
# iterator function on the matrix
def matrix_iterator(xb, bs):
for i0 in range(0, xb.shape[0], bs):
yield xb[i0:i0 + bs]
Dnew, Inew = knn_ground_truth(xq, matrix_iterator(xb, 1000), 10)
np.testing.assert_array_equal(Iref, Inew)
# decimal = 4 required when run on GPU
np.testing.assert_almost_equal(Dref, Dnew, decimal=4)
class TestDatasets(unittest.TestCase):
"""here we test only the synthetic dataset. Datasets that require
disk or manifold access are in
//deeplearning/projects/faiss-forge/test_faiss_datasets/:test_faiss_datasets
"""
def test_synthetic(self):
ds = datasets.SyntheticDataset(32, 1000, 2000, 10)
xq = ds.get_queries()
self.assertEqual(xq.shape, (10, 32))
xb = ds.get_database()
self.assertEqual(xb.shape, (2000, 32))
ds.check_sizes()
def test_synthetic_ip(self):
ds = datasets.SyntheticDataset(32, 1000, 2000, 10, "IP")
index = faiss.IndexFlatIP(32)
index.add(ds.get_database())
np.testing.assert_array_equal(
ds.get_groundtruth(100),
index.search(ds.get_queries(), 100)[1]
)
def test_synthetic_iterator(self):
ds = datasets.SyntheticDataset(32, 1000, 2000, 10)
xb = ds.get_database()
xb2 = []
for xbi in ds.database_iterator():
xb2.append(xbi)
xb2 = np.vstack(xb2)
np.testing.assert_array_equal(xb, xb2)
class TestExhaustiveSearch(unittest.TestCase):
def test_knn_cpu(self):
xb = np.random.rand(200, 32).astype('float32')
xq = np.random.rand(100, 32).astype('float32')
index = faiss.IndexFlatL2(32)
index.add(xb)
Dref, Iref = index.search(xq, 10)
Dnew, Inew = knn(xq, xb, 10)
assert np.all(Inew == Iref)
assert np.allclose(Dref, Dnew)
index = faiss.IndexFlatIP(32)
index.add(xb)
Dref, Iref = index.search(xq, 10)
Dnew, Inew = knn(xq, xb, 10, metric=faiss.METRIC_INNER_PRODUCT)
assert np.all(Inew == Iref)
assert np.allclose(Dref, Dnew)
def do_test_range(self, metric):
ds = datasets.SyntheticDataset(32, 0, 1000, 10)
xq = ds.get_queries()
xb = ds.get_database()
D, I = faiss.knn(xq, xb, 10, metric=metric)
threshold = float(D[:, -1].mean())
index = faiss.IndexFlat(32, metric)
index.add(xb)
ref_lims, ref_D, ref_I = index.range_search(xq, threshold)
new_lims, new_D, new_I = range_ground_truth(
xq, ds.database_iterator(bs=100), threshold, ngpu=0,
metric_type=metric)
evaluation.test_ref_range_results(
ref_lims, ref_D, ref_I,
new_lims, new_D, new_I
)
def test_range_L2(self):
self.do_test_range(faiss.METRIC_L2)
def test_range_IP(self):
self.do_test_range(faiss.METRIC_INNER_PRODUCT)
def test_query_iterator(self, metric=faiss.METRIC_L2):
ds = datasets.SyntheticDataset(32, 0, 1000, 1000)
xq = ds.get_queries()
xb = ds.get_database()
D, I = faiss.knn(xq, xb, 10, metric=metric)
threshold = float(D[:, -1].mean())
print(threshold)
index = faiss.IndexFlat(32, metric)
index.add(xb)
ref_lims, ref_D, ref_I = index.range_search(xq, threshold)
def matrix_iterator(xb, bs):
for i0 in range(0, xb.shape[0], bs):
yield xb[i0:i0 + bs]
# check repro OK
_, new_lims, new_D, new_I = range_search_max_results(
index, matrix_iterator(xq, 100), threshold, max_results=1e10)
evaluation.test_ref_range_results(
ref_lims, ref_D, ref_I,
new_lims, new_D, new_I
)
max_res = ref_lims[-1] // 2
new_threshold, new_lims, new_D, new_I = range_search_max_results(
index, matrix_iterator(xq, 100), threshold, max_results=max_res)
self.assertLessEqual(new_lims[-1], max_res)
ref_lims, ref_D, ref_I = index.range_search(xq, new_threshold)
evaluation.test_ref_range_results(
ref_lims, ref_D, ref_I,
new_lims, new_D, new_I
)
class TestInspect(unittest.TestCase):
def test_LinearTransform(self):
# training data
xt = np.random.rand(1000, 20).astype('float32')
# test data
x = np.random.rand(10, 20).astype('float32')
# make the PCA matrix
pca = faiss.PCAMatrix(20, 10)
pca.train(xt)
# apply it to test data
yref = pca.apply_py(x)
A, b = inspect_tools.get_LinearTransform_matrix(pca)
# verify
ynew = x @ A.T + b
np.testing.assert_array_almost_equal(yref, ynew)
def test_IndexFlat(self):
xb = np.random.rand(13, 20).astype('float32')
index = faiss.IndexFlatL2(20)
index.add(xb)
np.testing.assert_array_equal(
xb, inspect_tools.get_flat_data(index)
)
class TestRangeEval(unittest.TestCase):
def test_precision_recall(self):
Iref = [
[1, 2, 3],
[5, 6],
[],
[]
]
Inew = [
[1, 2],
[6, 7],
[1],
[]
]
lims_ref = np.cumsum([0] + [len(x) for x in Iref])
Iref = np.hstack(Iref)
lims_new = np.cumsum([0] + [len(x) for x in Inew])
Inew = np.hstack(Inew)
precision, recall = evaluation.range_PR(lims_ref, Iref, lims_new, Inew)
print(precision, recall)
self.assertEqual(precision, 0.6)
self.assertEqual(recall, 0.6)
def test_PR_multiple(self):
metric = faiss.METRIC_L2
ds = datasets.SyntheticDataset(32, 1000, 1000, 10)
xq = ds.get_queries()
xb = ds.get_database()
# good for ~10k results
threshold = 15
index = faiss.IndexFlat(32, metric)
index.add(xb)
ref_lims, ref_D, ref_I = index.range_search(xq, threshold)
# now make a slightly suboptimal index
index2 = faiss.index_factory(32, "PCA16,Flat")
index2.train(ds.get_train())
index2.add(xb)
# PCA reduces distances so will have more results
new_lims, new_D, new_I = index2.range_search(xq, threshold)
all_thr = np.array([5.0, 10.0, 12.0, 15.0])
for mode in "overall", "average":
ref_precisions = np.zeros_like(all_thr)
ref_recalls = np.zeros_like(all_thr)
for i, thr in enumerate(all_thr):
lims2, _, I2 = evaluation.filter_range_results(
new_lims, new_D, new_I, thr)
prec, recall = evaluation.range_PR(
ref_lims, ref_I, lims2, I2, mode=mode)
ref_precisions[i] = prec
ref_recalls[i] = recall
precisions, recalls = evaluation.range_PR_multiple_thresholds(
ref_lims, ref_I,
new_lims, new_D, new_I, all_thr,
mode=mode
)
np.testing.assert_array_almost_equal(ref_precisions, precisions)
np.testing.assert_array_almost_equal(ref_recalls, recalls)
class TestPreassigned(unittest.TestCase):
def test_float(self):
ds = datasets.SyntheticDataset(128, 2000, 2000, 200)
d = ds.d
xt = ds.get_train()
xq = ds.get_queries()
xb = ds.get_database()
# define alternative quantizer on the 20 first dims of vectors
km = faiss.Kmeans(20, 50)
km.train(xt[:, :20].copy())
alt_quantizer = km.index
index = faiss.index_factory(d, "IVF50,PQ16np")
index.by_residual = False
# (optional) fake coarse quantizer
fake_centroids = np.zeros((index.nlist, index.d), dtype="float32")
index.quantizer.add(fake_centroids)
# train the PQ part
index.train(xt)
# add elements xb
a = alt_quantizer.search(xb[:, :20].copy(), 1)[1].ravel()
ivf_tools.add_preassigned(index, xb, a)
# search elements xq, increase nprobe, check 4 first results w/ groundtruth
prev_inter_perf = 0
for nprobe in 1, 10, 20:
index.nprobe = nprobe
a = alt_quantizer.search(xq[:, :20].copy(), index.nprobe)[1]
D, I = ivf_tools.search_preassigned(index, xq, 4, a)
inter_perf = (I == ds.get_groundtruth()[:, :4]).sum() / I.size
self.assertTrue(inter_perf >= prev_inter_perf)
prev_inter_perf = inter_perf
# test range search
index.nprobe = 20
a = alt_quantizer.search(xq[:, :20].copy(), index.nprobe)[1]
# just to find a reasonable radius
D, I = ivf_tools.search_preassigned(index, xq, 4, a)
radius = D.max() * 1.01
lims, DR, IR = ivf_tools.range_search_preassigned(index, xq, radius, a)
# with that radius the k-NN results are a subset of the range search results
for q in range(len(xq)):
l0, l1 = lims[q], lims[q + 1]
self.assertTrue(set(I[q]) <= set(IR[l0:l1]))
def test_binary(self):
ds = datasets.SyntheticDataset(128, 2000, 2000, 200)
d = ds.d
xt = ds.get_train()
xq = ds.get_queries()
xb = ds.get_database()
# define alternative quantizer on the 20 first dims of vectors (will be in float)
km = faiss.Kmeans(20, 50)
km.train(xt[:, :20].copy())
alt_quantizer = km.index
binarizer = faiss.index_factory(d, "ITQ,LSHt")
binarizer.train(xt)
xb_bin = binarizer.sa_encode(xb)
xq_bin = binarizer.sa_encode(xq)
index = faiss.index_binary_factory(d, "BIVF200")
fake_centroids = np.zeros((index.nlist, index.d // 8), dtype="uint8")
index.quantizer.add(fake_centroids)
index.is_trained = True
# add elements xb
a = alt_quantizer.search(xb[:, :20].copy(), 1)[1].ravel()
ivf_tools.add_preassigned(index, xb_bin, a)
# search elements xq, increase nprobe, check 4 first results w/ groundtruth
prev_inter_perf = 0
for nprobe in 1, 10, 20:
index.nprobe = nprobe
a = alt_quantizer.search(xq[:, :20].copy(), index.nprobe)[1]
D, I = ivf_tools.search_preassigned(index, xq_bin, 4, a)
inter_perf = (I == ds.get_groundtruth()[:, :4]).sum() / I.size
self.assertTrue(inter_perf >= prev_inter_perf)
prev_inter_perf = inter_perf
# test range search
index.nprobe = 20
a = alt_quantizer.search(xq[:, :20].copy(), index.nprobe)[1]
# just to find a reasonable radius
D, I = ivf_tools.search_preassigned(index, xq_bin, 4, a)
radius = int(D.max() + 1)
lims, DR, IR = ivf_tools.range_search_preassigned(index, xq_bin, radius, a)
# with that radius the k-NN results are a subset of the range search results
for q in range(len(xq)):
l0, l1 = lims[q], lims[q + 1]
self.assertTrue(set(I[q]) <= set(IR[l0:l1]))
class TestRangeSearchMaxResults(unittest.TestCase):
def do_test(self, metric_type):
ds = datasets.SyntheticDataset(32, 0, 1000, 200)
index = faiss.IndexFlat(ds.d, metric_type)
index.add(ds.get_database())
# find a reasonable radius
D, _ = index.search(ds.get_queries(), 10)
radius0 = float(np.median(D[:, -1]))
# baseline = search with that radius
lims_ref, Dref, Iref = index.range_search(ds.get_queries(), radius0)
# now see if using just the total number of results, we can get back the same
# result table
query_iterator = exponential_query_iterator(ds.get_queries())
init_radius = 1e10 if metric_type == faiss.METRIC_L2 else -1e10
radius1, lims_new, Dnew, Inew = range_search_max_results(
index, query_iterator, init_radius, min_results=Dref.size, clip_to_min=True
)
evaluation.test_ref_range_results(
lims_ref, Dref, Iref,
lims_new, Dnew, Inew
)
def test_L2(self):
self.do_test(faiss.METRIC_L2)
def test_IP(self):
self.do_test(faiss.METRIC_INNER_PRODUCT)