Faiss
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
/data/users/hoss/faiss/ThreadedIndex-inl.h
1 /**
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * This source code is licensed under the MIT license found in the
5  * LICENSE file in the root directory of this source tree.
6  */
7 
8 #include "FaissAssert.h"
9 #include <exception>
10 #include <iostream>
11 
12 namespace faiss {
13 
14 template <typename IndexT>
15 ThreadedIndex<IndexT>::ThreadedIndex(bool threaded)
16  // 0 is default dimension
17  : ThreadedIndex(0, threaded) {
18 }
19 
20 template <typename IndexT>
21 ThreadedIndex<IndexT>::ThreadedIndex(int d, bool threaded)
22  : IndexT(d),
23  own_fields(false),
24  isThreaded_(threaded) {
25  }
26 
27 template <typename IndexT>
28 ThreadedIndex<IndexT>::~ThreadedIndex() {
29  for (auto& p : indices_) {
30  if (isThreaded_) {
31  // should have worker thread
32  FAISS_ASSERT((bool) p.second);
33 
34  // This will also flush all pending work
35  p.second->stop();
36  p.second->waitForThreadExit();
37  } else {
38  // should not have worker thread
39  FAISS_ASSERT(!(bool) p.second);
40  }
41 
42  if (own_fields) {
43  delete p.first;
44  }
45  }
46 }
47 
48 template <typename IndexT>
49 void ThreadedIndex<IndexT>::addIndex(IndexT* index) {
50  // We inherit the dimension from the first index added to us if we don't have
51  // a set dimension
52  if (indices_.empty() && this->d == 0) {
53  this->d = index->d;
54  }
55 
56  // The new index must match our set dimension
57  FAISS_THROW_IF_NOT_FMT(this->d == index->d,
58  "addIndex: dimension mismatch for "
59  "newly added index; expecting dim %d, "
60  "new index has dim %d",
61  this->d, index->d);
62 
63  if (!indices_.empty()) {
64  auto& existing = indices_.front().first;
65 
66  FAISS_THROW_IF_NOT_MSG(index->metric_type == existing->metric_type,
67  "addIndex: newly added index is "
68  "of different metric type than old index");
69 
70  // Make sure this index is not duplicated
71  for (auto& p : indices_) {
72  FAISS_THROW_IF_NOT_MSG(p.first != index,
73  "addIndex: attempting to add index "
74  "that is already in the collection");
75  }
76  }
77 
78  indices_.emplace_back(
79  std::make_pair(
80  index,
81  std::unique_ptr<WorkerThread>(isThreaded_ ?
82  new WorkerThread : nullptr)));
83 
84  onAfterAddIndex(index);
85 }
86 
87 template <typename IndexT>
89  for (auto it = indices_.begin(); it != indices_.end(); ++it) {
90  if (it->first == index) {
91  // This is our index; stop the worker thread before removing it,
92  // to ensure that it has finished before function exit
93  if (isThreaded_) {
94  // should have worker thread
95  FAISS_ASSERT((bool) it->second);
96  it->second->stop();
97  it->second->waitForThreadExit();
98  } else {
99  // should not have worker thread
100  FAISS_ASSERT(!(bool) it->second);
101  }
102 
103  indices_.erase(it);
104  onAfterRemoveIndex(index);
105 
106  if (own_fields) {
107  delete index;
108  }
109 
110  return;
111  }
112  }
113 
114  // could not find our index
115  FAISS_THROW_MSG("IndexReplicas::removeIndex: index not found");
116 }
117 
118 template <typename IndexT>
119 void ThreadedIndex<IndexT>::runOnIndex(std::function<void(int, IndexT*)> f) {
120  if (isThreaded_) {
121  std::vector<std::future<bool>> v;
122 
123  for (int i = 0; i < this->indices_.size(); ++i) {
124  auto& p = this->indices_[i];
125  auto indexPtr = p.first;
126  v.emplace_back(p.second->add([f, i, indexPtr](){ f(i, indexPtr); }));
127  }
128 
129  waitAndHandleFutures(v);
130  } else {
131  // Multiple exceptions may be thrown; gather them as we encounter them,
132  // while letting everything else run to completion
133  std::vector<std::pair<int, std::exception_ptr>> exceptions;
134 
135  for (int i = 0; i < this->indices_.size(); ++i) {
136  auto& p = this->indices_[i];
137  try {
138  f(i, p.first);
139  } catch (...) {
140  exceptions.emplace_back(std::make_pair(i, std::current_exception()));
141  }
142  }
143 
144  handleExceptions(exceptions);
145  }
146 }
147 
148 template <typename IndexT>
150  std::function<void(int, const IndexT*)> f) const {
151  const_cast<ThreadedIndex<IndexT>*>(this)->runOnIndex(
152  [f](int i, IndexT* idx){ f(i, idx); });
153 }
154 
155 template <typename IndexT>
157  runOnIndex([](int, IndexT* index){ index->reset(); });
158  this->ntotal = 0;
159  this->is_trained = false;
160 }
161 
162 template <typename IndexT>
163 void
165 }
166 
167 template <typename IndexT>
168 void
170 }
171 
172 template <typename IndexT>
173 void
174 ThreadedIndex<IndexT>::waitAndHandleFutures(std::vector<std::future<bool>>& v) {
175  // Blocking wait for completion for all of the indices, capturing any
176  // exceptions that are generated
177  std::vector<std::pair<int, std::exception_ptr>> exceptions;
178 
179  for (int i = 0; i < v.size(); ++i) {
180  auto& fut = v[i];
181 
182  try {
183  fut.get();
184  } catch (...) {
185  exceptions.emplace_back(std::make_pair(i, std::current_exception()));
186  }
187  }
188 
189  handleExceptions(exceptions);
190 }
191 
192 } // namespace
void removeIndex(IndexT *index)
void handleExceptions(std::vector< std::pair< int, std::exception_ptr >> &exceptions)
void addIndex(IndexT *index)
void runOnIndex(std::function< void(int, IndexT *)> f)
virtual void onAfterRemoveIndex(IndexT *index)
Called just after an index is removed.
virtual void onAfterAddIndex(IndexT *index)
Called just after an index is added.
size_t d
size of the input vectors