Faiss
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
/data/users/matthijs/github_faiss/faiss/MetaIndexes.cpp
1 /**
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the CC-by-NC license found in the
6  * LICENSE file in the root directory of this source tree.
7  */
8 
9 // Copyright 2004-present Facebook. All Rights Reserved
10 // -*- c++ -*-
11 
12 #include "MetaIndexes.h"
13 
14 #include <pthread.h>
15 
16 #include <cstdio>
17 
18 #include "FaissAssert.h"
19 #include "Heap.h"
20 #include "AuxIndexStructures.h"
21 
22 
23 namespace faiss {
24 
25 /*****************************************************
26  * IndexIDMap implementation
27  *******************************************************/
28 
29 IndexIDMap::IndexIDMap (Index *index):
30  index (index),
31  own_fields (false)
32 {
33  FAISS_THROW_IF_NOT_MSG (index->ntotal == 0, "index must be empty on input");
34  is_trained = index->is_trained;
35  metric_type = index->metric_type;
36  verbose = index->verbose;
37  d = index->d;
38 }
39 
40 void IndexIDMap::add (idx_t, const float *)
41 {
42  FAISS_THROW_MSG ("add does not make sense with IndexIDMap, "
43  "use add_with_ids");
44 }
45 
46 
47 void IndexIDMap::train (idx_t n, const float *x)
48 {
49  index->train (n, x);
50  is_trained = index->is_trained;
51 }
52 
54 {
55  index->reset ();
56  ntotal = 0;
57 }
58 
59 
60 void IndexIDMap::add_with_ids (idx_t n, const float * x, const long *xids)
61 {
62  index->add (n, x);
63  for (idx_t i = 0; i < n; i++)
64  id_map.push_back (xids[i]);
65  ntotal = index->ntotal;
66 }
67 
68 
69 void IndexIDMap::search (idx_t n, const float *x, idx_t k,
70  float *distances, idx_t *labels) const
71 {
72  index->search (n, x, k, distances, labels);
73  idx_t *li = labels;
74  for (idx_t i = 0; i < n * k; i++) {
75  li[i] = li[i] < 0 ? li[i] : id_map[li[i]];
76  }
77 }
78 
79 namespace {
80 
81 struct IDTranslatedSelector: IDSelector {
82  const std::vector <long> & id_map;
83  const IDSelector & sel;
84  IDTranslatedSelector (const std::vector <long> & id_map,
85  const IDSelector & sel):
86  id_map (id_map), sel (sel)
87  {}
88  bool is_member(idx_t id) const override {
89  return sel.is_member(id_map[id]);
90  }
91 };
92 
93 }
94 
96 {
97  // remove in sub-index first
98  IDTranslatedSelector sel2 = {id_map, sel};
99  long nremove = index->remove_ids (sel2);
100 
101  long j = 0;
102  for (idx_t i = 0; i < ntotal; i++) {
103  if (sel.is_member (id_map[i])) {
104  // remove
105  } else {
106  id_map[j] = id_map[i];
107  j++;
108  }
109  }
110  FAISS_ASSERT (j == index->ntotal);
111  ntotal = j;
112  return nremove;
113 }
114 
115 
116 
117 
118 IndexIDMap::~IndexIDMap ()
119 {
120  if (own_fields) delete index;
121 }
122 
123 
124 
125 /*****************************************************
126  * IndexShards implementation
127  *******************************************************/
128 
129 // subroutines
130 namespace {
131 
132 
133 typedef Index::idx_t idx_t;
134 
135 
/// Wrapper that runs a copyable Job (any type with a run() method) in a
/// dedicated pthread. start() must be matched by exactly one wait().
template<class Job>
struct Thread {
    Job job;           // copy of the job to execute
    pthread_t thread;  // handle of the underlying pthread

    Thread () {}

    explicit Thread (const Job & job): job(job) {}

    /// spawn the thread running job.run()
    void start () {
        pthread_create (&thread, nullptr, run, this);
    }

    /// join the thread started by start()
    void wait () {
        pthread_join (thread, nullptr);
    }

    /// pthread entry point: forwards to job.run()
    static void * run (void *arg) {
        static_cast<Thread*> (arg)->job.run();
        return nullptr;
    }

};
159 
160 
161 /// callback + thread management to train 1 shard
162 struct TrainJob {
163  IndexShards *index; // the relevant index
164  int no; // shard number
165  idx_t n; // train points
166  const float *x;
167 
168  void run ()
169  {
170  if (index->verbose)
171  printf ("begin train shard %d on %ld points\n", no, n);
172  index->shard_indexes [no]->train(n, x);
173  if (index->verbose)
174  printf ("end train shard %d\n", no);
175  }
176 
177 };
178 
179 struct AddJob {
180  IndexShards *index; // the relevant index
181  int no; // shard number
182 
183  idx_t n;
184  const float *x;
185  const idx_t *ids;
186 
187  void run ()
188  {
189  if (index->verbose)
190  printf ("begin add shard %d on %ld points\n", no, n);
191  if (ids)
192  index->shard_indexes[no]->add_with_ids (n, x, ids);
193  else
194  index->shard_indexes[no]->add (n, x);
195  if (index->verbose)
196  printf ("end add shard %d on %ld points\n", no, n);
197  }
198 };
199 
200 
201 
202 /// callback + thread management to query in 1 shard
203 struct QueryJob {
204  const IndexShards *index; // the relevant index
205  int no; // shard number
206 
207  // query params
208  idx_t n;
209  const float *x;
210  idx_t k;
211  float *distances;
212  idx_t *labels;
213 
214 
215  void run ()
216  {
217  if (index->verbose)
218  printf ("begin query shard %d on %ld points\n", no, n);
219  index->shard_indexes [no]->search (n, x, k,
220  distances, labels);
221  if (index->verbose)
222  printf ("end query shard %d\n", no);
223  }
224 
225 
226 };
227 
228 
229 
230 
231 // add translation to all valid labels
232 void translate_labels (long n, idx_t *labels, long translation)
233 {
234  if (translation == 0) return;
235  for (long i = 0; i < n; i++) {
236  if(labels[i] < 0) return;
237  labels[i] += translation;
238  }
239 }
240 
241 
/** merge result tables from several shards.
 * @param all_distances size nshard * n * k
 * @param all_labels idem
 * @param translations label translations to apply, size nshard
 */
247 
// C is the heap comparator; the caller instantiates CMin for L2 and
// CMax for other metrics (see IndexShards::search below).
template <class C>
void merge_tables (long n, long k, long nshard,
                   float *distances, idx_t *labels,
                   const float *all_distances,
                   idx_t *all_labels,
                   const long *translations)
{
    if(k == 0) {
        return;
    }

    // distance between two consecutive shards' tables for one query
    long stride = n * k;
#pragma omp parallel
    {
        // per-thread scratch: pointer[s] = next unconsumed rank in
        // shard s, shard_ids = heap ids, heap_vals = heap values
        std::vector<int> buf (2 * nshard);
        int * pointer = buf.data();
        int * shard_ids = pointer + nshard;
        std::vector<float> buf2 (nshard);
        float * heap_vals = buf2.data();
#pragma omp for
        for (long i = 0; i < n; i++) {
            // the heap maps values to the shard where they are
            // produced.
            const float *D_in = all_distances + i * k;
            const idx_t *I_in = all_labels + i * k;
            int heap_size = 0;

            // seed the heap with the best (rank-0) result of each
            // shard; negative labels mean the shard had no result
            for (long s = 0; s < nshard; s++) {
                pointer[s] = 0;
                if (I_in[stride * s] >= 0)
                    heap_push<C> (++heap_size, heap_vals, shard_ids,
                                  D_in[stride * s], s);
            }

            float *D = distances + i * k;
            idx_t *I = labels + i * k;

            // k-way merge: repeatedly pop the best candidate and
            // replace it with the next one from the same shard
            for (int j = 0; j < k; j++) {
                if (heap_size == 0) {
                    // fewer than k results overall: pad with sentinels
                    I[j] = -1;
                    D[j] = C::neutral();
                } else {
                    // pop best element
                    int s = shard_ids[0];
                    int & p = pointer[s];
                    D[j] = heap_vals[0];
                    // apply the shard's label translation on output
                    I[j] = I_in[stride * s + p] + translations[s];

                    heap_pop<C> (heap_size--, heap_vals, shard_ids);
                    p++;
                    if (p < k && I_in[stride * s + p] >= 0)
                        heap_push<C> (++heap_size, heap_vals, shard_ids,
                                      D_in[stride * s + p], s);
                }
            }
        }
    }
}
306 
307 
308 };
309 
310 
311 
312 
313 IndexShards::IndexShards (idx_t d, bool threaded, bool successive_ids):
314  Index (d), own_fields (false),
315  threaded (threaded), successive_ids (successive_ids)
316 {
317 
318 }
319 
320 void IndexShards::add_shard (Index *idx)
321 {
322  shard_indexes.push_back (idx);
323  sync_with_shard_indexes ();
324 }
325 
326 void IndexShards::sync_with_shard_indexes ()
327 {
328  if (shard_indexes.empty()) return;
329  Index * index0 = shard_indexes[0];
330  d = index0->d;
331  metric_type = index0->metric_type;
332  is_trained = index0->is_trained;
333  ntotal = index0->ntotal;
334  for (int i = 1; i < shard_indexes.size(); i++) {
335  Index * index = shard_indexes[i];
336  FAISS_THROW_IF_NOT (metric_type == index->metric_type);
337  FAISS_THROW_IF_NOT (d == index->d);
338  ntotal += index->ntotal;
339  }
340 }
341 
342 
void IndexShards::train (idx_t n, const float *x)
{

    // pre-alloc because we don't want reallocs
    std::vector<Thread<TrainJob > > tss (shard_indexes.size());
    int nt = 0;
    // launch one training job per not-yet-trained shard; each job sees
    // the full training set (n, x)
    for (int i = 0; i < shard_indexes.size(); i++) {
        if(!shard_indexes[i]->is_trained) {
            TrainJob ts = {this, i, n, x};
            if (threaded) {
                tss[nt] = Thread<TrainJob> (ts);
                tss[nt++].start();
            } else {
                // sequential fallback: run in the calling thread
                ts.run();
            }
        }
    }
    // join only the nt threads that were actually started
    for (int i = 0; i < nt; i++) {
        tss[i].wait();
    }
    // propagate the shards' is_trained / ntotal to this index
    sync_with_shard_indexes ();
}
365 
void IndexShards::add (idx_t n, const float *x)
{
    // forward to add_with_ids without explicit ids; how ids are then
    // assigned depends on successive_ids (see add_with_ids)
    add_with_ids (n, x, nullptr);
}
370 
371  /**
372  * Cases (successive_ids, xids):
373  * - true, non-NULL ERROR: it makes no sense to pass in ids and
374  * request them to be shifted
375  * - true, NULL OK, but should be called only once (calls add()
376  * on sub-indexes).
377  * - false, non-NULL OK: will call add_with_ids with passed in xids
378  * distributed evenly over shards
379  * - false, NULL OK: will call add_with_ids on each sub-index,
380  * starting at ntotal
381  */
382 
383 void IndexShards::add_with_ids (idx_t n, const float * x, const long *xids)
384 {
385 
386  FAISS_THROW_IF_NOT_MSG(!(successive_ids && xids),
387  "It makes no sense to pass in ids and "
388  "request them to be shifted");
389 
390  if (successive_ids) {
391  FAISS_THROW_IF_NOT_MSG(!xids,
392  "It makes no sense to pass in ids and "
393  "request them to be shifted");
394  FAISS_THROW_IF_NOT_MSG(ntotal == 0,
395  "when adding to IndexShards with sucessive_ids, "
396  "only add() in a single pass is supported");
397  }
398 
399  long nshard = shard_indexes.size();
400  const long *ids = xids;
401  ScopeDeleter<long> del;
402  if (!ids && !successive_ids) {
403  long *aids = new long[n];
404  for (long i = 0; i < n; i++)
405  aids[i] = ntotal + i;
406  ids = aids;
407  del.set (ids);
408  }
409 
410  std::vector<Thread<AddJob > > asa (shard_indexes.size());
411  int nt = 0;
412  for (int i = 0; i < nshard; i++) {
413  long i0 = i * n / nshard;
414  long i1 = (i + 1) * n / nshard;
415 
416  AddJob as = {this, i,
417  i1 - i0, x + i0 * d,
418  ids ? ids + i0 : nullptr};
419  if (threaded) {
420  asa[nt] = Thread<AddJob>(as);
421  asa[nt++].start();
422  } else {
423  as.run();
424  }
425  }
426  for (int i = 0; i < nt; i++) {
427  asa[i].wait();
428  }
429  ntotal += n;
430 }
431 
432 
433 
434 
435 
437 {
438  for (int i = 0; i < shard_indexes.size(); i++) {
439  shard_indexes[i]->reset ();
440  }
441  sync_with_shard_indexes ();
442 }
443 
445  idx_t n, const float *x, idx_t k,
446  float *distances, idx_t *labels) const
447 {
448  long nshard = shard_indexes.size();
449  float *all_distances = new float [nshard * k * n];
450  idx_t *all_labels = new idx_t [nshard * k * n];
451  ScopeDeleter<float> del (all_distances);
452  ScopeDeleter<idx_t> del2 (all_labels);
453 
454 #if 1
455 
456  // pre-alloc because we don't want reallocs
457  std::vector<Thread<QueryJob> > qss (nshard);
458  for (int i = 0; i < nshard; i++) {
459  QueryJob qs = {
460  this, i, n, x, k,
461  all_distances + i * k * n,
462  all_labels + i * k * n
463  };
464  if (threaded) {
465  qss[i] = Thread<QueryJob> (qs);
466  qss[i].start();
467  } else {
468  qs.run();
469  }
470  }
471 
472  if (threaded) {
473  for (int i = 0; i < qss.size(); i++) {
474  qss[i].wait();
475  }
476  }
477 #else
478 
479  // pre-alloc because we don't want reallocs
480  std::vector<QueryJob> qss (nshard);
481  for (int i = 0; i < nshard; i++) {
482  QueryJob qs = {
483  this, i, n, x, k,
484  all_distances + i * k * n,
485  all_labels + i * k * n
486  };
487  if (threaded) {
488  qss[i] = qs;
489  } else {
490  qs.run();
491  }
492  }
493 
494  if (threaded) {
495 #pragma omp parallel for
496  for (int i = 0; i < qss.size(); i++) {
497  qss[i].run();
498  }
499  }
500 
501 #endif
502  std::vector<long> translations (nshard, 0);
503  if (successive_ids) {
504  translations[0] = 0;
505  for (int s = 0; s + 1 < nshard; s++)
506  translations [s + 1] = translations [s] +
507  shard_indexes [s]->ntotal;
508  }
509 
510  if (metric_type == METRIC_L2) {
511  merge_tables< CMin<float, int> > (
512  n, k, nshard, distances, labels,
513  all_distances, all_labels, translations.data ());
514  } else {
515  merge_tables< CMax<float, int> > (
516  n, k, nshard, distances, labels,
517  all_distances, all_labels, translations.data ());
518  }
519 
520 }
521 
522 
523 
524 IndexShards::~IndexShards ()
525 {
526  if (own_fields) {
527  for (int s = 0; s < shard_indexes.size(); s++)
528  delete shard_indexes [s];
529  }
530 }
531 
532 
533 /*****************************************************
534  * IndexSplitVectors implementation
535  *******************************************************/
536 
537 
538 IndexSplitVectors::IndexSplitVectors (idx_t d, bool threaded):
539  Index (d), own_fields (false),
540  threaded (threaded), sum_d (0)
541 {
542 
543 }
544 
545 void IndexSplitVectors::add_sub_index (Index *index)
546 {
547  sub_indexes.push_back (index);
548  sync_with_sub_indexes ();
549 }
550 
551 void IndexSplitVectors::sync_with_sub_indexes ()
552 {
553  if (sub_indexes.empty()) return;
554  Index * index0 = sub_indexes[0];
555  sum_d = index0->d;
556  metric_type = index0->metric_type;
557  is_trained = index0->is_trained;
558  ntotal = index0->ntotal;
559  for (int i = 1; i < sub_indexes.size(); i++) {
560  Index * index = sub_indexes[i];
561  FAISS_THROW_IF_NOT (metric_type == index->metric_type);
562  FAISS_THROW_IF_NOT (ntotal == index->ntotal);
563  sum_d += index->d;
564  }
565 
566 }
567 
void IndexSplitVectors::add (idx_t n, const float *x)
{
    // adding through the split wrapper is not supported
    FAISS_THROW_MSG ("not implemented");
}
572 
573 namespace {
574 
575 /// callback + thread management to query in 1 shard
576 struct SplitQueryJob {
577  const IndexSplitVectors *index; // the relevant index
578  int no; // shard number
579 
580  // query params
581  idx_t n;
582  const float *x;
583  idx_t k;
584  float *distances;
585  idx_t *labels;
586 
587 
588  void run ()
589  {
590  if (index->verbose)
591  printf ("begin query shard %d on %ld points\n", no, n);
592  const Index * sub_index = index->sub_indexes[no];
593  long sub_d = sub_index->d, d = index->d;
594  idx_t ofs = 0;
595  for (int i = 0; i < no; i++) ofs += index->sub_indexes[i]->d;
596  float *sub_x = new float [sub_d * n];
597  ScopeDeleter<float> del (sub_x);
598  for (idx_t i = 0; i < n; i++)
599  memcpy (sub_x + i * sub_d, x + ofs + i * d, sub_d * sizeof (sub_x));
600  sub_index->search (n, sub_x, k, distances, labels);
601  if (index->verbose)
602  printf ("end query shard %d\n", no);
603  }
604 
605 };
606 
607 
608 
609 }
610 
611 
612 
614  idx_t n, const float *x, idx_t k,
615  float *distances, idx_t *labels) const
616 {
617  FAISS_THROW_IF_NOT_MSG (k == 1,
618  "search implemented only for k=1");
619  FAISS_THROW_IF_NOT_MSG (sum_d == d,
620  "not enough indexes compared to # dimensions");
621 
622  long nshard = sub_indexes.size();
623  float *all_distances = new float [nshard * k * n];
624  idx_t *all_labels = new idx_t [nshard * k * n];
625  ScopeDeleter<float> del (all_distances);
626  ScopeDeleter<idx_t> del2 (all_labels);
627 
628  // pre-alloc because we don't want reallocs
629  std::vector<Thread<SplitQueryJob> > qss (nshard);
630  for (int i = 0; i < nshard; i++) {
631  SplitQueryJob qs = {
632  this, i, n, x, k,
633  i == 0 ? distances : all_distances + i * k * n,
634  i == 0 ? labels : all_labels + i * k * n
635  };
636  if (threaded) {
637  qss[i] = Thread<SplitQueryJob> (qs);
638  qss[i].start();
639  } else {
640  qs.run();
641  }
642  }
643 
644  if (threaded) {
645  for (int i = 0; i < qss.size(); i++) {
646  qss[i].wait();
647  }
648  }
649 
650  long factor = 1;
651  for (int i = 0; i < nshard; i++) {
652  if (i > 0) { // results of 0 are already in the table
653  const float *distances_i = all_distances + i * k * n;
654  const idx_t *labels_i = all_labels + i * k * n;
655  for (long j = 0; j < n; j++) {
656  if (labels[j] >= 0 && labels_i[j] >= 0) {
657  labels[j] += labels_i[j] * factor;
658  distances[j] += distances_i[j];
659  } else {
660  labels[j] = -1;
661  distances[j] = 0.0 / 0.0;
662  }
663  }
664  }
665  factor *= sub_indexes[i]->ntotal;
666  }
667 
668 }
669 
670 
void IndexSplitVectors::train (idx_t n, const float *x)
{
    // training through the split wrapper is not supported
    FAISS_THROW_MSG ("not implemented");
}
675 
677 {
678  FAISS_THROW_MSG ("not implemented");
679 }
680 
681 
682 IndexSplitVectors::~IndexSplitVectors ()
683 {
684  if (own_fields) {
685  for (int s = 0; s < sub_indexes.size(); s++)
686  delete sub_indexes [s];
687  }
688 }
689 
690 
691 
692 
693 
694 
695 }; // namespace faiss
void train(idx_t n, const float *x) override
IndexShards(idx_t d, bool threaded=false, bool successive_ids=true)
virtual void reset()=0
removes all elements from the database.
void search(idx_t n, const float *x, idx_t k, float *distances, idx_t *labels) const override
Definition: MetaIndexes.cpp:69
void add_with_ids(idx_t n, const float *x, const long *xids) override
void search(idx_t n, const float *x, idx_t k, float *distances, idx_t *labels) const override
void reset() override
removes all elements from the database.
void add(idx_t n, const float *x) override
this will fail. Use add_with_ids
Definition: MetaIndexes.cpp:40
int d
vector dimension
Definition: Index.h:64
std::vector< long > id_map
! whether pointers are deleted in destructo
Definition: MetaIndexes.h:28
virtual void add(idx_t n, const float *x)=0
void train(idx_t n, const float *x) override
void add(idx_t n, const float *x) override
supported only for sub-indices that implement add_with_ids
long idx_t
all indices are this type
Definition: Index.h:62
idx_t ntotal
total nb of indexed vectors
Definition: Index.h:65
bool verbose
verbosity level
Definition: Index.h:66
void add(idx_t n, const float *x) override
virtual long remove_ids(const IDSelector &sel)
Definition: Index.cpp:37
bool threaded
should the sub-indexes be deleted along with this?
Definition: MetaIndexes.h:64
virtual void search(idx_t n, const float *x, idx_t k, float *distances, idx_t *labels) const =0
void reset() override
removes all elements from the database.
void reset() override
removes all elements from the database.
Definition: MetaIndexes.cpp:53
void search(idx_t n, const float *x, idx_t k, float *distances, idx_t *labels) const override
long remove_ids(const IDSelector &sel) override
remove ids adapted to IndexFlat
Definition: MetaIndexes.cpp:95
MetricType metric_type
type of metric this index uses for search
Definition: Index.h:72
void train(idx_t n, const float *x) override
Definition: MetaIndexes.cpp:47
bool is_trained
set if the Index does not require training, or if training is done already
Definition: Index.h:69
virtual void train(idx_t n, const float *x)
Definition: Index.h:89
void add_with_ids(idx_t n, const float *x, const long *xids) override
Definition: MetaIndexes.cpp:60
IndexSplitVectors(idx_t d, bool threaded=false)
sum of dimensions seen so far
bool own_fields
! the sub-index
Definition: MetaIndexes.h:27