#! /usr/bin/env python2

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from __future__ import print_function
import numpy as np
import time
import os
import sys
import faiss
import re

from multiprocessing.pool import ThreadPool
from datasets import ivecs_read


####################################################################
# Parse command line
####################################################################


def usage():
    print("""

Usage: bench_gpu_1bn.py dataset indextype [options]

dataset: set of vectors to operate on.
   Supported: SIFT1M, SIFT2M, ..., SIFT1000M or Deep1B

indextype: any index type supported by index_factory that runs on GPU.

    General options

-ngpu ngpu         nb of GPUs to use (default = all)
-tempmem N         use N bytes of temporary GPU memory
-nocache           do not read or write intermediate files
-float16           use 16-bit floats on the GPU side

    Add options

-abs N             split adds in blocks of no more than N vectors
-max_add N         copy sharded dataset to CPU each max_add additions
                   (to avoid memory overflows with geometric reallocations)
-altadd            alternative add function, where the index is not stored
                   on GPU during add. Slightly faster for big datasets on
                   slow GPUs

    Search options

-R R               nb of replicas of the same dataset (the dataset
                   will be copied across ngpu/R GPUs, default R = 1)
-noptables         do not use precomputed tables in IVFPQ
-qbs N             split queries in blocks of no more than N vectors
-nnn N             search N neighbors for each query
-nprobe 4,16,64    try these numbers of probes
-knngraph          instead of the standard setup for the dataset,
                   compute a k-nn graph with nnn neighbors per element
-oI xx%d.npy       output the search result indices to this numpy file,
                   %d will be replaced with the nprobe
-oD xx%d.npy       output the search result distances to this file

""", file=sys.stderr)
    sys.exit(1)


# default values

dbname = None
index_key = None

ngpu = faiss.get_num_gpus()

replicas = 1  # nb of replicas of sharded dataset
add_batch_size = 32768
query_batch_size = 16384
nprobes = [1 << l for l in range(9)]  # 1, 2, 4, ..., 256
knngraph = False
use_precomputed_tables = True
tempmem = -1  # if -1, use system default
max_add = -1
use_float16 = False
use_cache = True
nnn = 10
altadd = False
I_fname = None
D_fname = None

args = sys.argv[1:]

while args:
    a = args.pop(0)
    if a == '-h': usage()
    elif a == '-ngpu':      ngpu = int(args.pop(0))
    elif a == '-R':         replicas = int(args.pop(0))
    elif a == '-noptables': use_precomputed_tables = False
    elif a == '-abs':       add_batch_size = int(args.pop(0))
    elif a == '-qbs':       query_batch_size = int(args.pop(0))
    elif a == '-nnn':       nnn = int(args.pop(0))
    elif a == '-tempmem':   tempmem = int(args.pop(0))
    elif a == '-nocache':   use_cache = False
    elif a == '-knngraph':  knngraph = True
    elif a == '-altadd':    altadd = True
    elif a == '-float16':   use_float16 = True
    elif a == '-nprobe':    nprobes = [int(x) for x in args.pop(0).split(',')]
    elif a == '-max_add':   max_add = int(args.pop(0))
    elif a == '-oI':        I_fname = args.pop(0)
    elif a == '-oD':        D_fname = args.pop(0)
    elif not dbname:        dbname = a
    elif not index_key:     index_key = a
    else:
        print("argument %s unknown" % a, file=sys.stderr)
        sys.exit(1)

cacheroot = '/tmp/bench_gpu_1bn'

if not os.path.isdir(cacheroot):
    print("%s does not exist, creating it" % cacheroot)
    os.mkdir(cacheroot)


#################################################################
# Small Utility Functions
#################################################################

# we mem-map the biggest files to avoid having them in memory all at
# once


def mmap_fvecs(fname):
    x = np.memmap(fname, dtype='int32', mode='r')
    d = x[0]
    return x.view('float32').reshape(-1, d + 1)[:, 1:]


def mmap_bvecs(fname):
    x = np.memmap(fname, dtype='uint8', mode='r')
    d = x[:4].view('int32')[0]
    return x.reshape(-1, d + 4)[:, 4:]
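
# Background on the two readers above: in the .fvecs / .bvecs formats, each
# record is an int32 dimension d followed by d components (float32 for
# fvecs, uint8 for bvecs). The memmaps slice off that header column so the
# caller sees a plain (n, d) matrix without loading the file. For
# comparison, a minimal in-memory fvecs reader (illustrative sketch, not
# used by this benchmark; _fvecs_read_all is a hypothetical name):

def _fvecs_read_all(fname):
    # loads the whole file into RAM; only suitable for small files
    x = np.fromfile(fname, dtype='int32')
    d = x[0]
    return x.view('float32').reshape(-1, d + 1)[:, 1:]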


def rate_limited_imap(f, l):
    """A threaded imap that does not produce elements faster than they
    are consumed"""
    pool = ThreadPool(1)
    res = None
    for i in l:
        res_next = pool.apply_async(f, (i, ))
        if res:
            yield res.get()
        res = res_next
    yield res.get()
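
# Usage sketch (illustrative only): because the pool has a single worker
# and only one async result is kept in flight, element i + 1 is computed
# while the consumer processes element i, but at most one element is ever
# buffered: a two-stage pipeline with O(1) lookahead.
#
#     for block in rate_limited_imap(expensive_load, inputs):
#         consume(block)  # expensive_load of the next input runs meanwhile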


class IdentPreproc:
    """a pre-processor is either a faiss.VectorTransform or an IdentPreproc"""

    def __init__(self, d):
        self.d_in = self.d_out = d

    def apply_py(self, x):
        return x


def sanitize(x):
    """ convert array to a c-contiguous float array """
    return np.ascontiguousarray(x.astype('float32'))


def dataset_iterator(x, preproc, bs):
    """ iterate over the lines of x in blocks of size bs"""

    nb = x.shape[0]
    block_ranges = [(i0, min(nb, i0 + bs))
                    for i0 in range(0, nb, bs)]

    def prepare_block(i01):
        i0, i1 = i01
        xb = sanitize(x[i0:i1])
        return i0, preproc.apply_py(xb)

    return rate_limited_imap(prepare_block, block_ranges)


def eval_intersection_measure(gt_I, I):
    """ intersection measure with the ground truth (used for the knngraph)"""
    inter = 0
    rank = I.shape[1]
    assert gt_I.shape[1] >= rank
    for q in range(nq_gt):
        inter += faiss.ranklist_intersection_size(
            rank, faiss.swig_ptr(gt_I[q, :]),
            rank, faiss.swig_ptr(I[q, :].astype('int64')))
    return inter / float(rank * nq_gt)
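
# The measure above is the mean fraction of the rank-k ground-truth
# neighbors that the index retrieves, i.e. the sum over queries of
# |intersection of gt_I[q, :k] and I[q, :k]| divided by k * nq. A
# numpy-only equivalent (illustrative sketch; slower than the ranklist
# helper but handy for cross-checking):

def _intersection_measure_np(gt_I, I):
    rank = I.shape[1]
    inter = sum(np.intersect1d(gt_I[q, :rank], I[q, :rank]).size
                for q in range(I.shape[0]))
    return inter / float(rank * I.shape[0])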


#################################################################
# Prepare dataset
#################################################################

print("Preparing dataset", dbname)

if dbname.startswith('SIFT'):
    # SIFT1M to SIFT1000M
    dbsize = int(dbname[4:-1])
    xb = mmap_bvecs('bigann/bigann_base.bvecs')
    xq = mmap_bvecs('bigann/bigann_query.bvecs')
    xt = mmap_bvecs('bigann/bigann_learn.bvecs')

    # trim xb to correct size
    xb = xb[:dbsize * 1000 * 1000]

    gt_I = ivecs_read('bigann/gnd/idx_%dM.ivecs' % dbsize)

elif dbname == 'Deep1B':
    xb = mmap_fvecs('deep1b/base.fvecs')
    xq = mmap_fvecs('deep1b/deep1B_queries.fvecs')
    xt = mmap_fvecs('deep1b/learn.fvecs')
    # deep1B's training set is outrageously big, keep only 10M vectors
    xt = xt[:10 * 1000 * 1000]
    gt_I = ivecs_read('deep1b/deep1B_groundtruth.ivecs')

else:
    print('unknown dataset', dbname, file=sys.stderr)
    sys.exit(1)


if knngraph:
    # convert to knn-graph dataset
    xq = xb
    xt = xb
    # we compute the ground-truth on this number of queries for validation
    nq_gt = 10000
    gt_sl = 100

    # ground truth will be computed below
    gt_I = None


print("sizes: B %s Q %s T %s gt %s" % (
    xb.shape, xq.shape, xt.shape,
    gt_I.shape if gt_I is not None else None))


#################################################################
# Parse index_key and set cache files
#
# The index_key is a valid factory key that would work, but we
# decompose the training to do it faster
#################################################################


pat = re.compile('(OPQ[0-9]+(_[0-9]+)?,|PCAR[0-9]+,)?' +
                 '(IVF[0-9]+),' +
                 '(PQ[0-9]+|Flat)')

matchobject = pat.match(index_key)

assert matchobject, 'could not parse ' + index_key

mog = matchobject.groups()

preproc_str = mog[0]
ivf_str = mog[2]
pqflat_str = mog[3]

ncent = int(ivf_str[3:])
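
# Example decomposition (illustrative): for index_key = "OPQ8_64,IVF262144,PQ8"
# the regexp yields preproc_str = "OPQ8_64," (an OPQ rotation to 64 dims),
# ivf_str = "IVF262144" (so ncent = 262144 coarse centroids) and
# pqflat_str = "PQ8" (8-byte PQ codes). For "IVF4096,Flat" there is no
# preprocessing stage and the inverted lists store full vectors.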

prefix = ''

if knngraph:
    gt_cachefile = '%s/BK_gt_%s.npy' % (cacheroot, dbname)
    prefix = 'BK_'
    # files must be kept distinct because the training set is not the
    # same for the knngraph

if preproc_str:
    preproc_cachefile = '%s/%spreproc_%s_%s.vectrans' % (
        cacheroot, prefix, dbname, preproc_str[:-1])
else:
    preproc_cachefile = None
    preproc_str = ''

cent_cachefile = '%s/%scent_%s_%s%s.npy' % (
    cacheroot, prefix, dbname, preproc_str, ivf_str)

index_cachefile = '%s/%s%s_%s%s,%s.index' % (
    cacheroot, prefix, dbname, preproc_str, ivf_str, pqflat_str)


if not use_cache:
    preproc_cachefile = None
    cent_cachefile = None
    index_cachefile = None

print("cachefiles:")
print(preproc_cachefile)
print(cent_cachefile)
print(index_cachefile)


#################################################################
# Wake up GPUs
#################################################################

print("preparing resources for %d GPUs" % ngpu)

gpu_resources = []

for i in range(ngpu):
    res = faiss.StandardGpuResources()
    if tempmem >= 0:
        res.setTempMemory(tempmem)
    gpu_resources.append(res)


def make_vres_vdev(i0=0, i1=-1):
    " return vectors of device ids and resources useful for gpu_multiple"
    vres = faiss.GpuResourcesVector()
    vdev = faiss.IntVector()
    if i1 == -1:
        i1 = ngpu
    for i in range(i0, i1):
        vdev.push_back(i)
        vres.push_back(gpu_resources[i])
    return vres, vdev
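
# make_vres_vdev feeds faiss.index_cpu_to_gpu_multiple, which places one
# shard (or replica) of an index on each listed device. Illustrative
# sketch of how the replica loop further below slices the GPUs
# (hypothetical values): with ngpu = 8 and replicas = 2, replica 0 gets
# devices 0..3 and replica 1 gets devices 4..7, e.g.
#
#     vres, vdev = make_vres_vdev(0, 4)   # resources/devices 0, 1, 2, 3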


#################################################################
# Prepare ground truth (for the knngraph)
#################################################################


def compute_GT():
    print("compute GT")
    t0 = time.time()

    gt_I = np.zeros((nq_gt, gt_sl), dtype='int64')
    gt_D = np.zeros((nq_gt, gt_sl), dtype='float32')
    heaps = faiss.float_maxheap_array_t()
    heaps.k = gt_sl
    heaps.nh = nq_gt
    heaps.val = faiss.swig_ptr(gt_D)
    heaps.ids = faiss.swig_ptr(gt_I)
    heaps.heapify()
    bs = 10 ** 5

    n, d = xb.shape
    xqs = sanitize(xq[:nq_gt])

    db_gt = faiss.IndexFlatL2(d)
    vres, vdev = make_vres_vdev()
    db_gt_gpu = faiss.index_cpu_to_gpu_multiple(
        vres, vdev, db_gt)

    # compute ground-truth by blocks of bs, and add to heaps
    for i0, xsl in dataset_iterator(xb, IdentPreproc(d), bs):
        db_gt_gpu.add(xsl)
        D, I = db_gt_gpu.search(xqs, gt_sl)
        I += i0
        heaps.addn_with_ids(
            gt_sl, faiss.swig_ptr(D), faiss.swig_ptr(I), gt_sl)
        db_gt_gpu.reset()
        print("\r   %d/%d, %.3f s" % (i0, n, time.time() - t0), end=' ')
    print()
    heaps.reorder()

    print("GT time: %.3f s" % (time.time() - t0))
    return gt_I
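
# compute_GT brute-forces the exact ground truth without ever holding the
# full database on GPU: each 10^5-vector block is added to an empty GPU
# flat index, searched, and the per-query top-gt_sl candidates are merged
# into CPU-side max-heaps (faiss.float_maxheap_array_t), with the
# block-local ids offset by i0. heaps.reorder() finally sorts each heap
# into ascending-distance order.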

if knngraph:

    if gt_cachefile and os.path.exists(gt_cachefile):
        print("load GT", gt_cachefile)
        gt_I = np.load(gt_cachefile)
    else:
        gt_I = compute_GT()
        if gt_cachefile:
            print("store GT", gt_cachefile)
            np.save(gt_cachefile, gt_I)

#################################################################
# Prepare the vector transformation object (pure CPU)
#################################################################


def train_preprocessor():
    print("train preproc", preproc_str)
    d = xt.shape[1]
    t0 = time.time()
    if preproc_str.startswith('OPQ'):
        fi = preproc_str[3:-1].split('_')
        m = int(fi[0])
        dout = int(fi[1]) if len(fi) == 2 else d
        preproc = faiss.OPQMatrix(d, m, dout)
    elif preproc_str.startswith('PCAR'):
        dout = int(preproc_str[4:-1])
        preproc = faiss.PCAMatrix(d, dout, 0, True)
    else:
        assert False
    preproc.train(sanitize(xt[:1000000]))
    print("preproc train done in %.3f s" % (time.time() - t0))
    return preproc


def get_preprocessor():
    if preproc_str:
        if not preproc_cachefile or not os.path.exists(preproc_cachefile):
            preproc = train_preprocessor()
            if preproc_cachefile:
                print("store", preproc_cachefile)
                faiss.write_VectorTransform(preproc, preproc_cachefile)
        else:
            print("load", preproc_cachefile)
            preproc = faiss.read_VectorTransform(preproc_cachefile)
    else:
        d = xb.shape[1]
        preproc = IdentPreproc(d)
    return preproc


#################################################################
# Prepare the coarse quantizer
#################################################################


def train_coarse_quantizer(x, k, preproc):
    d = preproc.d_out
    clus = faiss.Clustering(d, k)
    clus.verbose = True
    # clus.niter = 2
    clus.max_points_per_centroid = 10000000

    print("apply preproc on shape", x.shape, 'k=', k)
    t0 = time.time()
    x = preproc.apply_py(sanitize(x))
    print("   preproc %.3f s output shape %s" % (
        time.time() - t0, x.shape))

    vres, vdev = make_vres_vdev()
    index = faiss.index_cpu_to_gpu_multiple(
        vres, vdev, faiss.IndexFlatL2(d))

    clus.train(x, index)
    centroids = faiss.vector_float_to_array(clus.centroids)

    return centroids.reshape(k, d)
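
# train_coarse_quantizer runs standard k-means, with the expensive
# assignment step delegated to a (possibly multi-GPU) flat L2 index.
# Roughly equivalent one-liner with the high-level wrapper (illustrative
# sketch; assumes a faiss build where faiss.Kmeans accepts gpu=True):
#
#     km = faiss.Kmeans(d, k, verbose=True, gpu=True)
#     km.train(x)
#     centroids = km.centroids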


def prepare_coarse_quantizer(preproc):

    if cent_cachefile and os.path.exists(cent_cachefile):
        print("load centroids", cent_cachefile)
        centroids = np.load(cent_cachefile)
    else:
        nt = max(1000000, 256 * ncent)
        print("train coarse quantizer...")
        t0 = time.time()
        centroids = train_coarse_quantizer(xt[:nt], ncent, preproc)
        print("Coarse train time: %.3f s" % (time.time() - t0))
        if cent_cachefile:
            print("store centroids", cent_cachefile)
            np.save(cent_cachefile, centroids)

    coarse_quantizer = faiss.IndexFlatL2(preproc.d_out)
    coarse_quantizer.add(centroids)

    return coarse_quantizer


#################################################################
# Make index and add elements to it
#################################################################


def prepare_trained_index(preproc):

    coarse_quantizer = prepare_coarse_quantizer(preproc)
    d = preproc.d_out
    if pqflat_str == 'Flat':
        print("making an IVFFlat index")
        idx_model = faiss.IndexIVFFlat(coarse_quantizer, d, ncent,
                                       faiss.METRIC_L2)
    else:
        m = int(pqflat_str[2:])
        assert m < 56 or use_float16, "PQ%d will work only with -float16" % m
        print("making an IVFPQ index, m = ", m)
        idx_model = faiss.IndexIVFPQ(coarse_quantizer, d, ncent, m, 8)

    coarse_quantizer.this.disown()
    idx_model.own_fields = True

    # finish training on CPU
    t0 = time.time()
    print("Training vector codes")
    x = preproc.apply_py(sanitize(xt[:1000000]))
    idx_model.train(x)
    print("  done %.3f s" % (time.time() - t0))

    return idx_model
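
# Ownership note on prepare_trained_index: coarse_quantizer is created
# from Python, so the SWIG wrapper would normally free the C++ object when
# the Python reference dies. coarse_quantizer.this.disown() hands the
# object over, and own_fields = True makes the IVF index responsible for
# deleting its quantizer, avoiding a double free.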


def compute_populated_index(preproc):
    """Add elements to a sharded index. Return the index and if available
    a sharded gpu_index that contains the same data. """

    indexall = prepare_trained_index(preproc)

    co = faiss.GpuMultipleClonerOptions()
    co.useFloat16 = use_float16
    co.useFloat16CoarseQuantizer = False
    co.usePrecomputed = use_precomputed_tables
    co.indicesOptions = faiss.INDICES_CPU
    co.verbose = True
    co.reserveVecs = max_add if max_add > 0 else xb.shape[0]
    co.shard = True
    assert co.shard_type in (0, 1, 2)
    vres, vdev = make_vres_vdev()
    gpu_index = faiss.index_cpu_to_gpu_multiple(
        vres, vdev, indexall, co)

    print("add...")
    t0 = time.time()
    nb = xb.shape[0]
    for i0, xs in dataset_iterator(xb, preproc, add_batch_size):
        i1 = i0 + xs.shape[0]
        gpu_index.add_with_ids(xs, np.arange(i0, i1))
        if max_add > 0 and gpu_index.ntotal > max_add:
            print("Flush indexes to CPU")
            for i in range(ngpu):
                index_src_gpu = faiss.downcast_index(gpu_index.at(i))
                index_src = faiss.index_gpu_to_cpu(index_src_gpu)
                print("  index %d size %d" % (i, index_src.ntotal))
                index_src.copy_subset_to(indexall, 0, 0, nb)
                index_src_gpu.reset()
                index_src_gpu.reserveMemory(max_add)
            gpu_index.sync_with_shard_indexes()

        print('\r%d/%d (%.3f s)  ' % (
            i0, nb, time.time() - t0), end=' ')
        sys.stdout.flush()
    print("Add time: %.3f s" % (time.time() - t0))

    print("Aggregate indexes to CPU")
    t0 = time.time()

    if hasattr(gpu_index, 'at'):
        # it is a sharded index
        for i in range(ngpu):
            index_src = faiss.index_gpu_to_cpu(gpu_index.at(i))
            print("  index %d size %d" % (i, index_src.ntotal))
            index_src.copy_subset_to(indexall, 0, 0, nb)
    else:
        # simple index
        index_src = faiss.index_gpu_to_cpu(gpu_index)
        index_src.copy_subset_to(indexall, 0, 0, nb)

    print("  done in %.3f s" % (time.time() - t0))

    if max_add > 0:
        # it does not contain all the vectors
        gpu_index = None

    return gpu_index, indexall
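
# compute_populated_index grows the GPU shards in add_batch_size chunks;
# with -max_add set, the shards are periodically drained into the CPU
# index (copy_subset_to), reset, and re-reserved, which caps GPU memory
# at roughly max_add vectors per flush cycle instead of the full database.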


def compute_populated_index_2(preproc):

    indexall = prepare_trained_index(preproc)

    # set up a 3-stage pipeline that does:
    # - stage 1: load + preproc
    # - stage 2: assign on GPU
    # - stage 3: add to index

    stage1 = dataset_iterator(xb, preproc, add_batch_size)

    vres, vdev = make_vres_vdev()
    coarse_quantizer_gpu = faiss.index_cpu_to_gpu_multiple(
        vres, vdev, indexall.quantizer)

    def quantize(args):
        (i0, xs) = args
        _, assign = coarse_quantizer_gpu.search(xs, 1)
        return i0, xs, assign.ravel()

    stage2 = rate_limited_imap(quantize, stage1)

    print("add...")
    t0 = time.time()
    nb = xb.shape[0]

    for i0, xs, assign in stage2:
        i1 = i0 + xs.shape[0]
        if indexall.__class__ == faiss.IndexIVFPQ:
            indexall.add_core_o(i1 - i0, faiss.swig_ptr(xs),
                                None, None, faiss.swig_ptr(assign))
        elif indexall.__class__ == faiss.IndexIVFFlat:
            indexall.add_core(i1 - i0, faiss.swig_ptr(xs), None,
                              faiss.swig_ptr(assign))
        else:
            assert False
        print('\r%d/%d (%.3f s)  ' % (
            i0, nb, time.time() - t0), end=' ')
        sys.stdout.flush()
    print("Add time: %.3f s" % (time.time() - t0))

    return None, indexall


def get_populated_index(preproc):

    if not index_cachefile or not os.path.exists(index_cachefile):
        if not altadd:
            gpu_index, indexall = compute_populated_index(preproc)
        else:
            gpu_index, indexall = compute_populated_index_2(preproc)
        if index_cachefile:
            print("store", index_cachefile)
            faiss.write_index(indexall, index_cachefile)
    else:
        print("load", index_cachefile)
        indexall = faiss.read_index(index_cachefile)
        gpu_index = None

    co = faiss.GpuMultipleClonerOptions()
    co.useFloat16 = use_float16
    co.useFloat16CoarseQuantizer = False
    co.usePrecomputed = use_precomputed_tables
    co.indicesOptions = 0
    co.verbose = True
    co.shard = True    # the replicas will be made "manually"
    t0 = time.time()
    print("CPU index contains %d vectors, move to GPU" % indexall.ntotal)
    if replicas == 1:

        if not gpu_index:
            print("copying loaded index to GPUs")
            vres, vdev = make_vres_vdev()
            index = faiss.index_cpu_to_gpu_multiple(
                vres, vdev, indexall, co)
        else:
            index = gpu_index

    else:
        del gpu_index  # We override the GPU index

        print("Copy CPU index to %d sharded GPU indexes" % replicas)

        index = faiss.IndexReplicas()

        for i in range(replicas):
            gpu0 = ngpu * i // replicas
            gpu1 = ngpu * (i + 1) // replicas
            vres, vdev = make_vres_vdev(gpu0, gpu1)

            print("   dispatch to GPUs %d:%d" % (gpu0, gpu1))

            index1 = faiss.index_cpu_to_gpu_multiple(
                vres, vdev, indexall, co)
            index1.this.disown()
            index.addIndex(index1)
        index.own_fields = True
    del indexall
    print("move to GPU done in %.3f s" % (time.time() - t0))
    return index
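
# Search-time layout built above: with replicas == 1 the dataset is
# sharded once across all GPUs; with -R > 1, faiss.IndexReplicas holds
# `replicas` full copies, each sharded over ngpu // replicas GPUs, and
# dispatches query batches to the replicas in parallel for extra
# throughput at the cost of memory.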


#################################################################
# Perform search
#################################################################


def eval_dataset(index, preproc):

    ps = faiss.GpuParameterSpace()
    ps.initialize(index)

    nq_gt = gt_I.shape[0]
    print("search...")
    sl = query_batch_size
    nq = xq.shape[0]
    for nprobe in nprobes:
        ps.set_index_parameter(index, 'nprobe', nprobe)
        t0 = time.time()

        if sl == 0:
            D, I = index.search(preproc.apply_py(sanitize(xq)), nnn)
        else:
            I = np.empty((nq, nnn), dtype='int32')
            D = np.empty((nq, nnn), dtype='float32')

            inter_res = ''

            for i0, xs in dataset_iterator(xq, preproc, sl):
                print('\r%d/%d (%.3f s%s)   ' % (
                    i0, nq, time.time() - t0, inter_res), end=' ')
                sys.stdout.flush()

                i1 = i0 + xs.shape[0]
                Di, Ii = index.search(xs, nnn)

                I[i0:i1] = Ii
                D[i0:i1] = Di

                if knngraph and not inter_res and i1 >= nq_gt:
                    ires = eval_intersection_measure(
                        gt_I[:, :nnn], I[:nq_gt])
                    inter_res = ', %.4f' % ires

        t1 = time.time()
        if knngraph:
            ires = eval_intersection_measure(gt_I[:, :nnn], I[:nq_gt])
            print("  probe=%-3d: %.3f s rank-%d intersection results: %.4f" % (
                nprobe, t1 - t0, nnn, ires))
        else:
            print("  probe=%-3d: %.3f s" % (nprobe, t1 - t0), end=' ')
            gtc = gt_I[:, :1]
            nq = xq.shape[0]
            for rank in 1, 10, 100:
                if rank > nnn: continue
                nok = (I[:, :rank] == gtc).sum()
                print("1-R@%d: %.4f" % (rank, nok / float(nq)), end=' ')
            print()
        if I_fname:
            I_fname_i = I_fname % nprobe
            print("storing", I_fname_i)
            np.save(I_fname_i, I)
        if D_fname:
            D_fname_i = D_fname % nprobe
            print("storing", D_fname_i)
            np.save(D_fname_i, D)
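
# 1-R@rank above is "recall at rank": the fraction of queries whose single
# nearest ground-truth neighbor (gt_I[:, :1]) appears among the first
# `rank` results. A minimal sketch of the same computation (illustrative;
# _recall_at is a hypothetical name):

def _recall_at(I, gt_first, rank):
    # I: (nq, nnn) result ids, gt_first: (nq, 1) true nearest neighbor ids
    return float((I[:, :rank] == gt_first).sum()) / I.shape[0]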


#################################################################
# Driver
#################################################################


preproc = get_preprocessor()

index = get_populated_index(preproc)

eval_dataset(index, preproc)

# make sure index is deleted before the resources
del index