Faiss
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
/data/users/matthijs/github_faiss/faiss/MetaIndexes.cpp
1 
2 /**
3  * Copyright (c) 2015-present, Facebook, Inc.
4  * All rights reserved.
5  *
6  * This source code is licensed under the CC-by-NC license found in the
7  * LICENSE file in the root directory of this source tree.
8  */
9 
10 // Copyright 2004-present Facebook. All Rights Reserved
11 // -*- c++ -*-
12 
13 #include "MetaIndexes.h"
14 
15 #include <pthread.h>
16 
17 #include <cstdio>
18 
19 #include "FaissAssert.h"
20 #include "Heap.h"
21 
22 namespace faiss {
23 
24 /*****************************************************
25  * IndexIDMap implementation
26  *******************************************************/
27 
28 IndexIDMap::IndexIDMap (Index *index):
29  index (index),
30  own_fields (false)
31 {
32  FAISS_ASSERT (index->ntotal == 0 || !"index must be empty on input");
33  is_trained = index->is_trained;
34  metric_type = index->metric_type;
35  verbose = index->verbose;
36  d = index->d;
37  set_typename ();
38 }
39 
40 void IndexIDMap::add (idx_t, const float *)
41 {
42  FAISS_ASSERT (!"add does not make sense with IndexIDMap, "
43  "use add_with_ids");
44 }
45 
46 
47 void IndexIDMap::train (idx_t n, const float *x)
48 {
49  index->train (n, x);
50  is_trained = index->is_trained;
51 }
52 
54 {
55  index->reset ();
56  ntotal = 0;
57 }
58 
59 
60 void IndexIDMap::add_with_ids (idx_t n, const float * x, const long *xids)
61 {
62  index->add (n, x);
63  for (idx_t i = 0; i < n; i++)
64  id_map.push_back (xids[i]);
65  ntotal = index->ntotal;
66 }
67 
68 
69 void IndexIDMap::search (idx_t n, const float *x, idx_t k,
70  float *distances, idx_t *labels) const
71 {
72  index->search (n, x, k, distances, labels);
73  idx_t *li = labels;
74  for (idx_t i = 0; i < n * k; i++) {
75  li[i] = li[i] < 0 ? li[i] : id_map[li[i]];
76  }
77 }
78 
79 
80 
81 IndexIDMap::~IndexIDMap ()
82 {
83  if (own_fields) delete index;
84 }
85 
86 void IndexIDMap::set_typename ()
87 {
88  index_typename = "IDMap[" + index->index_typename + "]";
89 }
90 
91 
92 /*****************************************************
93  * IndexShards implementation
94  *******************************************************/
95 
96 // subroutines
97 namespace {
98 
99 
100 typedef Index::idx_t idx_t;
101 
102 
/** Small pthread wrapper that runs job.run() on its own thread.
 *
 * Job must be copyable and provide a run() method. start() hands the
 * address of this object to the new thread, so the object must not be
 * moved or destroyed between start() and wait().
 */
template<class Job>
struct Thread {
    Job job;
    pthread_t thread;
    bool started;   ///< true while `thread` holds a thread that must be joined

    Thread (): started (false) {}

    explicit Thread (const Job & job): job (job), started (false) {}

    /// Launch the job. If the thread cannot be created, the job is run
    /// synchronously on the calling thread so the work is never lost.
    void start () {
        // pthread_create returns 0 on success; on failure `thread` holds
        // no usable handle and must not be joined
        started = pthread_create (&thread, nullptr, run, this) == 0;
        if (!started) {
            job.run ();
        }
    }

    /// Block until the job has finished. Does nothing if no thread is
    /// pending (never started, creation failed, or already waited for).
    void wait () {
        if (!started) {
            return;
        }
        pthread_join (thread, nullptr);
        started = false;
    }

    /// pthread entry point: arg is the Thread object whose job is run.
    static void * run (void *arg) {
        static_cast<Thread*> (arg)->job.run ();
        return nullptr;
    }

};
126 
127 
128 /// callback + thread management to train 1 shard
129 struct TrainJob {
130  IndexShards *index; // the relevant index
131  int no; // shard number
132  idx_t n; // train points
133  const float *x;
134 
135  void run ()
136  {
137  if (index->verbose)
138  printf ("begin train shard %d on %ld points\n", no, n);
139  index->shard_indexes [no]->train(n, x);
140  if (index->verbose)
141  printf ("end train shard %d\n", no);
142  }
143 
144 };
145 
146 struct AddJob {
147  IndexShards *index; // the relevant index
148  int no; // shard number
149 
150  idx_t n;
151  const float *x;
152  const idx_t *ids;
153 
154  void run ()
155  {
156  if (index->verbose)
157  printf ("begin add shard %d on %ld points\n", no, n);
158  if (ids)
159  index->shard_indexes[no]->add_with_ids (n, x, ids);
160  else
161  index->shard_indexes[no]->add (n, x);
162  if (index->verbose)
163  printf ("end add shard %d on %ld points\n", no, n);
164  }
165 };
166 
167 
168 
169 /// callback + thread management to query in 1 shard
170 struct QueryJob {
171  const IndexShards *index; // the relevant index
172  int no; // shard number
173 
174  // query params
175  idx_t n;
176  const float *x;
177  idx_t k;
178  float *distances;
179  idx_t *labels;
180 
181 
182  void run ()
183  {
184  if (index->verbose)
185  printf ("begin query shard %d on %ld points\n", no, n);
186  index->shard_indexes [no]->search (n, x, k,
187  distances, labels);
188  if (index->verbose)
189  printf ("end query shard %d\n", no);
190  }
191 
192 
193 };
194 
195 
196 
197 
198 // add translation to all valid labels
199 void translate_labels (long n, idx_t *labels, long translation)
200 {
201  if (translation == 0) return;
202  for (long i = 0; i < n; i++) {
203  if(labels[i] < 0) return;
204  labels[i] += translation;
205  }
206 }
207 
208 
209 /** merge result tables from several shards.
210  * @param all_distances size nshard * n * k
211  * @param all_labels idem
212  * @param translartions label translations to apply, size nshard
213  */
214 template <class C>
215 void merge_tables (long n, long k, long nshard,
216  float *distances, idx_t *labels,
217  const float *all_distances,
218  idx_t *all_labels,
219  const long *translations)
220 {
221  long shard_stride = n * k;
222 #pragma omp parallel for
223  for (long i = 0; i < n; i++) {
224  float *D = distances + i * k;
225  idx_t *I = labels + i * k;
226  const float *Ds = all_distances + i * k;
227  idx_t *Is = all_labels + i * k;
228  translate_labels (k, Is, translations[0]);
229  heap_heapify<C>(k, D, I, Ds, Is, k);
230  for (int s = 1; s < nshard; s++) {
231  Ds += shard_stride;
232  Is += shard_stride;
233  translate_labels (k, Is, translations[s]);
234  heap_addn<C> (k, D, I, Ds, Is, k);
235  }
236  heap_reorder<C>(k, D, I);
237  }
238 }
239 
240 
241 };
242 
243 
244 
245 
246 IndexShards::IndexShards (idx_t d, bool threaded, bool successive_ids):
247  Index (d), own_fields (false),
248  threaded (threaded), successive_ids (successive_ids)
249 {
250 
251 }
252 
253 void IndexShards::add_shard (Index *idx)
254 {
255  shard_indexes.push_back (idx);
256  sync_with_shard_indexes ();
257 }
258 
259 void IndexShards::sync_with_shard_indexes ()
260 {
261  if (shard_indexes.empty()) return;
262  Index * index0 = shard_indexes[0];
263  d = index0->d;
264  metric_type = index0->metric_type;
265  is_trained = index0->is_trained;
266  ntotal = index0->ntotal;
267  for (int i = 1; i < shard_indexes.size(); i++) {
268  Index * index = shard_indexes[i];
269  FAISS_ASSERT (metric_type == index->metric_type);
270  FAISS_ASSERT (d == index->d);
271  ntotal += index->ntotal;
272  }
273 }
274 
275 
276 void IndexShards::train (idx_t n, const float *x)
277 {
278 
279  // pre-alloc because we don't want reallocs
280  std::vector<Thread<TrainJob > > tss (shard_indexes.size());
281  int nt = 0;
282  for (int i = 0; i < shard_indexes.size(); i++) {
283  if(!shard_indexes[i]->is_trained) {
284  TrainJob ts = {this, i, n, x};
285  if (threaded) {
286  tss[nt] = Thread<TrainJob> (ts);
287  tss[nt++].start();
288  } else {
289  ts.run();
290  }
291  }
292  }
293  for (int i = 0; i < nt; i++) {
294  tss[i].wait();
295  }
296  sync_with_shard_indexes ();
297 }
298 
299 void IndexShards::add (idx_t n, const float *x)
300 {
301  add_with_ids (n, x, nullptr);
302 }
303 
304  /**
305  * Cases (successive_ids, xids):
306  * - true, non-NULL ERROR: it makes no sense to pass in ids and
307  * request them to be shifted
308  * - true, NULL OK, but should be called only once (calls add()
309  * on sub-indexes).
310  * - false, non-NULL OK: will call add_with_ids with passed in xids
311  * distributed evenly over shards
312  * - false, NULL OK: will call add_with_ids on each sub-index,
313  * starting at ntotal
314  */
315 
316 void IndexShards::add_with_ids (idx_t n, const float * x, const long *xids)
317 {
318 
319  FAISS_ASSERT(!(successive_ids && xids) ||
320  !"It makes no sense to pass in ids and request them to be shifted");
321 
322  if (successive_ids) {
323  FAISS_ASSERT(!xids ||
324  !"It makes no sense to pass in ids and request them to be shifted");
325  FAISS_ASSERT(ntotal == 0 ||
326  !"when adding to IndexShards with sucessive_ids, only add() "
327  "in a single pass is supported");
328  }
329 
330  long nshard = shard_indexes.size();
331  const long *ids = xids;
332  if (!ids && !successive_ids) {
333  long *aids = new long[n];
334  for (long i = 0; i < n; i++)
335  aids[i] = ntotal + i;
336  ids = aids;
337  }
338 
339  std::vector<Thread<AddJob > > asa (shard_indexes.size());
340  int nt = 0;
341  for (int i = 0; i < nshard; i++) {
342  long i0 = i * n / nshard;
343  long i1 = (i + 1) * n / nshard;
344 
345  AddJob as = {this, i,
346  i1 - i0, x + i0 * d,
347  ids ? ids + i0 : nullptr};
348  if (threaded) {
349  asa[nt] = Thread<AddJob>(as);
350  asa[nt++].start();
351  } else {
352  as.run();
353  }
354  }
355  for (int i = 0; i < nt; i++) {
356  asa[i].wait();
357  }
358  if (ids != xids) delete [] ids;
359  ntotal += n;
360 }
361 
362 
364 {
365  for (int i = 0; i < shard_indexes.size(); i++) {
366  shard_indexes[i]->reset ();
367  }
368  sync_with_shard_indexes ();
369 }
370 
372  idx_t n, const float *x, idx_t k,
373  float *distances, idx_t *labels) const
374 {
375  long nshard = shard_indexes.size();
376  float *all_distances = new float [nshard * k * n];
377  idx_t *all_labels = new idx_t [nshard * k * n];
378 
379 #if 1
380 
381  // pre-alloc because we don't want reallocs
382  std::vector<Thread<QueryJob> > qss (nshard);
383  for (int i = 0; i < nshard; i++) {
384  QueryJob qs = {
385  this, i, n, x, k,
386  all_distances + i * k * n,
387  all_labels + i * k * n
388  };
389  if (threaded) {
390  qss[i] = Thread<QueryJob> (qs);
391  qss[i].start();
392  } else {
393  qs.run();
394  }
395  }
396 
397  if (threaded) {
398  for (int i = 0; i < qss.size(); i++) {
399  qss[i].wait();
400  }
401  }
402 #else
403 
404  // pre-alloc because we don't want reallocs
405  std::vector<QueryJob> qss (nshard);
406  for (int i = 0; i < nshard; i++) {
407  QueryJob qs = {
408  this, i, n, x, k,
409  all_distances + i * k * n,
410  all_labels + i * k * n
411  };
412  if (threaded) {
413  qss[i] = qs;
414  } else {
415  qs.run();
416  }
417  }
418 
419  if (threaded) {
420 #pragma omp parallel for
421  for (int i = 0; i < qss.size(); i++) {
422  qss[i].run();
423  }
424  }
425 
426 #endif
427  std::vector<long> translations (nshard, 0);
428  if (successive_ids) {
429  translations[0] = 0;
430  for (int s = 0; s + 1 < nshard; s++)
431  translations [s + 1] = translations [s] +
432  shard_indexes [s]->ntotal;
433  }
434 
435  if (metric_type == METRIC_L2) {
436  merge_tables< CMax<float, idx_t> > (
437  n, k, nshard, distances, labels,
438  all_distances, all_labels, translations.data ());
439  } else {
440  merge_tables< CMin<float, idx_t> > (
441  n, k, nshard, distances, labels,
442  all_distances, all_labels, translations.data ());
443  }
444 
445  delete [] all_distances;
446  delete [] all_labels;
447 }
448 
449 
450 void IndexShards::set_typename ()
451 {
452 
453 }
454 
455 IndexShards::~IndexShards ()
456 {
457  if (own_fields) {
458  for (int s = 0; s < shard_indexes.size(); s++)
459  delete shard_indexes [s];
460  }
461 }
462 
463 
464 /*****************************************************
465  * IndexSplitVectors implementation
466  *******************************************************/
467 
468 
469 IndexSplitVectors::IndexSplitVectors (idx_t d, bool threaded):
470  Index (d), own_fields (false),
471  threaded (threaded), sum_d (0)
472 {
473 
474 }
475 
476 void IndexSplitVectors::add_sub_index (Index *index)
477 {
478  sub_indexes.push_back (index);
479  sync_with_sub_indexes ();
480 }
481 
482 void IndexSplitVectors::sync_with_sub_indexes ()
483 {
484  if (sub_indexes.empty()) return;
485  Index * index0 = sub_indexes[0];
486  sum_d = index0->d;
487  metric_type = index0->metric_type;
488  is_trained = index0->is_trained;
489  ntotal = index0->ntotal;
490  for (int i = 1; i < sub_indexes.size(); i++) {
491  Index * index = sub_indexes[i];
492  FAISS_ASSERT (metric_type == index->metric_type);
493  FAISS_ASSERT (ntotal == index->ntotal);
494  sum_d += index->d;
495  }
496 
497 }
498 
499 void IndexSplitVectors::add (idx_t n, const float *x)
500 {
501  FAISS_ASSERT (!"not implemented");
502 }
503 
504 namespace {
505 
506 /// callback + thread management to query in 1 shard
507 struct SplitQueryJob {
508  const IndexSplitVectors *index; // the relevant index
509  int no; // shard number
510 
511  // query params
512  idx_t n;
513  const float *x;
514  idx_t k;
515  float *distances;
516  idx_t *labels;
517 
518 
519  void run ()
520  {
521  if (index->verbose)
522  printf ("begin query shard %d on %ld points\n", no, n);
523  const Index * sub_index = index->sub_indexes[no];
524  long sub_d = sub_index->d, d = index->d;
525  idx_t ofs = 0;
526  for (int i = 0; i < no; i++) ofs += index->sub_indexes[i]->d;
527  float *sub_x = new float [sub_d * n];
528  for (idx_t i = 0; i < n; i++)
529  memcpy (sub_x + i * sub_d, x + ofs + i * d, sub_d * sizeof (sub_x));
530  sub_index->search (n, sub_x, k, distances, labels);
531  delete [] sub_x;
532  if (index->verbose)
533  printf ("end query shard %d\n", no);
534  }
535 
536 };
537 
538 
539 
540 }
541 
542 
543 
545  idx_t n, const float *x, idx_t k,
546  float *distances, idx_t *labels) const
547 {
548  FAISS_ASSERT (k == 1 || !"search implemented only for k=1");
549  FAISS_ASSERT (sum_d == d || !"not enough indexes compared to # dimensions");
550 
551  long nshard = sub_indexes.size();
552  float *all_distances = new float [nshard * k * n];
553  idx_t *all_labels = new idx_t [nshard * k * n];
554 
555  // pre-alloc because we don't want reallocs
556  std::vector<Thread<SplitQueryJob> > qss (nshard);
557  for (int i = 0; i < nshard; i++) {
558  SplitQueryJob qs = {
559  this, i, n, x, k,
560  i == 0 ? distances : all_distances + i * k * n,
561  i == 0 ? labels : all_labels + i * k * n
562  };
563  if (threaded) {
564  qss[i] = Thread<SplitQueryJob> (qs);
565  qss[i].start();
566  } else {
567  qs.run();
568  }
569  }
570 
571  if (threaded) {
572  for (int i = 0; i < qss.size(); i++) {
573  qss[i].wait();
574  }
575  }
576 
577  long factor = 1;
578  for (int i = 0; i < nshard; i++) {
579  if (i > 0) { // results of 0 are already in the table
580  const float *distances_i = all_distances + i * k * n;
581  const idx_t *labels_i = all_labels + i * k * n;
582  for (long j = 0; j < n; j++) {
583  if (labels[j] >= 0 && labels_i[j] >= 0) {
584  labels[j] += labels_i[j] * factor;
585  distances[j] += distances_i[j];
586  } else {
587  labels[j] = -1;
588  distances[j] = 0.0 / 0.0;
589  }
590  }
591  }
592  factor *= sub_indexes[i]->ntotal;
593  }
594  delete [] all_labels;
595  delete [] all_distances;
596 }
597 
598 
599 void IndexSplitVectors::train (idx_t n, const float *x)
600 {
601  FAISS_ASSERT (!"not implemented");
602 }
603 
605 {
606  FAISS_ASSERT (!"not implemented");
607 }
608 
609 void IndexSplitVectors::set_typename ()
610 {}
611 
612 IndexSplitVectors::~IndexSplitVectors ()
613 {
614  if (own_fields) {
615  for (int s = 0; s < sub_indexes.size(); s++)
616  delete sub_indexes [s];
617  }
618 }
619 
620 
621 
622 
623 
624 
625 }; // namespace faiss
virtual void train(idx_t n, const float *x) override
IndexShards(idx_t d, bool threaded=false, bool successive_ids=true)
virtual void reset()=0
removes all elements from the database.
virtual void search(idx_t n, const float *x, idx_t k, float *distances, idx_t *labels) const override
Definition: MetaIndexes.cpp:69
virtual void add_with_ids(idx_t n, const float *x, const long *xids) override
virtual void search(idx_t n, const float *x, idx_t k, float *distances, idx_t *labels) const override
virtual void reset() override
removes all elements from the database.
virtual void add(idx_t n, const float *x) override
this will fail. Use add_with_ids
Definition: MetaIndexes.cpp:40
int d
vector dimension
Definition: Index.h:66
std::vector< long > id_map
whether pointers are deleted in destructor
Definition: MetaIndexes.h:29
virtual void add(idx_t n, const float *x)=0
virtual void train(idx_t n, const float *x) override
virtual void add(idx_t n, const float *x) override
supported only for sub-indices that implement add_with_ids
long idx_t
all indices are this type
Definition: Index.h:64
idx_t ntotal
total nb of indexed vectors
Definition: Index.h:67
bool verbose
verbosity level
Definition: Index.h:68
virtual void add(idx_t n, const float *x) override
bool threaded
should the sub-indexes be deleted along with this?
Definition: MetaIndexes.h:62
virtual void search(idx_t n, const float *x, idx_t k, float *distances, idx_t *labels) const =0
virtual void reset() override
removes all elements from the database.
virtual void reset() override
removes all elements from the database.
Definition: MetaIndexes.cpp:53
virtual void search(idx_t n, const float *x, idx_t k, float *distances, idx_t *labels) const override
MetricType metric_type
type of metric this index uses for search
Definition: Index.h:74
virtual void train(idx_t n, const float *x) override
Definition: MetaIndexes.cpp:47
bool is_trained
set if the Index does not require training, or if training is done already
Definition: Index.h:71
virtual void train(idx_t n, const float *x)
Definition: Index.h:92
virtual void add_with_ids(idx_t n, const float *x, const long *xids) override
Definition: MetaIndexes.cpp:60
IndexSplitVectors(idx_t d, bool threaded=false)
sum of dimensions seen so far
bool own_fields
the sub-index
Definition: MetaIndexes.h:28