10 #include "OnDiskInvertedLists.h"
14 #include <unordered_set>
18 #include <sys/types.h>
20 #include "FaissAssert.h"
// LockLevels state (fragment: the enclosing struct declaration is outside
// this view). mutex1 guards every field below; the three condition
// variables correspond to the three lock levels implemented further down.
39 pthread_mutex_t mutex1;
40 pthread_cond_t level1_cv;
41 pthread_cond_t level2_cv;
42 pthread_cond_t level3_cv;
// ids of the per-list (level-1) locks currently held
44 std::unordered_set<int> level1_holders;
// Constructor body fragment (signature elided by extraction): initialize
// the mutex and the three condition variables with default attributes,
// then clear the level-2 / level-3 busy flags.
50 pthread_mutex_init(&mutex1,
nullptr);
51 pthread_cond_init(&level1_cv,
nullptr);
52 pthread_cond_init(&level2_cv,
nullptr);
53 pthread_cond_init(&level3_cv,
nullptr);
// no level-2 or level-3 holder exists at construction time
55 level2_in_use =
false;
56 level3_in_use =
false;
// Destructor body fragment: destroy the condition variables before the
// mutex that protects their predicates.
60 pthread_cond_destroy(&level1_cv);
61 pthread_cond_destroy(&level2_cv);
62 pthread_cond_destroy(&level3_cv);
63 pthread_mutex_destroy(&mutex1);
// lock_1 body fragment (signature elided): acquire the per-list (level-1)
// lock for list `no`. Blocks while a level-3 holder is active or `no` is
// already held; the while loop tolerates spurious wakeups on level1_cv.
67 pthread_mutex_lock(&mutex1);
68 while (level3_in_use || level1_holders.count(no) > 0) {
69 pthread_cond_wait(&level1_cv, &mutex1);
// NOTE(review): closing brace of the wait loop elided by extraction
71 level1_holders.insert(no);
72 pthread_mutex_unlock(&mutex1);
// Release the level-1 lock on list `no`.
75 void unlock_1(
int no) {
76 pthread_mutex_lock(&mutex1);
// caller must actually hold the lock it is releasing
77 assert(level1_holders.count(no) == 1);
78 level1_holders.erase(no);
// NOTE(review): lines between the two wakeups are elided by extraction;
// presumably the original signals level3_cv only when a level-3 waiter
// exists (level3_in_use) and otherwise broadcasts level1_cv — confirm
// against the upstream source before relying on this.
80 pthread_cond_signal(&level3_cv);
82 pthread_cond_broadcast(&level1_cv);
84 pthread_mutex_unlock(&mutex1);
// lock_2 body fragment (signature elided): acquire the single global
// level-2 lock. The signal lets a blocked lock_3 re-evaluate its
// predicate. NOTE(review): the `level2_in_use = true;` assignment that
// must follow the wait loop is elided by extraction.
88 pthread_mutex_lock(&mutex1);
91 pthread_cond_signal(&level3_cv);
93 while (level2_in_use) {
94 pthread_cond_wait(&level2_cv, &mutex1);
97 pthread_mutex_unlock(&mutex1);
// unlock_2 body fragment (signature elided): release the global level-2
// lock and wake one waiter.
101 pthread_mutex_lock(&mutex1);
102 level2_in_use =
false;
104 pthread_cond_signal(&level2_cv);
105 pthread_mutex_unlock(&mutex1);
// lock_3 / unlock_3 body fragments (function boundaries elided by
// extraction). lock_3: setting level3_in_use stops new level-1
// acquisitions in lock_1, then we wait until the remaining level-1
// holders drop to n_level2 (the level-2 holder's own level-1 locks).
109 pthread_mutex_lock(&mutex1);
110 level3_in_use =
true;
113 while (level1_holders.size() > n_level2) {
114 pthread_cond_wait(&level3_cv, &mutex1);
// unlock_3: clear the exclusive flag and let blocked level-1 lockers in.
120 level3_in_use =
false;
122 pthread_cond_broadcast(&level1_cv);
123 pthread_mutex_unlock(&mutex1);
// Debug dump fragment: prints the lock state under mutex1 so the snapshot
// is consistent. The loop body and the closing printf are elided by
// extraction.
127 pthread_mutex_lock(&mutex1);
128 printf(
"State: level3_in_use=%d n_level2=%d level1_holders: [", level3_in_use, n_level2);
129 for (
int k : level1_holders) {
133 pthread_mutex_unlock(&mutex1);
// OngoingPrefetch state fragment: one Thread record per prefetch worker,
// a mutex guarding the thread table, and a shared checksum sink.
150 std::vector<Thread> threads;
152 pthread_mutex_t mutex;
155 static int global_cs;
// constructor body fragment: initialize the guarding mutex
161 pthread_mutex_init (&mutex,
nullptr);
// pthread entry point: reads through one inverted list so the OS pulls
// its pages into the page cache. The loop bodies (which accumulate the
// values read into a checksum so the reads cannot be optimized away) and
// the return statement are elided by extraction.
164 static void* prefetch_list (
void * arg) {
165 Thread *th =
static_cast<Thread*
>(arg);
167 size_t n = th->od->list_size(th->list_no);
169 const uint8_t *codes = th->od->get_codes(th->list_no);
// touch every id of the list
171 for (
size_t i = 0; i < n;i++) {
// then touch the code bytes 8 at a time, reinterpreted as longs
174 const long *codes8 = (
const long*)codes;
175 long n8 = n * th->od->code_size / 8;
177 for (
size_t i = 0; i < n8;i++) {
// Launch one prefetch thread per requested non-empty list. First joins
// any thread still running from the previous round, then reuses the
// Thread slots. NOTE(review): this appears to assume n <= threads.size(),
// and the branch that marks unused slots (list_no = -1) is elided by
// extraction — confirm against the full source.
184 void prefetch_lists (
const long *list_nos,
int n) {
185 pthread_mutex_lock (&mutex);
// wait for the previous prefetch round to finish
186 for (
auto &th: threads) {
187 if (th.list_no != -1) {
188 pthread_join (th.pth,
nullptr);
// start one worker per valid, non-empty list
192 for (
int i = 0; i < n; i++) {
193 long list_no = list_nos[i];
194 Thread & th = threads[i];
195 if (list_no >= 0 && od->
list_size(list_no) > 0) {
196 th.list_no = list_no;
198 pthread_create (&th.pth,
nullptr, prefetch_list, &th);
203 pthread_mutex_unlock (&mutex);
206 ~OngoingPrefetch () {
207 pthread_mutex_lock (&mutex);
208 for (
auto &th: threads) {
209 if (th.list_no != -1) {
210 pthread_join (th.pth,
nullptr);
213 pthread_mutex_unlock (&mutex);
214 pthread_mutex_destroy (&mutex);
219 int OnDiskInvertedLists::OngoingPrefetch::global_cs = 0;
224 pf->prefetch_lists (list_nos, n);
234 void OnDiskInvertedLists::do_mmap ()
236 const char *rw_flags = read_only ?
"r" :
"rw+";
237 int prot = read_only ? PROT_READ : PROT_WRITE | PROT_READ;
238 FILE *f = fopen (filename.c_str(), rw_flags);
239 FAISS_THROW_IF_NOT_FMT (f,
"could not open %s in mode %s: %s",
240 filename.c_str(), rw_flags, strerror(errno));
242 ptr = (uint8_t*)mmap (
nullptr, totsize,
243 prot, MAP_SHARED, fileno (f), 0);
245 FAISS_THROW_IF_NOT_FMT (ptr != MAP_FAILED,
246 "could not mmap %s: %s",
// Grow the backing file to new_size bytes and account for the new bytes
// in the free-slot list. Heavily elided by extraction: the branch guarding
// the fopen (file creation when totsize == 0), its fclose, the else
// keywords, the totsize assignment, and the trailing do_mmap() call are
// all missing from this view.
253 void OnDiskInvertedLists::update_totsize (
size_t new_size)
// drop the current mapping before resizing the file on disk
257 if (ptr !=
nullptr) {
258 int err = munmap (ptr, totsize);
259 FAISS_THROW_IF_NOT_FMT (err == 0,
"mumap error: %s",
// create the file if needed — truncate() below requires it to exist
264 FILE *f = fopen (filename.c_str(),
"w");
265 FAISS_THROW_IF_NOT_FMT (f,
"could not open %s in mode W: %s",
266 filename.c_str(), strerror(errno));
// register the added bytes as free space, merging with a trailing free
// slot when one ends exactly at the old totsize
270 if (new_size > totsize) {
271 if (!slots.empty() &&
272 slots.back().offset + slots.back().capacity == totsize) {
273 slots.back().capacity += new_size - totsize;
275 slots.push_back (Slot(totsize, new_size - totsize));
// shrinking the file is not supported
278 assert(!
"not implemented");
284 printf (
"resizing %s to %ld bytes\n", filename.c_str(), totsize);
// extend the file on disk to the new total size
286 int err = truncate (filename.c_str(), totsize);
288 FAISS_THROW_IF_NOT_FMT (err == 0,
"truncate %s to %ld: %s",
289 filename.c_str(), totsize,
303 #define INVALID_OFFSET (size_t)(-1)
305 OnDiskInvertedLists::List::List ():
306 size (0), capacity (0), offset (INVALID_OFFSET)
309 OnDiskInvertedLists::Slot::Slot (
size_t offset,
size_t capacity):
310 offset (offset), capacity (capacity)
313 OnDiskInvertedLists::Slot::Slot ():
314 offset (0), capacity (0)
// Main constructor fragment: the member-initializer list (file/state
// fields, lock and prefetch helpers) is elided by extraction; the visible
// body allocates one empty List record per inverted list.
319 OnDiskInvertedLists::OnDiskInvertedLists (
320 size_t nlist,
size_t code_size,
321 const char *filename):
330 lists.resize (nlist);
// Default constructor fragment: only the prefetch-helper initializer
// survives extraction here.
335 OnDiskInvertedLists::OnDiskInvertedLists ():
339 pf (new OngoingPrefetch (this))
// Destructor fragment: unmap the file if it was mapped. The error-message
// arguments and the cleanup of the helper objects are elided by
// extraction.
343 OnDiskInvertedLists::~OnDiskInvertedLists ()
348 if (ptr !=
nullptr) {
349 int err = munmap (ptr, totsize);
350 FAISS_THROW_IF_NOT_FMT (err == 0,
// Accessor fragments (signatures elided by extraction):
// list_size: number of entries currently stored in the list
362 return lists[list_no].size;
// get_codes: codes live at the start of the list's slot in the mapping
368 return ptr + lists[list_no].offset;
// get_ids: ids follow the code region; the tail of the address
// expression (presumably code_size * capacity) is elided — confirm
373 return (
const idx_t*)(ptr + lists[list_no].offset +
378 void OnDiskInvertedLists::update_entries (
379 size_t list_no,
size_t offset,
size_t n_entry,
380 const idx_t *ids_in,
const uint8_t *codes_in)
382 FAISS_THROW_IF_NOT (!read_only);
383 if (n_entry == 0)
return;
384 const List & l = lists[list_no];
385 assert (n_entry + offset <= l.size);
386 idx_t *ids =
const_cast<idx_t*
>(
get_ids (list_no));
387 memcpy (ids + offset, ids_in,
sizeof(ids_in[0]) * n_entry);
388 uint8_t *codes =
const_cast<uint8_t*
>(
get_codes (list_no));
389 memcpy (codes + offset *
code_size, codes_in, code_size * n_entry);
392 size_t OnDiskInvertedLists::add_entries (
393 size_t list_no,
size_t n_entry,
394 const idx_t* ids,
const uint8_t *code)
396 FAISS_THROW_IF_NOT (!read_only);
397 locks->lock_1 (list_no);
399 resize_locked (list_no, n_entry + o);
400 update_entries (list_no, o, n_entry, ids, code);
401 locks->unlock_1 (list_no);
405 void OnDiskInvertedLists::resize (
size_t list_no,
size_t new_size)
407 FAISS_THROW_IF_NOT (!read_only);
408 locks->lock_1 (list_no);
409 resize_locked (list_no, new_size);
410 locks->unlock_1 (list_no);
// Reallocate list list_no to hold new_size entries; caller holds the
// level-1 lock. Heavily elided by extraction: the early-return for the
// keep-capacity case, the declaration of new_l, the capacity-doubling
// statement inside the while loop, the codes memcpy, and the free of the
// slot that is no longer used are all missing from this view.
415 void OnDiskInvertedLists::resize_locked (
size_t list_no,
size_t new_size)
417 List & l = lists[list_no];
// keep the current slot when the new size fits in (capacity/2, capacity]
419 if (new_size <= l.capacity &&
420 new_size > l.capacity / 2) {
// otherwise give back the old slot and pick a new capacity
428 free_slot (l.offset, l.capacity);
435 new_l.size = new_size;
// capacity appears to grow geometrically until it covers new_size
437 while (new_l.capacity < new_size) {
// a slot stores capacity ids followed by capacity codes
440 new_l.offset = allocate_slot (
441 new_l.capacity * (
sizeof(idx_t) +
code_size));
// copy the surviving entries if the list moved
445 if (l.offset != new_l.offset) {
446 size_t n = std::min (new_size, l.size);
// ids live after the code region of the new slot
449 memcpy (ptr + new_l.offset + new_l.capacity *
code_size,
450 get_ids (list_no), n *
sizeof(idx_t));
454 lists[list_no] = new_l;
// First-fit allocator over the free-slot list; returns the byte offset of
// a span of `capacity` free bytes. Elided by extraction: the iterator
// increments in both scan loops, the slots.erase(it) for the exact-fit
// case, and the return statement.
458 size_t OnDiskInvertedLists::allocate_slot (
size_t capacity) {
// scan for the first free slot large enough
461 auto it = slots.begin();
462 while (it != slots.end() && it->capacity < capacity) {
466 if (it == slots.end()) {
// no fit: grow the file geometrically until the request fits
468 size_t new_size = totsize == 0 ? 32 : totsize * 2;
469 while (new_size - totsize < capacity)
472 update_totsize(new_size);
// rescan — update_totsize added (or extended) a trailing free slot
475 while (it != slots.end() && it->capacity < capacity) {
478 assert (it != slots.end());
481 size_t o = it->offset;
// exact fit consumes the whole slot; otherwise carve from its front
482 if (it->capacity == capacity) {
486 it->capacity -= capacity;
487 it->offset += capacity;
// Return a span of bytes to the free list, coalescing with the adjacent
// free slots when the span touches them. Elided by extraction: the
// iterator increment in the scan loop, the declarations of `prev`, the
// slots.erase for the both-sides-merge case, and the else keywords.
495 void OnDiskInvertedLists::free_slot (
size_t offset,
size_t capacity) {
498 if (capacity == 0)
return;
// find the first free slot that starts after `offset`
500 auto it = slots.begin();
501 while (it != slots.end() && it->offset <= offset) {
505 size_t inf = 1UL << 60;
// end of the free slot before the insertion point (inf if none)
507 size_t end_prev = inf;
508 if (it != slots.begin()) {
511 end_prev = prev->offset + prev->capacity;
// start of the free slot after the insertion point
514 size_t begin_next = 1L << 60;
515 if (it != slots.end()) {
516 begin_next = it->offset;
// the freed span must not overlap its free neighbors
519 assert (end_prev == inf || offset >= end_prev);
520 assert (offset + capacity <= begin_next);
// touches the previous slot: extend it (and swallow the next slot too
// when the span bridges both neighbors)
522 if (offset == end_prev) {
525 if (offset + capacity == begin_next) {
526 prev->capacity += capacity + it->capacity;
529 prev->capacity += capacity;
// touches only the next slot: grow it backwards
532 if (offset + capacity == begin_next) {
533 it->offset -= capacity;
534 it->capacity += capacity;
// touches neither neighbor: insert a fresh slot
536 slots.insert (it, Slot (offset, capacity));
// Bulk-build this (empty) on-disk structure from n_il in-memory inverted
// lists in three phases: (1) sum per-list sizes, (2) lay lists out
// back-to-back in the file, (3) copy entries in parallel. Elided by
// extraction: the declaration/initialization of `cums`, the per-list
// `List & l` in phase 3, the arguments of the update_entries call, the
// loop-closing braces, and the function's return value.
548 size_t OnDiskInvertedLists::merge_from (
const InvertedLists **ils,
int n_il)
550 FAISS_THROW_IF_NOT_MSG (totsize == 0,
"works only on an empty InvertedLists");
// phase 1: total size of every list across all sources
552 std::vector<size_t> sizes (
nlist);
553 for (
int i = 0; i < n_il; i++) {
554 const InvertedLists *il = ils[i];
// all sources must agree on geometry
555 FAISS_THROW_IF_NOT (il->nlist ==
nlist && il->code_size ==
code_size);
557 for (
size_t j = 0; j <
nlist; j++) {
558 sizes [j] += il->list_size(j);
// phase 2: assign each list a contiguous region; capacity == final size
564 for (
size_t j = 0; j <
nlist; j++) {
567 lists[j].capacity = sizes[j];
568 lists[j].offset = cums;
569 cums += lists[j].capacity * (
sizeof(idx_t) +
code_size);
// size the file once for the whole layout
572 update_totsize (cums);
// phase 3: copy entries; lists are independent, so parallelize over j
574 #pragma omp parallel for
575 for (
size_t j = 0; j <
nlist; j++) {
577 for (
int i = 0; i < n_il; i++) {
578 const InvertedLists *il = ils[i];
579 size_t n_entry = il->list_size(j);
// append source i's entries at the current end of list j
581 update_entries (j, l.size - n_entry, n_entry,
// every list must end up exactly full
585 assert (l.size == l.capacity);
Inherited InvertedLists interface implemented by this class:
- `const idx_t * get_ids(size_t list_no) const override`
- `const uint8_t * get_codes(size_t list_no) const override`
- `size_t list_size(size_t list_no) const override` — get the size of a list
- `void prefetch_lists(const long *list_nos, int nlist) const override`
Inherited fields and types:
- `size_t code_size` — code size per vector in bytes
- `size_t nlist` — number of possible key values
- `long idx_t` — all indices are this type