Faiss
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
WorkerThread.cpp
1 /**
2  * Copyright (c) 2015-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD+Patents license found in the
6  * LICENSE file in the root directory of this source tree.
7  */
8 
9 // Copyright 2004-present Facebook. All Rights Reserved.
10 
#include "WorkerThread.h"
#include "../../FaissAssert.h"

#include <exception>
13 
14 namespace faiss { namespace gpu {
15 
16 WorkerThread::WorkerThread() :
17  wantStop_(false) {
18  startThread();
19 
20  // Make sure that the thread has started before continuing
21  add([](){}).get();
22 }
23 
// NOTE(review): the signature line of this definition is absent from this
// listing, and the embedded numbering jumps 25 -> 27, so one body line is
// missing as well. The visible body calls stop(), which sets wantStop_ and
// wakes the worker. Presumably this is the destructor and the missing
// statement waits for the thread to exit -- confirm against the original
// file before relying on this.
25  stop();
27 }
28 
29 void
30 WorkerThread::startThread() {
31  thread_ = std::thread([this](){ threadMain(); });
32 }
33 
34 void
36  std::lock_guard<std::mutex> guard(mutex_);
37 
38  wantStop_ = true;
39  monitor_.notify_one();
40 }
41 
42 std::future<bool>
43 WorkerThread::add(std::function<void()> f) {
44  std::lock_guard<std::mutex> guard(mutex_);
45 
46  if (wantStop_) {
47  // The timer thread has been stopped, or we want to stop; we can't
48  // schedule anything else
49  std::promise<bool> p;
50  auto fut = p.get_future();
51 
52  // did not execute
53  p.set_value(false);
54  return fut;
55  }
56 
57  auto pr = std::promise<bool>();
58  auto fut = pr.get_future();
59 
60  queue_.emplace_back(std::make_pair(std::move(f), std::move(pr)));
61 
62  // Wake up our thread
63  monitor_.notify_one();
64  return fut;
65 }
66 
67 void
68 WorkerThread::threadMain() {
69  threadLoop();
70 
71  // Call all pending tasks
72  FAISS_ASSERT(wantStop_);
73 
74  for (auto& f : queue_) {
75  f.first();
76  f.second.set_value(true);
77  }
78 }
79 
80 void
81 WorkerThread::threadLoop() {
82  while (true) {
83  std::pair<std::function<void()>, std::promise<bool>> data;
84 
85  {
86  std::unique_lock<std::mutex> lock(mutex_);
87 
88  while (!wantStop_ && queue_.empty()) {
89  monitor_.wait(lock);
90  }
91 
92  if (wantStop_) {
93  return;
94  }
95 
96  data = std::move(queue_.front());
97  queue_.pop_front();
98  }
99 
100  data.first();
101  data.second.set_value(true);
102  }
103 }
104 
// Blocks until the worker thread has finished by joining thread_.
//
// NOTE(review): the signature line of this definition is absent from this
// listing (the embedded numbering jumps 105 -> 107); confirm the function
// name against the original file.
105 void
107  try {
108  thread_.join();
109  } catch (...) {
// Any exception raised by join() is deliberately discarded here.
// NOTE(review): presumably best-effort cleanup -- confirm that silently
// ignoring a failed join is intended.
110  }
111 }
112 
113 } } // namespace
void stop()
Request that the worker thread stop itself.
std::future< bool > add(std::function< void()> f)