Faiss
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
/data/users/hoss/faiss/WorkerThread.cpp
1 /**
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * This source code is licensed under the MIT license found in the
5  * LICENSE file in the root directory of this source tree.
6  */
7 
8 
9 #include "WorkerThread.h"
10 #include "FaissAssert.h"
11 #include <exception>
12 
13 namespace faiss {
14 
15 namespace {
16 
// Invokes `fn` and reports the outcome through `promise`: the promise is
// fulfilled with `true` when `fn` returns normally; if `fn` throws, the
// in-flight exception is stored in the promise instead of propagating to the
// caller of this function.
void runCallback(std::function<void()>& fn,
                 std::promise<bool>& promise) {
  try {
    fn();
  } catch (...) {
    // Hand the failure over to whoever holds the matching future
    promise.set_exception(std::current_exception());
    return;
  }

  // Only reached when the callback completed without throwing
  promise.set_value(true);
}
27 
28 } // namespace
29 
30 WorkerThread::WorkerThread() :
31  wantStop_(false) {
32  startThread();
33 
34  // Make sure that the thread has started before continuing
35  add([](){}).get();
36 }
37 
39  stop();
41 }
42 
43 void
44 WorkerThread::startThread() {
45  thread_ = std::thread([this](){ threadMain(); });
46 }
47 
48 void
50  std::lock_guard<std::mutex> guard(mutex_);
51 
52  wantStop_ = true;
53  monitor_.notify_one();
54 }
55 
56 std::future<bool>
57 WorkerThread::add(std::function<void()> f) {
58  std::lock_guard<std::mutex> guard(mutex_);
59 
60  if (wantStop_) {
61  // The timer thread has been stopped, or we want to stop; we can't
62  // schedule anything else
63  std::promise<bool> p;
64  auto fut = p.get_future();
65 
66  // did not execute
67  p.set_value(false);
68  return fut;
69  }
70 
71  auto pr = std::promise<bool>();
72  auto fut = pr.get_future();
73 
74  queue_.emplace_back(std::make_pair(std::move(f), std::move(pr)));
75 
76  // Wake up our thread
77  monitor_.notify_one();
78  return fut;
79 }
80 
81 void
82 WorkerThread::threadMain() {
83  threadLoop();
84 
85  // Call all pending tasks
86  FAISS_ASSERT(wantStop_);
87 
88  // flush all pending operations
89  for (auto& f : queue_) {
90  runCallback(f.first, f.second);
91  }
92 }
93 
94 void
95 WorkerThread::threadLoop() {
96  while (true) {
97  std::pair<std::function<void()>, std::promise<bool>> data;
98 
99  {
100  std::unique_lock<std::mutex> lock(mutex_);
101 
102  while (!wantStop_ && queue_.empty()) {
103  monitor_.wait(lock);
104  }
105 
106  if (wantStop_) {
107  return;
108  }
109 
110  data = std::move(queue_.front());
111  queue_.pop_front();
112  }
113 
114  runCallback(data.first, data.second);
115  }
116 }
117 
118 void
120  try {
121  thread_.join();
122  } catch (...) {
123  }
124 }
125 
126 } // namespace
void stop()
Request that the worker thread stop itself.
void add(idx_t n, const float *x) override
std::future< bool > add(std::function< void()> f)