BufferMgr.cpp

/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * @file  BufferMgr.cpp
 */
#include "DataMgr/BufferMgr/BufferMgr.h"

#include <algorithm>
#include <iomanip>
#include <limits>

#include "DataMgr/BufferMgr/Buffer.h"
#include "DataMgr/ForeignStorage/ForeignStorageException.h"
#include "Logger/Logger.h"
#include "Shared/measure.h"

using namespace std;

namespace Buffer_Namespace {

std::string BufferMgr::keyToString(const ChunkKey& key) {
  std::ostringstream oss;

  oss << " key: ";
  for (auto sub_key : key) {
    oss << sub_key << ",";
  }
  return oss.str();
}

/// Allocates memSize bytes for the buffer pool and initializes the free memory map.
BufferMgr::BufferMgr(const int device_id,
                     const size_t max_buffer_pool_size,
                     const size_t min_slab_size,
                     const size_t max_slab_size,
                     const size_t page_size,
                     AbstractBufferMgr* parent_mgr)
    : AbstractBufferMgr(device_id)
    , max_buffer_pool_size_(max_buffer_pool_size)
    , min_slab_size_(min_slab_size)
    , max_slab_size_(max_slab_size)
    , page_size_(page_size)
    , num_pages_allocated_(0)
    , allocations_capped_(false)
    , parent_mgr_(parent_mgr)
    , max_buffer_id_(0)
    , buffer_epoch_(0) {
  CHECK(max_buffer_pool_size_ > 0);
  CHECK(page_size_ > 0);
  // TODO change checks on run-time configurable slab size variables to exceptions
  CHECK(min_slab_size_ > 0);
  CHECK(max_slab_size_ > 0);
  CHECK(min_slab_size_ <= max_slab_size_);
  CHECK(min_slab_size_ % page_size_ == 0);
  CHECK(max_slab_size_ % page_size_ == 0);

  max_buffer_pool_num_pages_ = max_buffer_pool_size_ / page_size_;
  max_num_pages_per_slab_ = max_slab_size_ / page_size_;
  min_num_pages_per_slab_ = min_slab_size_ / page_size_;
  current_max_slab_page_size_ =
      max_num_pages_per_slab_;  // current_max_slab_page_size_ will drop as allocations
                                // fail - this is the high water mark
}
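
// Illustrative arithmetic for the page bookkeeping above (hypothetical sizes,
// not part of the original source): with page_size = 512 B, a 4 GiB pool, and
// 2 GiB / 256 MiB max/min slabs, the derived counts are
//
//   max_buffer_pool_num_pages_ = (4UL << 30) / 512   == 8388608
//   max_num_pages_per_slab_    = (2UL << 30) / 512   == 4194304
//   min_num_pages_per_slab_    = (256UL << 20) / 512 == 524288
//
// current_max_slab_page_size_ starts at the 4194304-page high-water mark and
// only shrinks as slab allocations fail (see findFreeBuffer below).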

/// Destructor
BufferMgr::~BufferMgr() {
  clear();
}

void BufferMgr::reinit() {
  num_pages_allocated_ = 0;
  current_max_slab_page_size_ =
      max_num_pages_per_slab_;  // current_max_slab_page_size_ will drop as allocations
                                // fail - this is the high water mark
  allocations_capped_ = false;
}

void BufferMgr::clear() {
  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);

  for (auto& buf : chunk_index_) {
    delete buf.second->buffer;
  }

  chunk_index_.clear();
  slabs_.clear();
  slab_segments_.clear();
  unsized_segs_.clear();
  buffer_epoch_ = 0;
}

/// Throws a runtime_error if the Chunk already exists
AbstractBuffer* BufferMgr::createBuffer(const ChunkKey& chunk_key,
                                        const size_t chunk_page_size,
                                        const size_t initial_size) {
  // LOG(INFO) << printMap();
  size_t actual_chunk_page_size = chunk_page_size;
  if (actual_chunk_page_size == 0) {
    actual_chunk_page_size = page_size_;
  }

  // chunk_page_size is just for recording dirty pages
  {
    std::lock_guard<std::mutex> lock(chunk_index_mutex_);
    CHECK(chunk_index_.find(chunk_key) == chunk_index_.end());
    BufferSeg buffer_seg(BufferSeg(-1, 0, USED));
    buffer_seg.chunk_key = chunk_key;
    std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
    unsized_segs_.push_back(buffer_seg);  // race condition?
    chunk_index_[chunk_key] =
        std::prev(unsized_segs_.end(),
                  1);  // need to do this before allocating Buffer because doing so could
                       // change the segment used
  }
  // following should be safe outside the lock b/c the first thing the Buffer
  // constructor does is pin (and it's still in unsized segs at this point
  // so it can't be evicted)
  try {
    allocateBuffer(chunk_index_[chunk_key], actual_chunk_page_size, initial_size);
  } catch (const OutOfMemory&) {
    auto buffer_it = chunk_index_.find(chunk_key);
    CHECK(buffer_it != chunk_index_.end());
    buffer_it->second->buffer =
        nullptr;  // constructor failed for the buffer object so make sure to mark it
                  // null so deleteBuffer doesn't try to delete it
    deleteBuffer(chunk_key);
    throw;
  }
  CHECK(initial_size == 0 || chunk_index_[chunk_key]->buffer->getMemoryPtr());
  // chunk_index_[chunk_key]->buffer->pin();
  std::lock_guard<std::mutex> lock(chunk_index_mutex_);
  return chunk_index_[chunk_key]->buffer;
}

BufferList::iterator BufferMgr::evict(BufferList::iterator& evict_start,
                                      const size_t num_pages_requested,
                                      const int slab_num) {
  // We can assume here that the buffer for evict_start either doesn't exist
  // (evict_start is the first buffer) or was not free, so we don't need to
  // merge it
  auto evict_it = evict_start;
  size_t num_pages = 0;
  size_t start_page = evict_start->start_page;
  while (num_pages < num_pages_requested) {
    if (evict_it->mem_status == USED) {
      CHECK(evict_it->buffer->getPinCount() < 1);
    }
    num_pages += evict_it->num_pages;
    if (evict_it->mem_status == USED && evict_it->chunk_key.size() > 0) {
      chunk_index_.erase(evict_it->chunk_key);
    }
    if (evict_it->buffer != nullptr) {
      // If we don't delete buffers here then we lose the reference to them later
      // and cause a memleak.
      delete evict_it->buffer;
    }
    evict_it = slab_segments_[slab_num].erase(
        evict_it);  // erase operation returns the next iterator - safe if we ever move
                    // to a vector (as opposed to erase(evict_it++))
  }
  BufferSeg data_seg(
      start_page, num_pages_requested, USED, buffer_epoch_++);  // until we can
  // data_seg.pinCount++;
  data_seg.slab_num = slab_num;
  auto data_seg_it =
      slab_segments_[slab_num].insert(evict_it, data_seg);  // Will insert before evict_it
  if (num_pages_requested < num_pages) {
    size_t excess_pages = num_pages - num_pages_requested;
    if (evict_it != slab_segments_[slab_num].end() &&
        evict_it->mem_status == FREE) {  // need to merge with the current page
      evict_it->start_page = start_page + num_pages_requested;
      evict_it->num_pages += excess_pages;
    } else {  // need to insert a free seg before evict_it for excess_pages
      BufferSeg free_seg(start_page + num_pages_requested, excess_pages, FREE);
      slab_segments_[slab_num].insert(evict_it, free_seg);
    }
  }
  return data_seg_it;
}
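
// Worked example of the split logic above (hypothetical numbers): evicting
// three unpinned segments of 4 pages each starting at start_page = 100 to
// satisfy num_pages_requested = 10 consumes pages [100, 112). The new USED
// segment covers [100, 110), and the excess_pages = 2 either extend a FREE
// segment that already follows or become a new FREE segment at [110, 112).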

BufferList::iterator BufferMgr::reserveBuffer(
    BufferList::iterator& seg_it,
    const size_t num_bytes) {  // assumes buffer is already pinned

  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;
  size_t num_pages_extra_needed = num_pages_requested - seg_it->num_pages;

  if (num_pages_requested < seg_it->num_pages) {
    // We already have enough pages in the existing segment
    return seg_it;
  }
  // First check for a free segment after seg_it
  int slab_num = seg_it->slab_num;
  if (slab_num >= 0) {  // not dummy page
    BufferList::iterator next_it = std::next(seg_it);
    if (next_it != slab_segments_[slab_num].end() && next_it->mem_status == FREE &&
        next_it->num_pages >= num_pages_extra_needed) {
      // Then we can just use the next BufferSeg which happens to be free
      size_t leftover_pages = next_it->num_pages - num_pages_extra_needed;
      seg_it->num_pages = num_pages_requested;
      next_it->num_pages = leftover_pages;
      next_it->start_page = seg_it->start_page + seg_it->num_pages;
      return seg_it;
    }
  }
  // If we're here then we couldn't keep the buffer in its existing slot;
  // we need to find a new segment, copy the data over, and then delete the old one
  auto new_seg_it = findFreeBuffer(num_bytes);

  // Below should be in a copy constructor for BufferSeg?
  new_seg_it->buffer = seg_it->buffer;
  new_seg_it->chunk_key = seg_it->chunk_key;
  int8_t* old_mem = new_seg_it->buffer->mem_;
  new_seg_it->buffer->mem_ =
      slabs_[new_seg_it->slab_num] + new_seg_it->start_page * page_size_;

  // now need to copy over memory
  // only do this if the old segment is valid (i.e. not new with an unallocated buffer)
  if (seg_it->start_page >= 0 && seg_it->buffer->mem_ != 0) {
    new_seg_it->buffer->writeData(old_mem,
                                  new_seg_it->buffer->size(),
                                  0,
                                  new_seg_it->buffer->getType(),
                                  device_id_);
  }
  // Decrement pin count to reverse effect above
  removeSegment(seg_it);
  {
    std::lock_guard<std::mutex> lock(chunk_index_mutex_);
    chunk_index_[new_seg_it->chunk_key] = new_seg_it;
  }

  return new_seg_it;
}
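
// The page-rounding expression above is integer ceiling division. A worked
// example with hypothetical values: page_size_ = 512 and num_bytes = 1025
// gives num_pages_requested = (1025 + 512 - 1) / 512 = 1536 / 512 = 3, i.e.
// any partially filled page still claims a whole page.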

BufferList::iterator BufferMgr::findFreeBufferInSlab(const size_t slab_num,
                                                     const size_t num_pages_requested) {
  for (auto buffer_it = slab_segments_[slab_num].begin();
       buffer_it != slab_segments_[slab_num].end();
       ++buffer_it) {
    if (buffer_it->mem_status == FREE && buffer_it->num_pages >= num_pages_requested) {
      // start_page doesn't change
      size_t excess_pages = buffer_it->num_pages - num_pages_requested;
      buffer_it->num_pages = num_pages_requested;
      buffer_it->mem_status = USED;
      buffer_it->last_touched = buffer_epoch_++;
      buffer_it->slab_num = slab_num;
      if (excess_pages > 0) {
        BufferSeg free_seg(
            buffer_it->start_page + num_pages_requested, excess_pages, FREE);
        auto temp_it = buffer_it;  // this should make a copy and not be a reference
                                   // - as we do not want to increment buffer_it
        temp_it++;
        slab_segments_[slab_num].insert(temp_it, free_seg);
      }
      return buffer_it;
    }
  }
  // If here then we did not find a free buffer of sufficient size in this slab;
  // return the end iterator
  return slab_segments_[slab_num].end();
}

BufferList::iterator BufferMgr::findFreeBuffer(size_t num_bytes) {
  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;
  if (num_pages_requested > max_num_pages_per_slab_) {
    throw TooBigForSlab(num_bytes);
  }

  size_t num_slabs = slab_segments_.size();

  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
    auto seg_it = findFreeBufferInSlab(slab_num, num_pages_requested);
    if (seg_it != slab_segments_[slab_num].end()) {
      return seg_it;
    }
  }

  // If we're here then we didn't find a free segment of sufficient size;
  // first we see if we can add another slab
  while (num_pages_allocated_ < max_buffer_pool_num_pages_ && !allocations_capped_) {
    try {
      size_t pagesLeft = max_buffer_pool_num_pages_ - num_pages_allocated_;
      if (pagesLeft < current_max_slab_page_size_) {
        current_max_slab_page_size_ = pagesLeft;
      }
      if (num_pages_requested <=
          current_max_slab_page_size_) {  // don't try to allocate if the
                                          // new slab won't be big enough
        auto alloc_ms = measure<>::execution(
            [&]() { addSlab(current_max_slab_page_size_ * page_size_); });
        LOG(INFO) << "ALLOCATION slab of " << current_max_slab_page_size_ << " pages ("
                  << current_max_slab_page_size_ * page_size_ << "B) created in "
                  << alloc_ms << " ms " << getStringMgrType() << ":" << device_id_;
      } else {
        break;
      }
      // if here then addSlab succeeded
      num_pages_allocated_ += current_max_slab_page_size_;
      return findFreeBufferInSlab(
          num_slabs,
          num_pages_requested);  // has to succeed since we made sure to request a slab
                                 // big enough to accommodate the request
    } catch (std::runtime_error& error) {  // failed to allocate slab
      LOG(INFO) << "ALLOCATION Attempted slab of " << current_max_slab_page_size_
                << " pages (" << current_max_slab_page_size_ * page_size_ << "B) failed "
                << getStringMgrType() << ":" << device_id_;
      // check if there is any point in halving currentMaxSlabSize and trying again:
      // if the request won't fit in half the available size then let's try once at
      // full size; if we have already tried at full size and failed then break, as
      // there could still be room enough for other later requests but
      // not for this current one
      if (num_pages_requested > current_max_slab_page_size_ / 2 &&
          current_max_slab_page_size_ != num_pages_requested) {
        current_max_slab_page_size_ = num_pages_requested;
      } else {
        current_max_slab_page_size_ /= 2;
        if (current_max_slab_page_size_ <
            (min_num_pages_per_slab_)) {  // should be a constant
          allocations_capped_ = true;
          // dump out the slabs and their sizes
          LOG(INFO) << "ALLOCATION Capped " << current_max_slab_page_size_
                    << " Minimum size = " << (min_num_pages_per_slab_) << " "
                    << getStringMgrType() << ":" << device_id_;
        }
      }
    }
  }

  if (num_pages_allocated_ == 0 && allocations_capped_) {
    throw FailedToCreateFirstSlab(num_bytes);
  }

  // If here then we can't add a slab - so we need to evict

  size_t min_score = std::numeric_limits<size_t>::max();
  // We're going for the lowest score here, like golf.
  // The score is the newest (maximum) last_touched epoch among the segments
  // that would be evicted, so contiguous runs of older chunks win out
  // (see the note below on why this changed from a sum)
  BufferList::iterator best_eviction_start = slab_segments_[0].end();
  int best_eviction_start_slab = -1;
  int slab_num = 0;

  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
       ++slab_it, ++slab_num) {
    for (auto buffer_it = slab_it->begin(); buffer_it != slab_it->end(); ++buffer_it) {
      // Note there are some shortcuts we could take here - like we should never consider
      // a USED buffer coming after a free buffer as we would have used the FREE buffer,
      // but we won't worry about this for now

      // We can't evict pinned buffers - only normal used buffers

      // if (buffer_it->mem_status == FREE || buffer_it->buffer->getPinCount() == 0) {
      size_t page_count = 0;
      size_t score = 0;
      bool solution_found = false;
      auto evict_it = buffer_it;
      for (; evict_it != slab_segments_[slab_num].end(); ++evict_it) {
        // pinCount should never go up - only down, because we have a
        // global lock on the buffer pool and the pin count only increments
        // on getChunk
        if (evict_it->mem_status == USED && evict_it->buffer->getPinCount() > 0) {
          break;
        }
        page_count += evict_it->num_pages;
        if (evict_it->mem_status == USED) {
          // MAT changed from
          // score += evictIt->lastTouched;
          // Issue was thrashing when going from 8M fragment size chunks back to 64M:
          // basically the large chunks were being evicted prior to small ones, as many
          // small chunks' summed score was larger than one large chunk's, so it would
          // always evict a large chunk; under memory pressure a query would evict its
          // own current chunks and cause reloads rather than evict several smaller
          // unused older chunks.
          score = std::max(score, static_cast<size_t>(evict_it->last_touched));
        }
        if (page_count >= num_pages_requested) {
          solution_found = true;
          break;
        }
      }
      if (solution_found && score < min_score) {
        min_score = score;
        best_eviction_start = buffer_it;
        best_eviction_start_slab = slab_num;
      } else if (evict_it == slab_segments_[slab_num].end()) {
        // this means that every segment after this will fail as well, so our search has
        // proven futile
        // throw std::runtime_error ("Couldn't evict chunks to get free space");
        break;
        // in reality we should try to rearrange the buffer to get more contiguous free
        // space
      }
      // other possibility is ending at PINNED - do nothing in this case
      //}
    }
  }
  if (best_eviction_start == slab_segments_[0].end()) {
    LOG(ERROR) << "ALLOCATION failed to find " << num_bytes << "B throwing out of memory "
               << getStringMgrType() << ":" << device_id_;
    VLOG(2) << printSlabs();
    throw OutOfMemory(num_bytes);
  }
  LOG(INFO) << "ALLOCATION failed to find " << num_bytes << "B free. Forcing Eviction."
            << " Eviction start " << best_eviction_start->start_page
            << " Number pages requested " << num_pages_requested
            << " Best Eviction Start Slab " << best_eviction_start_slab << " "
            << getStringMgrType() << ":" << device_id_;
  best_eviction_start =
      evict(best_eviction_start, num_pages_requested, best_eviction_start_slab);
  return best_eviction_start;
}
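
// Illustrative back-off trace for the slab-growth loop above (hypothetical
// numbers): if current_max_slab_page_size_ = 4194304 pages and every
// addSlab() call throws, a 1000-page request halves the target
//   4194304 -> 2097152 -> 1048576 -> ...
// until a slab allocation succeeds or the target falls below
// min_num_pages_per_slab_, which sets allocations_capped_ and leaves only
// the eviction scan as a way to satisfy future requests.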

std::string BufferMgr::printSlab(size_t slab_num) {
  std::ostringstream tss;
  // size_t lastEnd = 0;
  tss << "Slab St.Page Pages Touch" << std::endl;
  for (auto segment : slab_segments_[slab_num]) {
    tss << setfill(' ') << setw(4) << slab_num;
    // tss << " BSN: " << setfill(' ') << setw(2) << segment.slab_num;
    tss << setfill(' ') << setw(8) << segment.start_page;
    tss << setfill(' ') << setw(8) << segment.num_pages;
    // tss << " GAP: " << setfill(' ') << setw(7) << segment.start_page - lastEnd;
    // lastEnd = segment.start_page + segment.num_pages;
    tss << setfill(' ') << setw(7) << segment.last_touched;
    // tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
    if (segment.mem_status == FREE) {
      tss << " FREE"
          << " ";
    } else {
      tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
      tss << " USED - Chunk: ";

      for (auto&& key_elem : segment.chunk_key) {
        tss << key_elem << ",";
      }
    }
    tss << std::endl;
  }
  return tss.str();
}

std::string BufferMgr::printSlabs() {
  std::ostringstream tss;
  tss << std::endl
      << "Slabs Contents: "
      << " " << getStringMgrType() << ":" << device_id_ << std::endl;
  size_t num_slabs = slab_segments_.size();
  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
    tss << printSlab(slab_num);
  }
  tss << "--------------------" << std::endl;
  return tss.str();
}

void BufferMgr::clearSlabs() {
  bool pinned_exists = false;
  for (auto& segment_list : slab_segments_) {
    for (auto& segment : segment_list) {
      if (segment.mem_status == FREE) {
        // no need to free
      } else if (segment.buffer->getPinCount() < 1) {
        deleteBuffer(segment.chunk_key, true);
      } else {
        pinned_exists = true;
      }
    }
  }
  if (!pinned_exists) {
    // let's actually clear the buffer from memory
    freeAllMem();
    clear();
    reinit();
  }
}

// return the maximum size this buffer pool can be, in bytes
size_t BufferMgr::getMaxSize() {
  return page_size_ * max_buffer_pool_num_pages_;
}

// return how many bytes are currently allocated for the buffer pool
size_t BufferMgr::getAllocated() {
  return num_pages_allocated_ * page_size_;
}

// return whether slab allocations have been capped
bool BufferMgr::isAllocationCapped() {
  return allocations_capped_;
}

size_t BufferMgr::getPageSize() {
  return page_size_;
}

// return the size of the chunks in use, in bytes
size_t BufferMgr::getInUseSize() {
  size_t in_use = 0;
  for (auto& segment_list : slab_segments_) {
    for (auto& segment : segment_list) {
      if (segment.mem_status != FREE) {
        in_use += segment.num_pages * page_size_;
      }
    }
  }
  return in_use;
}

std::string BufferMgr::printSeg(BufferList::iterator& seg_it) {
  std::ostringstream tss;
  tss << "SN: " << setfill(' ') << setw(2) << seg_it->slab_num;
  tss << " SP: " << setfill(' ') << setw(7) << seg_it->start_page;
  tss << " NP: " << setfill(' ') << setw(7) << seg_it->num_pages;
  tss << " LT: " << setfill(' ') << setw(7) << seg_it->last_touched;
  tss << " PC: " << setfill(' ') << setw(2) << seg_it->buffer->getPinCount();
  if (seg_it->mem_status == FREE) {
    tss << " FREE"
        << " ";
  } else {
    tss << " USED - Chunk: ";
    for (auto vec_it = seg_it->chunk_key.begin(); vec_it != seg_it->chunk_key.end();
         ++vec_it) {
      tss << *vec_it << ",";
    }
    tss << std::endl;
  }
  return tss.str();
}

std::string BufferMgr::printMap() {
  std::ostringstream tss;
  int seg_num = 1;
  tss << std::endl
      << "Map Contents: "
      << " " << getStringMgrType() << ":" << device_id_ << std::endl;
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  for (auto seg_it = chunk_index_.begin(); seg_it != chunk_index_.end();
       ++seg_it, ++seg_num) {
    // tss << "Map Entry " << seg_num << ": ";
    // for (auto vec_it = seg_it->first.begin(); vec_it != seg_it->first.end();
    //      ++vec_it) {
    //   tss << *vec_it << ",";
    // }
    // tss << " " << std::endl;
    tss << printSeg(seg_it->second);
  }
  tss << "--------------------" << std::endl;
  return tss.str();
}

void BufferMgr::printSegs() {
  int seg_num = 1;
  int slab_num = 1;
  LOG(INFO) << std::endl << " " << getStringMgrType() << ":" << device_id_;
  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
       ++slab_it, ++slab_num) {
    LOG(INFO) << "Slab Num: " << slab_num << " " << getStringMgrType() << ":"
              << device_id_;
    for (auto seg_it = slab_it->begin(); seg_it != slab_it->end(); ++seg_it, ++seg_num) {
      LOG(INFO) << "Segment: " << seg_num << " " << getStringMgrType() << ":"
                << device_id_;
      printSeg(seg_it);
      LOG(INFO) << " " << getStringMgrType() << ":" << device_id_;
    }
    LOG(INFO) << "--------------------"
              << " " << getStringMgrType() << ":" << device_id_;
  }
}

bool BufferMgr::isBufferOnDevice(const ChunkKey& key) {
  std::lock_guard<std::mutex> chunkIndexLock(chunk_index_mutex_);
  if (chunk_index_.find(key) == chunk_index_.end()) {
    return false;
  } else {
    return true;
  }
}

/// Deletes the chunk with the specified key
void BufferMgr::deleteBuffer(const ChunkKey& key, const bool) {
  // Note: purge is unused
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);

  // lookup the buffer for the Chunk in chunk_index_
  auto buffer_it = chunk_index_.find(key);
  CHECK(buffer_it != chunk_index_.end());
  auto seg_it = buffer_it->second;
  chunk_index_.erase(buffer_it);
  chunk_index_lock.unlock();
  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
  if (seg_it->buffer) {
    delete seg_it->buffer;  // Delete Buffer for segment
    seg_it->buffer = 0;
  }
  removeSegment(seg_it);
}

void BufferMgr::deleteBuffersWithPrefix(const ChunkKey& key_prefix, const bool) {
  // Note: purge is unused
  // lookup the buffer for the Chunk in chunk_index_
  std::lock_guard<std::mutex> sized_segs_lock(
      sized_segs_mutex_);  // Take this lock early to prevent deadlock with
                           // reserveBuffer which needs segs_mutex_ and then
                           // chunk_index_mutex_
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto startChunkIt = chunk_index_.lower_bound(key_prefix);
  if (startChunkIt == chunk_index_.end()) {
    return;
  }

  auto buffer_it = startChunkIt;
  while (buffer_it != chunk_index_.end() &&
         std::search(buffer_it->first.begin(),
                     buffer_it->first.begin() + key_prefix.size(),
                     key_prefix.begin(),
                     key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
    auto seg_it = buffer_it->second;
    if (seg_it->buffer) {
      if (seg_it->buffer->getPinCount() != 0) {
        // leave the buffer and buffer segment in place, they are in use elsewhere;
        // once unpinned, the buffer will be inaccessible and evicted
        buffer_it++;
        continue;
      }
      delete seg_it->buffer;  // Delete Buffer for segment
      seg_it->buffer = nullptr;
    }
    removeSegment(seg_it);
    chunk_index_.erase(buffer_it++);
  }
}
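
// Illustrative prefix match (assuming the conventional chunk key layout of
// {db_id, table_id, column_id, fragment_id}): a key_prefix of {1, 5} removes
// the buffers for every column and fragment of table 5 in database 1, e.g.
// {1, 5, 2, 0} and {1, 5, 2, 1}, while leaving keys such as {1, 6, 2, 0}
// untouched.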

void BufferMgr::removeSegment(BufferList::iterator& seg_it) {
  // Note: does not delete buffer as this may be moved somewhere else
  int slab_num = seg_it->slab_num;
  // cout << "Slab num: " << slabNum << endl;
  if (slab_num < 0) {
    std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
    unsized_segs_.erase(seg_it);
  } else {
    if (seg_it != slab_segments_[slab_num].begin()) {
      auto prev_it = std::prev(seg_it);
      // LOG(INFO) << "PrevIt: " << " " << getStringMgrType() << ":" << device_id_;
      // printSeg(prev_it);
      if (prev_it->mem_status == FREE) {
        seg_it->start_page = prev_it->start_page;
        seg_it->num_pages += prev_it->num_pages;
        slab_segments_[slab_num].erase(prev_it);
      }
    }
    auto next_it = std::next(seg_it);
    if (next_it != slab_segments_[slab_num].end()) {
      if (next_it->mem_status == FREE) {
        seg_it->num_pages += next_it->num_pages;
        slab_segments_[slab_num].erase(next_it);
      }
    }
    seg_it->mem_status = FREE;
    // seg_it->pinCount = 0;
    seg_it->buffer = 0;
  }
}

void BufferMgr::checkpoint() {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock
  std::lock_guard<std::mutex> chunkIndexLock(chunk_index_mutex_);

  for (auto& chunk_itr : chunk_index_) {
    // checks that the buffer is an actual chunk (not just a buffer) and is dirty
    auto& buffer_itr = chunk_itr.second;
    if (buffer_itr->chunk_key[0] != -1 && buffer_itr->buffer->isDirty()) {
      parent_mgr_->putBuffer(buffer_itr->chunk_key, buffer_itr->buffer);
      buffer_itr->buffer->clearDirtyBits();
    }
  }
}

void BufferMgr::checkpoint(const int db_id, const int tb_id) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);

  ChunkKey key_prefix;
  key_prefix.push_back(db_id);
  key_prefix.push_back(tb_id);
  auto start_chunk_it = chunk_index_.lower_bound(key_prefix);
  if (start_chunk_it == chunk_index_.end()) {
    return;
  }

  auto buffer_it = start_chunk_it;
  while (buffer_it != chunk_index_.end() &&
         std::search(buffer_it->first.begin(),
                     buffer_it->first.begin() + key_prefix.size(),
                     key_prefix.begin(),
                     key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
    if (buffer_it->second->chunk_key[0] != -1 &&
        buffer_it->second->buffer->isDirty()) {  // checks that the buffer is an actual
                                                 // chunk (not just a buffer) and is
                                                 // dirty

      parent_mgr_->putBuffer(buffer_it->second->chunk_key, buffer_it->second->buffer);
      buffer_it->second->buffer->clearDirtyBits();
    }
    buffer_it++;
  }
}

/// Returns a pointer to the Buffer holding the chunk, if it exists; otherwise,
/// throws a runtime_error.
AbstractBuffer* BufferMgr::getBuffer(const ChunkKey& key, const size_t num_bytes) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock

  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  if (found_buffer) {
    CHECK(buffer_it->second->buffer);
    buffer_it->second->buffer->pin();
    sized_segs_lock.unlock();

    buffer_it->second->last_touched = buffer_epoch_++;  // race

    if (buffer_it->second->buffer->size() < num_bytes) {
      // need to fetch the part of the buffer we don't have - up to num_bytes
      parent_mgr_->fetchBuffer(key, buffer_it->second->buffer, num_bytes);
    }
    return buffer_it->second->buffer;
  } else {  // If it wasn't in the pool then we need to fetch it
    sized_segs_lock.unlock();
    // createBuffer pins for us
    AbstractBuffer* buffer = createBuffer(key, page_size_, num_bytes);
    try {
      parent_mgr_->fetchBuffer(
          key, buffer, num_bytes);  // this should put the buffer in a BufferSegment
    } catch (const foreign_storage::ForeignStorageException& error) {
      deleteBuffer(key);  // buffer failed to load, ensure it is cleaned up
      LOG(WARNING) << "Get chunk - Could not load chunk " << keyToString(key)
                   << " from foreign storage. Error was " << error.what();
      throw error;
    } catch (const std::exception& error) {
      LOG(FATAL) << "Get chunk - Could not find chunk " << keyToString(key)
                 << " in buffer pool or parent buffer pools. Error was " << error.what();
    }
    return buffer;
  }
}
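
// Usage sketch for the method above (hypothetical caller, not part of the
// original source): getBuffer() pins the chunk, so the caller must unpin it
// when done, otherwise the segment can never be evicted.
//
//   AbstractBuffer* buf = buffer_mgr.getBuffer(chunk_key, num_bytes);
//   // ... read through buf->getMemoryPtr() ...
//   buf->unPin();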

void BufferMgr::fetchBuffer(const ChunkKey& key,
                            AbstractBuffer* dest_buffer,
                            const size_t num_bytes) {
  std::unique_lock<std::mutex> lock(global_mutex_);  // granular lock
  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);

  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  AbstractBuffer* buffer;
  if (!found_buffer) {
    sized_segs_lock.unlock();
    CHECK(parent_mgr_ != 0);
    buffer = createBuffer(key, page_size_, num_bytes);  // will pin buffer
    try {
      parent_mgr_->fetchBuffer(key, buffer, num_bytes);
    } catch (std::runtime_error& error) {
      LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key);
    }
  } else {
    buffer = buffer_it->second->buffer;
    buffer->pin();
    if (num_bytes > buffer->size()) {
      try {
        parent_mgr_->fetchBuffer(key, buffer, num_bytes);
      } catch (std::runtime_error& error) {
        LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key);
      }
    }
    sized_segs_lock.unlock();
  }
  lock.unlock();
  buffer->copyTo(dest_buffer, num_bytes);
  buffer->unPin();
}

AbstractBuffer* BufferMgr::putBuffer(const ChunkKey& key,
                                     AbstractBuffer* src_buffer,
                                     const size_t num_bytes) {
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  AbstractBuffer* buffer;
  if (!found_buffer) {
    buffer = createBuffer(key, page_size_);
  } else {
    buffer = buffer_it->second->buffer;
  }
  size_t old_buffer_size = buffer->size();
  size_t new_buffer_size = num_bytes == 0 ? src_buffer->size() : num_bytes;
  CHECK(!buffer->isDirty());

  if (src_buffer->isUpdated()) {
    //@todo use dirty flags to only flush pages of chunk that need to
    // be flushed
    buffer->write((int8_t*)src_buffer->getMemoryPtr(),
                  new_buffer_size,
                  0,
                  src_buffer->getType(),
                  src_buffer->getDeviceId());
  } else if (src_buffer->isAppended()) {
    CHECK(old_buffer_size < new_buffer_size);
    buffer->append((int8_t*)src_buffer->getMemoryPtr() + old_buffer_size,
                   new_buffer_size - old_buffer_size,
                   src_buffer->getType(),
                   src_buffer->getDeviceId());
  } else {
    UNREACHABLE();
  }
  src_buffer->clearDirtyBits();
  buffer->syncEncoder(src_buffer);
  return buffer;
}

int BufferMgr::getBufferId() {
  std::lock_guard<std::mutex> lock(buffer_id_mutex_);
  return max_buffer_id_++;
}

/// client is responsible for deleting memory allocated for b->mem_
AbstractBuffer* BufferMgr::alloc(const size_t num_bytes) {
  std::lock_guard<std::mutex> lock(global_mutex_);
  ChunkKey chunk_key = {-1, getBufferId()};
  return createBuffer(chunk_key, page_size_, num_bytes);
}
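
// Usage sketch (hypothetical caller): alloc() hands out an anonymous buffer
// keyed {-1, <fresh id>}; the leading -1 is what checkpoint() uses to skip
// non-chunk buffers. Pair it with free() below:
//
//   AbstractBuffer* scratch = buffer_mgr.alloc(1024);  // pinned on creation
//   // ... use scratch->getMemoryPtr() ...
//   buffer_mgr.free(scratch);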

void BufferMgr::free(AbstractBuffer* buffer) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // hack for now
  Buffer* casted_buffer = dynamic_cast<Buffer*>(buffer);
  if (casted_buffer == 0) {
    LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type.";
  }
  deleteBuffer(casted_buffer->seg_it_->chunk_key);
}

size_t BufferMgr::getNumChunks() {
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  return chunk_index_.size();
}

size_t BufferMgr::size() {
  return num_pages_allocated_;
}

size_t BufferMgr::getMaxBufferSize() {
  return max_buffer_pool_size_;
}

size_t BufferMgr::getMaxSlabSize() {
  return max_slab_size_;
}

void BufferMgr::getChunkMetadataVecForKeyPrefix(ChunkMetadataVector& chunk_metadata_vec,
                                                const ChunkKey& key_prefix) {
  LOG(FATAL) << "getChunkMetadataVecForPrefix not supported for BufferMgr.";
}

const std::vector<BufferList>& BufferMgr::getSlabSegments() {
  return slab_segments_;
}

void BufferMgr::removeTableRelatedDS(const int db_id, const int table_id) {
  UNREACHABLE();
}
}  // namespace Buffer_Namespace