OmniSciDB  72c90bc290
BufferMgr.cpp
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
22 
23 #include <algorithm>
24 #include <iomanip>
25 #include <limits>
26 
27 #include "DataMgr/BufferMgr/Buffer.h"
28 #include "DataMgr/BufferMgr/BufferMgr.h"
29 #include "Logger/Logger.h"
30 #include "Shared/measure.h"
31 
32 using namespace std;
33 
34 namespace Buffer_Namespace {
35 
36 std::string BufferMgr::keyToString(const ChunkKey& key) {
37  std::ostringstream oss;
38 
39  oss << " key: ";
40  for (auto sub_key : key) {
41  oss << sub_key << ",";
42  }
43  return oss.str();
44 }
45 
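keyToString above joins the integer components of a ChunkKey with trailing commas. A minimal standalone sketch of the same formatting (ChunkKey is std::vector<int>, per types.h); the main() and sample key are illustrative only:

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

using ChunkKey = std::vector<int>;

std::string keyToString(const ChunkKey& key) {
  std::ostringstream oss;
  oss << " key: ";
  for (auto sub_key : key) {
    oss << sub_key << ",";
  }
  return oss.str();
}

int main() {
  std::cout << keyToString({1, 2, 0, 14}) << '\n';  // prints: " key: 1,2,0,14,"
}
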
47 BufferMgr::BufferMgr(const int device_id,
48  const size_t max_buffer_pool_size,
49  const size_t min_slab_size,
50  const size_t max_slab_size,
51  const size_t page_size,
52  AbstractBufferMgr* parent_mgr)
53  : AbstractBufferMgr(device_id)
54  , max_buffer_pool_size_(max_buffer_pool_size)
55  , min_slab_size_(min_slab_size)
56  , max_slab_size_(max_slab_size)
57  , page_size_(page_size)
58  , num_pages_allocated_(0)
59  , allocations_capped_(false)
60  , parent_mgr_(parent_mgr)
61  , max_buffer_id_(0)
62  , buffer_epoch_(0) {
63  CHECK(max_buffer_pool_size_ > 0);
64  CHECK(page_size_ > 0);
65  // TODO change checks on run-time configurable slab size variables to exceptions
66  CHECK(min_slab_size_ > 0);
67  CHECK(max_slab_size_ > 0);
68  CHECK(min_slab_size_ <= max_slab_size_);
69  CHECK(min_slab_size_ % page_size_ == 0);
70  CHECK(max_slab_size_ % page_size_ == 0);
71 
72  max_buffer_pool_num_pages_ = max_buffer_pool_size_ / page_size_;
73  min_num_pages_per_slab_ = min_slab_size_ / page_size_;
74  max_num_pages_per_slab_ = max_slab_size_ / page_size_;
75  current_max_slab_page_size_ =
76  max_num_pages_per_slab_; // current_max_slab_page_size_ will drop as allocations
77  // fail - this is the high water mark
78 }
79 
80 /// Destructor
81 BufferMgr::~BufferMgr() {
82  clear();
83 }
84 
85 void BufferMgr::reinit() {
86  num_pages_allocated_ = 0;
87  current_max_slab_page_size_ =
88  max_num_pages_per_slab_; // current_max_slab_page_size_ will drop as allocations
89  // fail - this is the high water mark
90  allocations_capped_ = false;
91 }
92 
93 void BufferMgr::clear() {
94  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
95  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
96  std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
97 
98  for (auto& buf : chunk_index_) {
99  delete buf.second->buffer;
100  }
101 
102  chunk_index_.clear();
103  slabs_.clear();
104  slab_segments_.clear();
105  unsized_segs_.clear();
106  buffer_epoch_ = 0;
107 }
108 
110 AbstractBuffer* BufferMgr::createBuffer(const ChunkKey& chunk_key,
111  const size_t chunk_page_size,
112  const size_t initial_size) {
113  // LOG(INFO) << printMap();
114  size_t actual_chunk_page_size = chunk_page_size;
115  if (actual_chunk_page_size == 0) {
116  actual_chunk_page_size = page_size_;
117  }
118 
119  // chunk_page_size is just for recording dirty pages
120  {
121  std::lock_guard<std::mutex> lock(chunk_index_mutex_);
122  CHECK(chunk_index_.find(chunk_key) == chunk_index_.end());
123  BufferSeg buffer_seg(BufferSeg(-1, 0, USED));
124  buffer_seg.chunk_key = chunk_key;
125  std::lock_guard<std::mutex> unsizedSegsLock(unsized_segs_mutex_);
126  unsized_segs_.push_back(buffer_seg); // race condition?
127  chunk_index_[chunk_key] =
128  std::prev(unsized_segs_.end(),
129  1); // need to do this before allocating Buffer because doing so could
130  // change the segment used
131  }
132  // the following should be safe outside the lock because the first thing the
133  // Buffer constructor does is pin (and it's still in unsized segs at this
134  // point so it can't be evicted)
135  try {
136  allocateBuffer(chunk_index_[chunk_key], actual_chunk_page_size, initial_size);
137  } catch (const OutOfMemory&) {
138  auto buffer_it = chunk_index_.find(chunk_key);
139  CHECK(buffer_it != chunk_index_.end());
140  buffer_it->second->buffer =
141  nullptr; // constructor failed for the buffer object so make sure to mark it null
142  // so deleteBuffer doesn't try to delete it
143  deleteBuffer(chunk_key);
144  throw;
145  }
146  CHECK(initial_size == 0 || chunk_index_[chunk_key]->buffer->getMemoryPtr());
147  // chunk_index_[chunk_key]->buffer->pin();
148  std::lock_guard<std::mutex> lock(chunk_index_mutex_);
149  return chunk_index_[chunk_key]->buffer;
150 }
151 
152 BufferList::iterator BufferMgr::evict(BufferList::iterator& evict_start,
153  const size_t num_pages_requested,
154  const int slab_num) {
155  // We can assume here that the buffer for evict_start either doesn't exist
156  // (evict_start is the first buffer) or was not free, so we don't need to
157  // merge it
158  auto evict_it = evict_start;
159  size_t num_pages = 0;
160  size_t start_page = evict_start->start_page;
161  while (num_pages < num_pages_requested) {
162  if (evict_it->mem_status == USED) {
163  CHECK(evict_it->buffer->getPinCount() < 1);
164  }
165  num_pages += evict_it->num_pages;
166  if (evict_it->mem_status == USED && evict_it->chunk_key.size() > 0) {
167  chunk_index_.erase(evict_it->chunk_key);
168  }
169  if (evict_it->buffer != nullptr) {
170  // If we don't delete buffers here then we lose reference to them later and cause a
171  // memleak.
172  delete evict_it->buffer;
173  }
174  evict_it = slab_segments_[slab_num].erase(
175  evict_it); // erase returns the next iterator - safe if we ever move
176  // to a vector (as opposed to erase(evict_it++))
177  }
178  BufferSeg data_seg(
179  start_page, num_pages_requested, USED, buffer_epoch_++); // until we can
180  // data_seg.pinCount++;
181  data_seg.slab_num = slab_num;
182  auto data_seg_it =
183  slab_segments_[slab_num].insert(evict_it, data_seg); // Will insert before evict_it
184  if (num_pages_requested < num_pages) {
185  size_t excess_pages = num_pages - num_pages_requested;
186  if (evict_it != slab_segments_[slab_num].end() &&
187  evict_it->mem_status == FREE) { // need to merge with current page
188  evict_it->start_page = start_page + num_pages_requested;
189  evict_it->num_pages += excess_pages;
190  } else { // need to insert a free seg before evict_it for excess_pages
191  BufferSeg free_seg(start_page + num_pages_requested, excess_pages, FREE);
192  slab_segments_[slab_num].insert(evict_it, free_seg);
193  }
194  }
195  return data_seg_it;
196 }
197 
198 BufferList::iterator BufferMgr::reserveBuffer(
199  BufferList::iterator& seg_it,
200  const size_t num_bytes) { // assumes buffer is already pinned
201 
202  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;
203  size_t num_pages_extra_needed = num_pages_requested - seg_it->num_pages;
204 
205  if (num_pages_requested < seg_it->num_pages) {
206  // We already have enough pages in existing segment
207  return seg_it;
208  }
209  // First check for free segment after seg_it
210  int slab_num = seg_it->slab_num;
211  if (slab_num >= 0) { // not dummy page
212  BufferList::iterator next_it = std::next(seg_it);
213  if (next_it != slab_segments_[slab_num].end() && next_it->mem_status == FREE &&
214  next_it->num_pages >= num_pages_extra_needed) {
215  // Then we can just use the next BufferSeg which happens to be free
216  size_t leftover_pages = next_it->num_pages - num_pages_extra_needed;
217  seg_it->num_pages = num_pages_requested;
218  next_it->num_pages = leftover_pages;
219  next_it->start_page = seg_it->start_page + seg_it->num_pages;
220  return seg_it;
221  }
222  }
223  // If we're here then we couldn't keep buffer in existing slot
224  // need to find new segment, copy data over, and then delete old
225  auto new_seg_it = findFreeBuffer(num_bytes);
226 
227  // Below should be in copy constructor for BufferSeg?
228  new_seg_it->buffer = seg_it->buffer;
229  new_seg_it->chunk_key = seg_it->chunk_key;
230  int8_t* old_mem = new_seg_it->buffer->mem_;
231  new_seg_it->buffer->mem_ =
232  slabs_[new_seg_it->slab_num] + new_seg_it->start_page * page_size_;
233 
234  // now need to copy over memory
235  // only do this if the old segment is valid (i.e. not new with an unallocated buffer)
236  if (seg_it->start_page >= 0 && seg_it->buffer->mem_ != 0) {
237  new_seg_it->buffer->writeData(old_mem,
238  new_seg_it->buffer->size(),
239  0,
240  new_seg_it->buffer->getType(),
241  device_id_);
242  }
243  // Decrement pin count to reverse effect above
244  removeSegment(seg_it);
245  {
246  std::lock_guard<std::mutex> lock(chunk_index_mutex_);
247  chunk_index_[new_seg_it->chunk_key] = new_seg_it;
248  }
249 
250  return new_seg_it;
251 }
252 
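The first line of reserveBuffer rounds a byte count up to whole pages with the usual ceiling-division idiom; the same expression appears again in findFreeBuffer below. A self-contained check of the arithmetic (nothing below is BufferMgr API):

#include <cassert>
#include <cstddef>

// ceil(num_bytes / page_size) without floating point; assumes page_size > 0
// and that num_bytes + page_size does not overflow.
std::size_t pages_needed(std::size_t num_bytes, std::size_t page_size) {
  return (num_bytes + page_size - 1) / page_size;
}

int main() {
  assert(pages_needed(0, 512) == 0);
  assert(pages_needed(1, 512) == 1);
  assert(pages_needed(512, 512) == 1);
  assert(pages_needed(513, 512) == 2);
}
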
253 BufferList::iterator BufferMgr::findFreeBufferInSlab(const size_t slab_num,
254  const size_t num_pages_requested) {
255  for (auto buffer_it = slab_segments_[slab_num].begin();
256  buffer_it != slab_segments_[slab_num].end();
257  ++buffer_it) {
258  if (buffer_it->mem_status == FREE && buffer_it->num_pages >= num_pages_requested) {
259  // startPage doesn't change
260  size_t excess_pages = buffer_it->num_pages - num_pages_requested;
261  buffer_it->num_pages = num_pages_requested;
262  buffer_it->mem_status = USED;
263  buffer_it->last_touched = buffer_epoch_++;
264  buffer_it->slab_num = slab_num;
265  if (excess_pages > 0) {
266  BufferSeg free_seg(
267  buffer_it->start_page + num_pages_requested, excess_pages, FREE);
268  auto temp_it = buffer_it; // this should make a copy and not be a reference
269  // - as we do not want to increment buffer_it
270  temp_it++;
271  slab_segments_[slab_num].insert(temp_it, free_seg);
272  }
273  return buffer_it;
274  }
275  }
276  // If here then we did not find a free buffer of sufficient size in this slab,
277  // return the end iterator
278  return slab_segments_[slab_num].end();
279 }
280 
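findFreeBufferInSlab is first-fit allocation over a segment list: take the first FREE segment with enough pages, claim the requested pages as USED, and split any excess into a new FREE segment after it. A simplified standalone model of that split; the real BufferSeg carries more state (chunk_key, last_touched, pin counts):

#include <cstddef>
#include <iterator>
#include <list>

enum MemStatus { FREE, USED };
struct Seg {
  std::size_t start_page;
  std::size_t num_pages;
  MemStatus mem_status;
};

// First-fit: returns iterator to the claimed segment, or end() if none fits.
std::list<Seg>::iterator first_fit(std::list<Seg>& segs, std::size_t pages_requested) {
  for (auto it = segs.begin(); it != segs.end(); ++it) {
    if (it->mem_status == FREE && it->num_pages >= pages_requested) {
      std::size_t excess = it->num_pages - pages_requested;
      it->num_pages = pages_requested;
      it->mem_status = USED;
      if (excess > 0) {  // split leftover pages into a FREE segment right after
        segs.insert(std::next(it),
                    Seg{it->start_page + pages_requested, excess, FREE});
      }
      return it;
    }
  }
  return segs.end();
}

int main() {
  std::list<Seg> slab{{0, 4096, FREE}};
  auto it = first_fit(slab, 256);  // slab is now: [0,256) USED, [256,4096) FREE
  return it == slab.end();
}
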
281 BufferList::iterator BufferMgr::findFreeBuffer(size_t num_bytes) {
282  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;
283  if (num_pages_requested > max_num_pages_per_slab_) {
284  throw TooBigForSlab(num_bytes);
285  }
286 
287  size_t num_slabs = slab_segments_.size();
288 
289  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
290  auto seg_it = findFreeBufferInSlab(slab_num, num_pages_requested);
291  if (seg_it != slab_segments_[slab_num].end()) {
292  return seg_it;
293  }
294  }
295 
296  // If we're here then we didn't find a free segment of sufficient size
297  // First we see if we can add another slab
298  while (!allocations_capped_ && num_pages_allocated_ < max_buffer_pool_num_pages_) {
299  try {
300  size_t pagesLeft = max_buffer_pool_num_pages_ - num_pages_allocated_;
301  if (pagesLeft < current_max_slab_page_size_) {
302  current_max_slab_page_size_ = pagesLeft;
303  }
304  if (num_pages_requested <=
305  current_max_slab_page_size_) { // don't try to allocate if the
306  // new slab won't be big enough
307  auto alloc_ms = measure<>::execution(
308  [&]() { addSlab(current_max_slab_page_size_ * page_size_); });
309  LOG(INFO) << "ALLOCATION slab of " << current_max_slab_page_size_ << " pages ("
310  << current_max_slab_page_size_ * page_size_ << "B) created in "
311  << alloc_ms << " ms " << getStringMgrType() << ":" << device_id_;
312  } else {
313  break;
314  }
315  // if here then addSlab succeeded
316  num_pages_allocated_ += current_max_slab_page_size_;
317  return findFreeBufferInSlab(
318  num_slabs,
319  num_pages_requested); // has to succeed since we made sure to request a slab
320  // big enough to accommodate the request
321  } catch (std::runtime_error& error) { // failed to allocate slab
322  LOG(INFO) << "ALLOCATION Attempted slab of " << current_max_slab_page_size_
323  << " pages (" << current_max_slab_page_size_ * page_size_ << "B) failed "
324  << getStringMgrType() << ":" << device_id_;
325  // check if there is any point halving currentMaxSlabSize and trying again
326  // if the request won't fit in half the available space then try once at full
327  // size; if we have already tried at full size and failed then break, as
328  // there could still be enough room for other, later requests but
329  // not for this current one
330  if (num_pages_requested > current_max_slab_page_size_ / 2 &&
331  current_max_slab_page_size_ != num_pages_requested) {
332  current_max_slab_page_size_ = num_pages_requested;
333  } else {
334  current_max_slab_page_size_ /= 2;
335  if (current_max_slab_page_size_ <
336  (min_num_pages_per_slab_)) { // should be a constant
337  allocations_capped_ = true;
338  // dump out the slabs and their sizes
339  LOG(INFO) << "ALLOCATION Capped " << current_max_slab_page_size_
340  << " Minimum size = " << (min_num_pages_per_slab_) << " "
341  << getStringMgrType() << ":" << device_id_;
342  }
343  }
344  }
345  }
346 
347  if (num_pages_allocated_ == 0 && allocations_capped_) {
348  throw FailedToCreateFirstSlab(num_bytes);
349  }
350 
351  // If here then we can't add a slab - so we need to evict
352 
353  size_t min_score = std::numeric_limits<size_t>::max();
354  // We're going for lowest score here, like golf
355  // This is because score is the sum of the lastTouched score for all pages evicted.
356  // Evicting fewer pages and older pages will lower the score
357  BufferList::iterator best_eviction_start = slab_segments_[0].end();
358  int best_eviction_start_slab = -1;
359  int slab_num = 0;
360 
361  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
362  ++slab_it, ++slab_num) {
363  for (auto buffer_it = slab_it->begin(); buffer_it != slab_it->end(); ++buffer_it) {
364  // Note there are some shortcuts we could take here - like we should never consider
365  // a USED buffer coming after a free buffer as we would have used the FREE buffer,
366  // but we won't worry about this for now
367 
368  // We can't evict pinned buffers - only normal used buffers
369 
370  // if (buffer_it->mem_status == FREE || buffer_it->buffer->getPinCount() == 0) {
371  size_t page_count = 0;
372  size_t score = 0;
373  bool solution_found = false;
374  auto evict_it = buffer_it;
375  for (; evict_it != slab_segments_[slab_num].end(); ++evict_it) {
376  // pinCount should never go up - only down because we have
377  // global lock on buffer pool and pin count only increments
378  // on getChunk
379  if (evict_it->mem_status == USED && evict_it->buffer->getPinCount() > 0) {
380  break;
381  }
382  page_count += evict_it->num_pages;
383  if (evict_it->mem_status == USED) {
384  // MAT changed from
385  // score += evictIt->lastTouched;
386  // The issue was thrashing when going from 8M fragment size chunks back to 64M:
387  // the summed score of many small chunks exceeded that of one large chunk, so
388  // the large chunk would always be evicted first. Under memory pressure a query
389  // would then evict its own current chunks and cause reloads, rather than
390  // evicting several smaller, older, unused chunks.
391  score = std::max(score, static_cast<size_t>(evict_it->last_touched));
392  }
393  if (page_count >= num_pages_requested) {
394  solution_found = true;
395  break;
396  }
397  }
398  if (solution_found && score < min_score) {
399  min_score = score;
400  best_eviction_start = buffer_it;
401  best_eviction_start_slab = slab_num;
402  } else if (evict_it == slab_segments_[slab_num].end()) {
403  // this means that every segment after this will fail as well, so our search has
404  // proven futile
405  // throw std::runtime_error ("Couldn't evict chunks to get free space");
406  break;
407  // in reality we should try to rearrange the buffer to get more contiguous free
408  // space
409  }
410  // other possibility is ending at PINNED - do nothing in this case
411  //}
412  }
413  }
414  if (best_eviction_start == slab_segments_[0].end()) {
415  LOG(ERROR) << "ALLOCATION failed to find " << num_bytes << "B throwing out of memory "
416  << getStringMgrType() << ":" << device_id_;
417  VLOG(2) << printSlabs();
418  throw OutOfMemory(num_bytes);
419  }
420  LOG(INFO) << "ALLOCATION failed to find " << num_bytes << "B free. Forcing Eviction."
421  << " Eviction start " << best_eviction_start->start_page
422  << " Number pages requested " << num_pages_requested
423  << " Best Eviction Start Slab " << best_eviction_start_slab << " "
424  << getStringMgrType() << ":" << device_id_;
425  best_eviction_start =
426  evict(best_eviction_start, num_pages_requested, best_eviction_start_slab);
427  return best_eviction_start;
428 }
429 
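The eviction search above slides a window of contiguous, unpinned segments from each candidate start until it covers the request, scores the window by the newest (max) last_touched value it would evict, and keeps the lowest-scoring window; a pinned segment cuts the window short. A simplified standalone model over a flat vector of segments; names and main() are illustrative only:

#include <algorithm>
#include <cstddef>
#include <limits>
#include <vector>

struct Seg {
  std::size_t num_pages;
  bool used;                 // USED vs FREE
  int pin_count;             // only meaningful when used
  std::size_t last_touched;  // logical clock stamp
};

// Returns the index of the best eviction start, or -1 if no window fits.
int best_eviction_start(const std::vector<Seg>& segs, std::size_t pages_needed) {
  std::size_t min_score = std::numeric_limits<std::size_t>::max();
  int best = -1;
  for (std::size_t start = 0; start < segs.size(); ++start) {
    std::size_t pages = 0;
    std::size_t score = 0;
    for (std::size_t i = start; i < segs.size(); ++i) {
      if (segs[i].used && segs[i].pin_count > 0) {
        break;  // pinned segments can't be evicted; window ends here
      }
      pages += segs[i].num_pages;
      if (segs[i].used) {
        score = std::max(score, segs[i].last_touched);
      }
      if (pages >= pages_needed) {  // window covers the request
        if (score < min_score) {
          min_score = score;
          best = static_cast<int>(start);
        }
        break;
      }
    }
  }
  return best;
}

int main() {
  // Evicting the two older segments (touched at 3 and 5) beats the newer one (9).
  std::vector<Seg> segs{{64, true, 0, 3}, {64, true, 0, 5}, {128, true, 0, 9}};
  return best_eviction_start(segs, 128) != 0;  // expects start index 0
}
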
430 std::string BufferMgr::printSlab(size_t slab_num) {
431  std::ostringstream tss;
432  // size_t lastEnd = 0;
433  tss << "Slab St.Page   Pages  Touch" << std::endl;
434  for (auto segment : slab_segments_[slab_num]) {
435  tss << setfill(' ') << setw(4) << slab_num;
436  // tss << " BSN: " << setfill(' ') << setw(2) << segment.slab_num;
437  tss << setfill(' ') << setw(8) << segment.start_page;
438  tss << setfill(' ') << setw(8) << segment.num_pages;
439  // tss << " GAP: " << setfill(' ') << setw(7) << segment.start_page - lastEnd;
440  // lastEnd = segment.start_page + segment.num_pages;
441  tss << setfill(' ') << setw(7) << segment.last_touched;
442  // tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
443  if (segment.mem_status == FREE) {
444  tss << " FREE"
445  << " ";
446  } else {
447  tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
448  tss << " USED - Chunk: ";
449 
450  for (auto&& key_elem : segment.chunk_key) {
451  tss << key_elem << ",";
452  }
453  }
454  tss << std::endl;
455  }
456  return tss.str();
457 }
458 
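For reference, a hypothetical slab with one used and one free segment would print along these lines (all values made up for illustration):

Slab St.Page   Pages  Touch
   0       0     256     42 PC:  0 USED - Chunk: 1,2,0,14,
   0     256    3840     17 FREE
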
459 std::string BufferMgr::printSlabs() {
460  std::ostringstream tss;
461  tss << std::endl
462  << "Slabs Contents: "
463  << " " << getStringMgrType() << ":" << device_id_ << std::endl;
464  size_t num_slabs = slab_segments_.size();
465  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
466  tss << printSlab(slab_num);
467  }
468  tss << "--------------------" << std::endl;
469  return tss.str();
470 }
471 
472 void BufferMgr::clearSlabs() {
473  std::lock_guard<std::mutex> lock(global_mutex_);
474  bool pinned_exists = false;
475  for (auto& segment_list : slab_segments_) {
476  for (auto& segment : segment_list) {
477  if (segment.mem_status == FREE) {
478  // no need to free
479  } else if (segment.buffer->getPinCount() < 1) {
480  deleteBuffer(segment.chunk_key, true);
481  } else {
482  pinned_exists = true;
483  }
484  }
485  }
486  if (!pinned_exists) {
487  // lets actually clear the buffer from memory
488  LOG(INFO) << getStringMgrType() << ":" << device_id_ << " clear slab memory";
489  freeAllMem();
490  clear();
491  reinit();
492  } else {
493  LOG(INFO) << getStringMgrType() << ":" << device_id_ << " keep slab memory (pinned).";
494  }
495 }
496 
497 // return the maximum size this buffer pool can be, in bytes
498 size_t BufferMgr::getMaxSize() {
499  return page_size_ * max_buffer_pool_num_pages_;
500 }
501 
502 // return how many bytes are currently allocated
503 size_t BufferMgr::getAllocated() {
504  return num_pages_allocated_ * page_size_;
505 }
506 
507 // return whether slab allocations have been capped
508 bool BufferMgr::isAllocationCapped() {
509  return allocations_capped_;
510 }
511 
512 size_t BufferMgr::getPageSize() {
513  return page_size_;
514 }
515 
516 // return the size of the chunks in use in bytes
517 size_t BufferMgr::getInUseSize() {
518  size_t in_use = 0;
519  for (auto& segment_list : slab_segments_) {
520  for (auto& segment : segment_list) {
521  if (segment.mem_status != FREE) {
522  in_use += segment.num_pages * page_size_;
523  }
524  }
525  }
526  return in_use;
527 }
528 
529 std::string BufferMgr::printSeg(BufferList::iterator& seg_it) {
530  std::ostringstream tss;
531  tss << "SN: " << setfill(' ') << setw(2) << seg_it->slab_num;
532  tss << " SP: " << setfill(' ') << setw(7) << seg_it->start_page;
533  tss << " NP: " << setfill(' ') << setw(7) << seg_it->num_pages;
534  tss << " LT: " << setfill(' ') << setw(7) << seg_it->last_touched;
535  tss << " PC: " << setfill(' ') << setw(2) << seg_it->buffer->getPinCount();
536  if (seg_it->mem_status == FREE) {
537  tss << " FREE"
538  << " ";
539  } else {
540  tss << " USED - Chunk: ";
541  for (auto vec_it = seg_it->chunk_key.begin(); vec_it != seg_it->chunk_key.end();
542  ++vec_it) {
543  tss << *vec_it << ",";
544  }
545  tss << std::endl;
546  }
547  return tss.str();
548 }
549 
550 std::string BufferMgr::printMap() {
551  std::ostringstream tss;
552  int seg_num = 1;
553  tss << std::endl
554  << "Map Contents: "
555  << " " << getStringMgrType() << ":" << device_id_ << std::endl;
556  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
557  for (auto seg_it = chunk_index_.begin(); seg_it != chunk_index_.end();
558  ++seg_it, ++seg_num) {
559  // tss << "Map Entry " << seg_num << ": ";
560  // for (auto vec_it = seg_it->first.begin(); vec_it != seg_it->first.end();
561  // ++vec_it)
562  // {
563  // tss << *vec_it << ",";
564  // }
565  // tss << " " << std::endl;
566  tss << printSeg(seg_it->second);
567  }
568  tss << "--------------------" << std::endl;
569  return tss.str();
570 }
571 
572 void BufferMgr::printSegs() {
573  int seg_num = 1;
574  int slab_num = 1;
575  LOG(INFO) << std::endl << " " << getStringMgrType() << ":" << device_id_;
576  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
577  ++slab_it, ++slab_num) {
578  LOG(INFO) << "Slab Num: " << slab_num << " " << getStringMgrType() << ":"
579  << device_id_;
580  for (auto seg_it = slab_it->begin(); seg_it != slab_it->end(); ++seg_it, ++seg_num) {
581  LOG(INFO) << "Segment: " << seg_num << " " << getStringMgrType() << ":"
582  << device_id_;
583  printSeg(seg_it);
584  LOG(INFO) << " " << getStringMgrType() << ":" << device_id_;
585  }
586  LOG(INFO) << "--------------------"
587  << " " << getStringMgrType() << ":" << device_id_;
588  }
589 }
590 
591 bool BufferMgr::isBufferOnDevice(const ChunkKey& key) {
592  std::lock_guard<std::mutex> chunkIndexLock(chunk_index_mutex_);
593  if (chunk_index_.find(key) == chunk_index_.end()) {
594  return false;
595  } else {
596  return true;
597  }
598 }
599 
601 void BufferMgr::deleteBuffer(const ChunkKey& key, const bool) {
602  // Note: purge is unused
603  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
604 
605  // lookup the buffer for the Chunk in chunk_index_
606  auto buffer_it = chunk_index_.find(key);
607  CHECK(buffer_it != chunk_index_.end());
608  auto seg_it = buffer_it->second;
609  chunk_index_.erase(buffer_it);
610  chunk_index_lock.unlock();
611  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
612  if (seg_it->buffer) {
613  delete seg_it->buffer; // Delete Buffer for segment
614  seg_it->buffer = nullptr;
615  }
616  removeSegment(seg_it);
617 }
618 
619 void BufferMgr::deleteBuffersWithPrefix(const ChunkKey& key_prefix, const bool) {
620  // Note: purge is unused
621  // lookup the buffer for the Chunk in chunk_index_
622  std::lock_guard<std::mutex> sized_segs_lock(
623  sized_segs_mutex_); // Take this lock early to prevent deadlock with
624  // reserveBuffer which needs segs_mutex_ and then
625  // chunk_index_mutex_
626  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
627  auto prefix_upper_bound = key_prefix;
628  prefix_upper_bound.emplace_back(std::numeric_limits<ChunkKey::value_type>::max());
629  for (auto buffer_it = chunk_index_.lower_bound(key_prefix),
630  end_chunk_it = chunk_index_.upper_bound(prefix_upper_bound);
631  buffer_it != end_chunk_it;) {
632  auto seg_it = buffer_it->second;
633  if (seg_it->buffer) {
634  if (seg_it->buffer->getPinCount() != 0) {
635  // leave the buffer and buffer segment in place; they are in use elsewhere. Once
636  // unpinned, the buffer will be inaccessible and evicted
637  buffer_it++;
638  continue;
639  }
640  delete seg_it->buffer; // Delete Buffer for segment
641  seg_it->buffer = nullptr;
642  }
643  removeSegment(seg_it);
644  chunk_index_.erase(buffer_it++);
645  }
646 }
647 
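deleteBuffersWithPrefix turns a prefix delete into an ordered range scan: because ChunkKeys compare lexicographically in the std::map, appending the maximum value to the prefix yields a bound that sorts after every real key sharing that prefix. A standalone illustration of the bound (the map contents are made up):

#include <iostream>
#include <limits>
#include <map>
#include <vector>

using ChunkKey = std::vector<int>;

int main() {
  std::map<ChunkKey, int> index{{{1, 2, 0}, 10}, {{1, 2, 5}, 11}, {{1, 3, 0}, 12}};
  ChunkKey prefix{1, 2};
  auto upper = prefix;
  upper.emplace_back(std::numeric_limits<ChunkKey::value_type>::max());
  // Visits {1,2,0} and {1,2,5}; stops before {1,3,0}.
  for (auto it = index.lower_bound(prefix); it != index.upper_bound(upper); ++it) {
    std::cout << it->second << '\n';
  }
}
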
648 void BufferMgr::removeSegment(BufferList::iterator& seg_it) {
649  // Note: does not delete buffer as this may be moved somewhere else
650  int slab_num = seg_it->slab_num;
651  // cout << "Slab num: " << slabNum << endl;
652  if (slab_num < 0) {
653  std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
654  unsized_segs_.erase(seg_it);
655  } else {
656  if (seg_it != slab_segments_[slab_num].begin()) {
657  auto prev_it = std::prev(seg_it);
658  // LOG(INFO) << "PrevIt: " << " " << getStringMgrType() << ":" << device_id_;
659  // printSeg(prev_it);
660  if (prev_it->mem_status == FREE) {
661  seg_it->start_page = prev_it->start_page;
662  seg_it->num_pages += prev_it->num_pages;
663  slab_segments_[slab_num].erase(prev_it);
664  }
665  }
666  auto next_it = std::next(seg_it);
667  if (next_it != slab_segments_[slab_num].end()) {
668  if (next_it->mem_status == FREE) {
669  seg_it->num_pages += next_it->num_pages;
670  slab_segments_[slab_num].erase(next_it);
671  }
672  }
673  seg_it->mem_status = FREE;
674  // seg_it->pinCount = 0;
675  seg_it->buffer = nullptr;
676  }
677 }
678 
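removeSegment's slab path is the inverse of the first-fit split shown earlier: the freed segment absorbs any FREE neighbor on either side so adjacent free pages stay contiguous. A simplified standalone version over the same toy Seg model as the first-fit sketch; the real code also resets the buffer pointer and handles the unsized-segment case:

#include <cstddef>
#include <iterator>
#include <list>

enum MemStatus { FREE, USED };
struct Seg {
  std::size_t start_page;
  std::size_t num_pages;
  MemStatus mem_status;
};

void free_and_coalesce(std::list<Seg>& segs, std::list<Seg>::iterator seg_it) {
  if (seg_it != segs.begin()) {
    auto prev_it = std::prev(seg_it);
    if (prev_it->mem_status == FREE) {  // absorb the free segment before us
      seg_it->start_page = prev_it->start_page;
      seg_it->num_pages += prev_it->num_pages;
      segs.erase(prev_it);
    }
  }
  auto next_it = std::next(seg_it);
  if (next_it != segs.end() && next_it->mem_status == FREE) {  // and after us
    seg_it->num_pages += next_it->num_pages;
    segs.erase(next_it);
  }
  seg_it->mem_status = FREE;
}

int main() {
  std::list<Seg> slab{{0, 64, FREE}, {64, 64, USED}, {128, 64, FREE}};
  free_and_coalesce(slab, std::next(slab.begin()));
  return slab.size() != 1 || slab.front().num_pages != 192;  // one 192-page FREE seg
}
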
679 void BufferMgr::checkpoint() {
680  std::lock_guard<std::mutex> lock(global_mutex_); // granular lock
681  std::lock_guard<std::mutex> chunkIndexLock(chunk_index_mutex_);
682 
683  for (auto& chunk_itr : chunk_index_) {
684  // checks that buffer is actual chunk (not just buffer) and is dirty
685  auto& buffer_itr = chunk_itr.second;
686  if (buffer_itr->chunk_key[0] != -1 && buffer_itr->buffer->isDirty()) {
687  parent_mgr_->putBuffer(buffer_itr->chunk_key, buffer_itr->buffer);
688  buffer_itr->buffer->clearDirtyBits();
689  }
690  }
691 }
692 
693 void BufferMgr::checkpoint(const int db_id, const int tb_id) {
694  std::lock_guard<std::mutex> lock(global_mutex_); // granular lock
695  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
696 
697  ChunkKey key_prefix;
698  key_prefix.push_back(db_id);
699  key_prefix.push_back(tb_id);
700  auto start_chunk_it = chunk_index_.lower_bound(key_prefix);
701  if (start_chunk_it == chunk_index_.end()) {
702  return;
703  }
704 
705  auto buffer_it = start_chunk_it;
706  while (buffer_it != chunk_index_.end() &&
707  std::search(buffer_it->first.begin(),
708  buffer_it->first.begin() + key_prefix.size(),
709  key_prefix.begin(),
710  key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
711  if (buffer_it->second->chunk_key[0] != -1 &&
712  buffer_it->second->buffer->isDirty()) { // checks that buffer is actual chunk
713  // (not just buffer) and is dirty
714 
715  parent_mgr_->putBuffer(buffer_it->second->chunk_key, buffer_it->second->buffer);
716  buffer_it->second->buffer->clearDirtyBits();
717  }
718  buffer_it++;
719  }
720 }
721 
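The loop condition in the table-level checkpoint above runs std::search over a window exactly the size of the prefix, so the only position where the prefix can match is the window's begin; comparing the result against the window's end is therefore a "key starts with key_prefix" test. A standalone check of that idiom (the sample vectors are made up):

#include <algorithm>
#include <cassert>
#include <vector>

int main() {
  std::vector<int> key{1, 7, 3, 0};
  std::vector<int> prefix{1, 7};
  auto window_end = key.begin() + prefix.size();
  // std::search returns window_end when the prefix is absent from the window.
  bool has_prefix =
      std::search(key.begin(), window_end, prefix.begin(), prefix.end()) != window_end;
  assert(has_prefix);

  std::vector<int> other{2, 7, 3, 0};
  auto other_end = other.begin() + prefix.size();
  assert(std::search(other.begin(), other_end, prefix.begin(), prefix.end()) ==
         other_end);
}
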
724 AbstractBuffer* BufferMgr::getBuffer(const ChunkKey& key, const size_t num_bytes) {
725  std::lock_guard<std::mutex> lock(global_mutex_); // granular lock
726 
727  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
728  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
729  auto buffer_it = chunk_index_.find(key);
730  bool found_buffer = buffer_it != chunk_index_.end();
731  chunk_index_lock.unlock();
732  if (found_buffer) {
733  CHECK(buffer_it->second->buffer);
734  buffer_it->second->buffer->pin();
735  sized_segs_lock.unlock();
736 
737  buffer_it->second->last_touched = buffer_epoch_++; // race
738 
739  auto buffer_size = buffer_it->second->buffer->size();
740  if (buffer_size < num_bytes) {
741  // need to fetch part of buffer we don't have - up to numBytes
742  VLOG(1) << ToString(getMgrType())
743  << ": Fetching buffer from parent manager. Reason: increased buffer size. "
744  "Buffer size: "
745  << buffer_size << ", num bytes to fetch: " << num_bytes
746  << ", chunk key: " << keyToString(key);
747  parent_mgr_->fetchBuffer(key, buffer_it->second->buffer, num_bytes);
748  }
749  return buffer_it->second->buffer;
750  } else { // If wasn't in pool then we need to fetch it
751  sized_segs_lock.unlock();
752  // createChunk pins for us
753  AbstractBuffer* buffer = createBuffer(key, page_size_, num_bytes);
754  try {
755  VLOG(1) << ToString(getMgrType())
756  << ": Fetching buffer from parent manager. Reason: cache miss. Num bytes "
757  "to fetch: "
758  << num_bytes << ", chunk key: " << keyToString(key);
759  parent_mgr_->fetchBuffer(
760  key, buffer, num_bytes); // this should put buffer in a BufferSegment
761  } catch (const foreign_storage::ForeignStorageException& error) {
762  deleteBuffer(key); // buffer failed to load, ensure it is cleaned up
763  LOG(WARNING) << "Get chunk - Could not load chunk " << keyToString(key)
764  << " from foreign storage. Error was " << error.what();
765  throw;
766  } catch (const std::exception& error) {
767  LOG(FATAL) << "Get chunk - Could not find chunk " << keyToString(key)
768  << " in buffer pool or parent buffer pools. Error was " << error.what();
769  }
770  return buffer;
771  }
772 }
773 
774 void BufferMgr::fetchBuffer(const ChunkKey& key,
775  AbstractBuffer* dest_buffer,
776  const size_t num_bytes) {
777  std::unique_lock<std::mutex> lock(global_mutex_); // granular lock
778  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
779  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
780 
781  auto buffer_it = chunk_index_.find(key);
782  bool found_buffer = buffer_it != chunk_index_.end();
783  chunk_index_lock.unlock();
784  AbstractBuffer* buffer;
785  if (!found_buffer) {
786  sized_segs_lock.unlock();
787  CHECK(parent_mgr_ != 0);
788  buffer = createBuffer(key, page_size_, num_bytes); // will pin buffer
789  try {
790  VLOG(1) << ToString(getMgrType())
791  << ": Fetching buffer from parent manager. Reason: cache miss. Num bytes "
792  "to fetch: "
793  << num_bytes << ", chunk key: " << keyToString(key);
794  parent_mgr_->fetchBuffer(key, buffer, num_bytes);
795  } catch (const foreign_storage::ForeignStorageException& error) {
796  deleteBuffer(key); // buffer failed to load, ensure it is cleaned up
797  LOG(WARNING) << "Could not fetch parent chunk " << keyToString(key)
798  << " from foreign storage. Error was " << error.what();
799  throw;
800  } catch (std::runtime_error& error) {
801  LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key)
802  << " error: " << error.what();
803  }
804  } else {
805  buffer = buffer_it->second->buffer;
806  buffer->pin();
807  auto buffer_size = buffer->size();
808  if (num_bytes > buffer_size) {
809  try {
810  VLOG(1) << ToString(getMgrType())
811  << ": Fetching buffer from parent manager. Reason: increased buffer "
812  "size. Buffer size: "
813  << buffer_size << ", num bytes to fetch: " << num_bytes
814  << ", chunk key: " << keyToString(key);
815  parent_mgr_->fetchBuffer(key, buffer, num_bytes);
816  } catch (const foreign_storage::ForeignStorageException& error) {
817  LOG(WARNING) << "Could not fetch parent chunk " << keyToString(key)
818  << " from foreign storage. Error was " << error.what();
819  throw;
820  } catch (std::runtime_error& error) {
821  LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key)
822  << " error: " << error.what();
823  }
824  }
825  sized_segs_lock.unlock();
826  }
827  lock.unlock();
828  buffer->copyTo(dest_buffer, num_bytes);
829  buffer->unPin();
830 }
831 
832 AbstractBuffer* BufferMgr::putBuffer(const ChunkKey& key,
833  AbstractBuffer* src_buffer,
834  const size_t num_bytes) {
835  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
836  auto buffer_it = chunk_index_.find(key);
837  bool found_buffer = buffer_it != chunk_index_.end();
838  chunk_index_lock.unlock();
839  AbstractBuffer* buffer;
840  if (!found_buffer) {
841  buffer = createBuffer(key, page_size_);
842  } else {
843  buffer = buffer_it->second->buffer;
844  }
845  size_t old_buffer_size = buffer->size();
846  size_t new_buffer_size = num_bytes == 0 ? src_buffer->size() : num_bytes;
847  CHECK(!buffer->isDirty());
848 
849  if (src_buffer->isUpdated()) {
850  //@todo use dirty flags to only flush pages of chunk that need to
851  // be flushed
852  buffer->write((int8_t*)src_buffer->getMemoryPtr(),
853  new_buffer_size,
854  0,
855  src_buffer->getType(),
856  src_buffer->getDeviceId());
857  } else if (src_buffer->isAppended()) {
858  CHECK(old_buffer_size < new_buffer_size);
859  buffer->append((int8_t*)src_buffer->getMemoryPtr() + old_buffer_size,
860  new_buffer_size - old_buffer_size,
861  src_buffer->getType(),
862  src_buffer->getDeviceId());
863  } else {
864  UNREACHABLE();
865  }
866  src_buffer->clearDirtyBits();
867  buffer->syncEncoder(src_buffer);
868  return buffer;
869 }
870 
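putBuffer distinguishes two write paths: an updated source rewrites the destination from offset 0, while an append-only source copies just the bytes past the destination's current size. A simplified model of that decision using plain byte vectors; the real code also syncs encoders and dirty bits:

#include <cassert>
#include <cstdint>
#include <vector>

struct Buf {
  std::vector<int8_t> data;
  bool updated = false;
  bool appended = false;
};

void put(Buf& dst, const Buf& src) {
  if (src.updated) {
    dst.data = src.data;  // full rewrite from offset 0
  } else if (src.appended) {
    assert(dst.data.size() < src.data.size());
    // copy only the new tail past dst's current size
    dst.data.insert(dst.data.end(),
                    src.data.begin() + dst.data.size(),
                    src.data.end());
  }
}

int main() {
  Buf dst{{1, 2}, false, false};
  Buf src{{1, 2, 3, 4}, false, true};  // append-only source
  put(dst, src);
  return dst.data.size() != 4;
}
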
871 int BufferMgr::getBufferId() {
872  std::lock_guard<std::mutex> lock(buffer_id_mutex_);
873  return max_buffer_id_++;
874 }
875 
876 /// client is responsible for deleting memory allocated for b->mem_
877 AbstractBuffer* BufferMgr::alloc(const size_t num_bytes) {
878  std::lock_guard<std::mutex> lock(global_mutex_);
879  ChunkKey chunk_key = {-1, getBufferId()};
880  return createBuffer(chunk_key, page_size_, num_bytes);
881 }
882 
883 void BufferMgr::free(AbstractBuffer* buffer) {
884  std::lock_guard<std::mutex> lock(global_mutex_); // hack for now
885  Buffer* casted_buffer = dynamic_cast<Buffer*>(buffer);
886  if (casted_buffer == 0) {
887  LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type.";
888  }
889  deleteBuffer(casted_buffer->seg_it_->chunk_key);
890 }
891 
892 size_t BufferMgr::getNumChunks() {
893  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
894  return chunk_index_.size();
895 }
896 
897 size_t BufferMgr::size() {
898  return num_pages_allocated_;
899 }
900 
901 size_t BufferMgr::getMaxBufferSize() {
902  return max_buffer_pool_size_;
903 }
904 
905 size_t BufferMgr::getMaxSlabSize() {
906  return max_slab_size_;
907 }
908 
909 void BufferMgr::getChunkMetadataVecForKeyPrefix(ChunkMetadataVector& chunk_metadata_vec,
910  const ChunkKey& key_prefix) {
911  LOG(FATAL) << "getChunkMetadataVecForPrefix not supported for BufferMgr.";
912 }
913 
914 const std::vector<BufferList>& BufferMgr::getSlabSegments() {
915  return slab_segments_;
916 }
917 
918 void BufferMgr::removeTableRelatedDS(const int db_id, const int table_id) {
919  UNREACHABLE();
920 }
921 } // namespace Buffer_Namespace