OmniSciDB  94e8789169
BufferMgr.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * @file    BufferMgr.cpp
 * @author  Steven Stewart <steve@map-d.com>
 * @author  Todd Mostak <todd@map-d.com>
 */
#include "DataMgr/BufferMgr/BufferMgr.h"

#include <algorithm>
#include <iomanip>
#include <limits>

#include "DataMgr/BufferMgr/Buffer.h"
#include "DataMgr/ForeignStorage/ForeignStorageException.h"
#include "Logger/Logger.h"
#include "Shared/measure.h"

using namespace std;

namespace Buffer_Namespace {

std::string BufferMgr::keyToString(const ChunkKey& key) {
  std::ostringstream oss;

  oss << " key: ";
  for (auto sub_key : key) {
    oss << sub_key << ",";
  }
  return oss.str();
}

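// For illustration: a hypothetical key {1, 2, 3} renders as " key: 1,2,3,"
// (note the leading space and the trailing comma).
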
BufferMgr::BufferMgr(const int device_id,
                     const size_t max_buffer_pool_size,
                     const size_t min_slab_size,
                     const size_t max_slab_size,
                     const size_t page_size,
                     AbstractBufferMgr* parent_mgr)
    : AbstractBufferMgr(device_id)
    , max_buffer_pool_size_(max_buffer_pool_size)
    , min_slab_size_(min_slab_size)
    , max_slab_size_(max_slab_size)
    , page_size_(page_size)
    , num_pages_allocated_(0)
    , allocations_capped_(false)
    , parent_mgr_(parent_mgr)
    , max_buffer_id_(0)
    , buffer_epoch_(0) {
  CHECK(page_size_ > 0);
  // TODO: change checks on run-time configurable slab size variables to exceptions
  CHECK(min_slab_size_ > 0);
  CHECK(max_slab_size_ > 0);
  CHECK(min_slab_size_ <= max_slab_size_);
  CHECK(min_slab_size_ % page_size_ == 0);
  CHECK(max_slab_size_ % page_size_ == 0);

  max_buffer_pool_num_pages_ = max_buffer_pool_size_ / page_size_;
  max_num_pages_per_slab_ = max_slab_size_ / page_size_;
  min_num_pages_per_slab_ = min_slab_size_ / page_size_;
  current_max_slab_page_size_ =
      max_num_pages_per_slab_;  // current_max_slab_page_size_ will drop as allocations
                                // fail - this is the high water mark
}

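// For example, under hypothetical settings of page_size_ = 512 B,
// max_slab_size_ = 2 GB, and max_buffer_pool_size_ = 8 GB, the derived values
// are max_num_pages_per_slab_ = 2^31 / 2^9 = 4,194,304 pages and
// max_buffer_pool_num_pages_ = 2^33 / 2^9 = 16,777,216 pages.
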
/// Destructor
BufferMgr::~BufferMgr() {
  clear();
}

void BufferMgr::reinit() {
  num_pages_allocated_ = 0;
  current_max_slab_page_size_ =
      max_num_pages_per_slab_;  // current_max_slab_page_size_ will drop as allocations
                                // fail - this is the high water mark
  allocations_capped_ = false;
}

void BufferMgr::clear() {
  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
  for (auto& buf : chunk_index_) {
    delete buf.second->buffer;
  }

  chunk_index_.clear();
  slabs_.clear();
  slab_segments_.clear();
  unsized_segs_.clear();
  buffer_epoch_ = 0;
}

/// Throws a runtime_error if the Chunk already exists
AbstractBuffer* BufferMgr::createBuffer(const ChunkKey& chunk_key,
                                        const size_t chunk_page_size,
                                        const size_t initial_size) {
  // LOG(INFO) << printMap();
  size_t actual_chunk_page_size = chunk_page_size;
  if (actual_chunk_page_size == 0) {
    actual_chunk_page_size = page_size_;
  }

  // chunk_page_size is just for recording dirty pages
  {
    std::lock_guard<std::mutex> lock(chunk_index_mutex_);
    CHECK(chunk_index_.find(chunk_key) == chunk_index_.end());
    BufferSeg buffer_seg(BufferSeg(-1, 0, USED));
    buffer_seg.chunk_key = chunk_key;
    std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
    unsized_segs_.push_back(buffer_seg);  // race condition?
    chunk_index_[chunk_key] =
        std::prev(unsized_segs_.end(),
                  1);  // need to do this before allocating the Buffer because doing so
                       // could change the segment used
  }
  // The following should be safe outside the lock because the first thing the
  // Buffer constructor does is pin (and it is still in unsized segs at this
  // point, so it can't be evicted)
  try {
    allocateBuffer(chunk_index_[chunk_key], actual_chunk_page_size, initial_size);
  } catch (const OutOfMemory&) {
    auto buffer_it = chunk_index_.find(chunk_key);
    CHECK(buffer_it != chunk_index_.end());
    buffer_it->second->buffer =
        nullptr;  // the constructor failed for the buffer object, so mark it null
                  // so deleteBuffer doesn't try to delete it
    deleteBuffer(chunk_key);
    throw;
  }
  CHECK(initial_size == 0 || chunk_index_[chunk_key]->buffer->getMemoryPtr());
  // chunk_index_[chunk_key]->buffer->pin();
  std::lock_guard<std::mutex> lock(chunk_index_mutex_);
  return chunk_index_[chunk_key]->buffer;
}

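// Usage sketch (hypothetical key and sizes): createBuffer({1, 2, 3, 0},
// /*chunk_page_size=*/0, /*initial_size=*/0) registers the chunk, defaults the
// page size to page_size_, and returns a pinned buffer that can later grow via
// reserveBuffer.
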
BufferList::iterator BufferMgr::evict(BufferList::iterator& evict_start,
                                      const size_t num_pages_requested,
                                      const int slab_num) {
  // We can assume here that the buffer for evict_start either doesn't exist
  // (evict_start is the first buffer) or was not free, so we don't need to
  // merge it
  auto evict_it = evict_start;
  size_t num_pages = 0;
  size_t start_page = evict_start->start_page;
  while (num_pages < num_pages_requested) {
    if (evict_it->mem_status == USED) {
      CHECK(evict_it->buffer->getPinCount() < 1);
    }
    num_pages += evict_it->num_pages;
    if (evict_it->mem_status == USED && evict_it->chunk_key.size() > 0) {
      chunk_index_.erase(evict_it->chunk_key);
    }
    evict_it = slab_segments_[slab_num].erase(
        evict_it);  // erase returns the next iterator - safe if we ever move
                    // to a vector (as opposed to erase(evict_it++))
  }
  BufferSeg data_seg(
      start_page, num_pages_requested, USED, buffer_epoch_++);  // until we can
  // data_seg.pinCount++;
  data_seg.slab_num = slab_num;
  auto data_seg_it =
      slab_segments_[slab_num].insert(evict_it, data_seg);  // will insert before
                                                            // evict_it
  if (num_pages_requested < num_pages) {
    size_t excess_pages = num_pages - num_pages_requested;
    if (evict_it != slab_segments_[slab_num].end() &&
        evict_it->mem_status == FREE) {  // need to merge with current page
      evict_it->start_page = start_page + num_pages_requested;
      evict_it->num_pages += excess_pages;
    } else {  // need to insert a free seg before evict_it for excess_pages
      BufferSeg free_seg(start_page + num_pages_requested, excess_pages, FREE);
      slab_segments_[slab_num].insert(evict_it, free_seg);
    }
  }
  return data_seg_it;
}

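// Eviction walkthrough (hypothetical slab layout): given segments
// [USED 4 pages][FREE 2 pages][USED 3 pages] and num_pages_requested = 5, the
// loop consumes the first two segments (6 pages), a USED 5-page segment is
// inserted at the old start page, and the 1 excess page becomes a new FREE
// segment in front of the remaining USED segment.
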
BufferList::iterator BufferMgr::reserveBuffer(
    BufferList::iterator& seg_it,
    const size_t num_bytes) {  // assumes buffer is already pinned

  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;
  size_t num_pages_extra_needed = num_pages_requested - seg_it->num_pages;

  if (num_pages_requested < seg_it->num_pages) {
    // We already have enough pages in the existing segment
    return seg_it;
  }
  // First check for a free segment after seg_it
  int slab_num = seg_it->slab_num;
  if (slab_num >= 0) {  // not dummy page
    BufferList::iterator next_it = std::next(seg_it);
    if (next_it != slab_segments_[slab_num].end() && next_it->mem_status == FREE &&
        next_it->num_pages >= num_pages_extra_needed) {
      // Then we can just use the next BufferSeg, which happens to be free
      size_t leftover_pages = next_it->num_pages - num_pages_extra_needed;
      seg_it->num_pages = num_pages_requested;
      next_it->num_pages = leftover_pages;
      next_it->start_page = seg_it->start_page + seg_it->num_pages;
      return seg_it;
    }
  }
  // If we're here, then we couldn't keep the buffer in its existing slot;
  // we need to find a new segment, copy the data over, and then delete the old one
  auto new_seg_it = findFreeBuffer(num_bytes);

  // Below should be in a copy constructor for BufferSeg?
  new_seg_it->buffer = seg_it->buffer;
  new_seg_it->chunk_key = seg_it->chunk_key;
  int8_t* old_mem = new_seg_it->buffer->mem_;
  new_seg_it->buffer->mem_ =
      slabs_[new_seg_it->slab_num] + new_seg_it->start_page * page_size_;

  // Now we need to copy over the memory; only do this if the old segment is valid
  // (i.e. not a new segment with an unallocated buffer)
  if (seg_it->start_page >= 0 && seg_it->buffer->mem_ != 0) {
    new_seg_it->buffer->writeData(old_mem,
                                  new_seg_it->buffer->size(),
                                  0,
                                  new_seg_it->buffer->getType(),
                                  device_id_);
  }
  // Decrement the pin count to reverse the effect above
  removeSegment(seg_it);
  {
    std::lock_guard<std::mutex> lock(chunk_index_mutex_);
    chunk_index_[new_seg_it->chunk_key] = new_seg_it;
  }

  return new_seg_it;
}

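// To summarize the three paths above: (1) the segment already holds more pages
// than requested and is returned unchanged; (2) the growth is absorbed in place
// by a FREE segment immediately following it; (3) otherwise the buffer is
// relocated to a segment returned by findFreeBuffer and its bytes are copied
// over.
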
BufferList::iterator BufferMgr::findFreeBufferInSlab(const size_t slab_num,
                                                     const size_t num_pages_requested) {
  for (auto buffer_it = slab_segments_[slab_num].begin();
       buffer_it != slab_segments_[slab_num].end();
       ++buffer_it) {
    if (buffer_it->mem_status == FREE && buffer_it->num_pages >= num_pages_requested) {
      // startPage doesn't change
      size_t excess_pages = buffer_it->num_pages - num_pages_requested;
      buffer_it->num_pages = num_pages_requested;
      buffer_it->mem_status = USED;
      buffer_it->last_touched = buffer_epoch_++;
      buffer_it->slab_num = slab_num;
      if (excess_pages > 0) {
        BufferSeg free_seg(
            buffer_it->start_page + num_pages_requested, excess_pages, FREE);
        auto temp_it = buffer_it;  // this should make a copy and not be a reference
                                   // - as we do not want to increment buffer_it
        temp_it++;
        slab_segments_[slab_num].insert(temp_it, free_seg);
      }
      return buffer_it;
    }
  }
  // If here, then we did not find a free buffer of sufficient size in this slab;
  // return the end iterator
  return slab_segments_[slab_num].end();
}

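// First-fit example (hypothetical slab state): requesting 10 pages when the
// first suitable FREE segment holds 64 pages at start_page 100 marks pages
// 100-109 USED and inserts a new 54-page FREE segment at start_page 110.
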
BufferList::iterator BufferMgr::findFreeBuffer(size_t num_bytes) {
  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;
  if (num_pages_requested > max_num_pages_per_slab_) {
    throw TooBigForSlab(num_bytes);
  }

  size_t num_slabs = slab_segments_.size();

  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
    auto seg_it = findFreeBufferInSlab(slab_num, num_pages_requested);
    if (seg_it != slab_segments_[slab_num].end()) {
      return seg_it;
    }
  }

  // If we're here then we didn't find a free segment of sufficient size;
  // first we see if we can add another slab
  while (!allocations_capped_ && num_pages_allocated_ < max_buffer_pool_num_pages_) {
    try {
      size_t pagesLeft = max_buffer_pool_num_pages_ - num_pages_allocated_;
      if (pagesLeft < current_max_slab_page_size_) {
        current_max_slab_page_size_ = pagesLeft;
      }
      if (num_pages_requested <=
          current_max_slab_page_size_) {  // don't try to allocate if the
                                          // new slab won't be big enough
        auto alloc_ms = measure<>::execution(
            [&]() { addSlab(current_max_slab_page_size_ * page_size_); });
        LOG(INFO) << "ALLOCATION slab of " << current_max_slab_page_size_ << " pages ("
                  << current_max_slab_page_size_ * page_size_ << "B) created in "
                  << alloc_ms << " ms " << getStringMgrType() << ":" << device_id_;
      } else {
        break;
      }
      // if here then addSlab succeeded
      num_pages_allocated_ += current_max_slab_page_size_;
      return findFreeBufferInSlab(
          num_slabs,
          num_pages_requested);  // has to succeed since we made sure to request a slab
                                 // big enough to accommodate the request
    } catch (std::runtime_error& error) {  // failed to allocate slab
      LOG(INFO) << "ALLOCATION Attempted slab of " << current_max_slab_page_size_
                << " pages (" << current_max_slab_page_size_ * page_size_ << "B) failed "
                << getStringMgrType() << ":" << device_id_;
      // Check if there is any point in halving currentMaxSlabSize and trying again:
      // if the request won't fit in half the available size, then try once at full
      // size; if we have already tried at full size and failed, then break, as there
      // could still be room enough for a later request but not for this current one
      if (num_pages_requested > current_max_slab_page_size_ / 2 &&
          current_max_slab_page_size_ != num_pages_requested) {
        current_max_slab_page_size_ = num_pages_requested;
      } else {
        current_max_slab_page_size_ /= 2;
        if (current_max_slab_page_size_ <
            (min_num_pages_per_slab_)) {  // should be a constant
          allocations_capped_ = true;
          // dump out the slabs and their sizes
          LOG(INFO) << "ALLOCATION Capped " << current_max_slab_page_size_
                    << " Minimum size = " << (min_num_pages_per_slab_) << " "
                    << getStringMgrType() << ":" << device_id_;
        }
      }
    }
  }
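
  // The retry schedule above halves current_max_slab_page_size_ after each
  // failed allocation (with one retry at exactly num_pages_requested when the
  // request wouldn't fit in half); e.g. a hypothetical 4M-page maximum degrades
  // 4M -> 2M -> 1M -> ... until a slab fits or the size drops below
  // min_num_pages_per_slab_, after which allocations are permanently capped.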

  if (num_pages_allocated_ == 0 && allocations_capped_) {
    throw FailedToCreateFirstSlab(num_bytes);
  }

  // If here then we can't add a slab - so we need to evict

  size_t min_score = std::numeric_limits<size_t>::max();
  // We're going for the lowest score here, like golf.
  // The score is the maximum lastTouched epoch over the segments evicted;
  // evicting older, less recently touched chunks lowers the score.
  BufferList::iterator best_eviction_start = slab_segments_[0].end();
  int best_eviction_start_slab = -1;
  int slab_num = 0;

  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
       ++slab_it, ++slab_num) {
    for (auto buffer_it = slab_it->begin(); buffer_it != slab_it->end(); ++buffer_it) {
      // Note there are some shortcuts we could take here - for instance, we should
      // never consider a USED buffer coming after a FREE buffer, as we would have
      // used the FREE buffer, but we won't worry about this for now

      // We can't evict pinned buffers - only normal used buffers

      // if (buffer_it->mem_status == FREE || buffer_it->buffer->getPinCount() == 0) {
      size_t page_count = 0;
      size_t score = 0;
      bool solution_found = false;
      auto evict_it = buffer_it;
      for (; evict_it != slab_segments_[slab_num].end(); ++evict_it) {
        // pinCount should never go up - only down, because we have a
        // global lock on the buffer pool and the pin count only increments
        // on getChunk
        if (evict_it->mem_status == USED && evict_it->buffer->getPinCount() > 0) {
          break;
        }
        page_count += evict_it->num_pages;
        if (evict_it->mem_status == USED) {
          // MAT changed from
          // score += evictIt->lastTouched;
          // The issue was thrashing when going from 8M fragment size chunks back to
          // 64M: the large chunks were being evicted before the small ones, as many
          // small chunk scores summed to more than one large chunk, so it would
          // always evict a large chunk. Under memory pressure a query would evict
          // its own current chunks and cause reloads, rather than evict several
          // smaller, unused, older chunks.
          score = std::max(score, static_cast<size_t>(evict_it->last_touched));
        }
        if (page_count >= num_pages_requested) {
          solution_found = true;
          break;
        }
      }
      if (solution_found && score < min_score) {
        min_score = score;
        best_eviction_start = buffer_it;
        best_eviction_start_slab = slab_num;
      } else if (evict_it == slab_segments_[slab_num].end()) {
        // this means that every segment after this will fail as well, so our search
        // has proven futile
        // throw std::runtime_error ("Couldn't evict chunks to get free space");
        break;
        // in reality we should try to rearrange the buffer to get more contiguous
        // free space
      }
      // the other possibility is ending at PINNED - do nothing in this case
      //}
    }
  }
  if (best_eviction_start == slab_segments_[0].end()) {
    LOG(ERROR) << "ALLOCATION failed to find " << num_bytes
               << "B throwing out of memory " << getStringMgrType() << ":" << device_id_;
    VLOG(2) << printSlabs();
    throw OutOfMemory(num_bytes);
  }
  LOG(INFO) << "ALLOCATION failed to find " << num_bytes << "B free. Forcing Eviction."
            << " Eviction start " << best_eviction_start->start_page
            << " Number pages requested " << num_pages_requested
            << " Best Eviction Start Slab " << best_eviction_start_slab << " "
            << getStringMgrType() << ":" << device_id_;
  best_eviction_start =
      evict(best_eviction_start, num_pages_requested, best_eviction_start_slab);
  return best_eviction_start;
}

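// Scoring example (hypothetical epochs): a candidate run of USED segments last
// touched at epochs {3, 7, 5} scores max(3, 7, 5) = 7, while a single segment
// touched at epoch 9 scores 9, so the older run is evicted first regardless of
// how many segments it spans.
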
std::string BufferMgr::printSlab(size_t slab_num) {
  std::ostringstream tss;
  // size_t lastEnd = 0;
  tss << "Slab St.Page Pages Touch" << std::endl;
  for (auto segment : slab_segments_[slab_num]) {
    tss << setfill(' ') << setw(4) << slab_num;
    // tss << " BSN: " << setfill(' ') << setw(2) << segment.slab_num;
    tss << setfill(' ') << setw(8) << segment.start_page;
    tss << setfill(' ') << setw(8) << segment.num_pages;
    // tss << " GAP: " << setfill(' ') << setw(7) << segment.start_page - lastEnd;
    // lastEnd = segment.start_page + segment.num_pages;
    tss << setfill(' ') << setw(7) << segment.last_touched;
    // tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
    if (segment.mem_status == FREE) {
      tss << " FREE"
          << " ";
    } else {
      tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
      tss << " USED - Chunk: ";

      for (auto&& key_elem : segment.chunk_key) {
        tss << key_elem << ",";
      }
    }
    tss << std::endl;
  }
  return tss.str();
}

std::string BufferMgr::printSlabs() {
  std::ostringstream tss;
  tss << std::endl
      << "Slabs Contents: "
      << " " << getStringMgrType() << ":" << device_id_ << std::endl;
  size_t num_slabs = slab_segments_.size();
  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
    tss << printSlab(slab_num);
  }
  tss << "--------------------" << std::endl;
  return tss.str();
}

void BufferMgr::clearSlabs() {
  bool pinned_exists = false;
  for (auto& segment_list : slab_segments_) {
    for (auto& segment : segment_list) {
      if (segment.mem_status == FREE) {
        // no need to free
      } else if (segment.buffer->getPinCount() < 1) {
        deleteBuffer(segment.chunk_key, true);
      } else {
        pinned_exists = true;
      }
    }
  }
  if (!pinned_exists) {
    // let's actually clear the buffer from memory
    freeAllMem();
    clear();
    reinit();
  }
}

// return the maximum size this buffer can be in bytes
size_t BufferMgr::getMaxSize() {
  return max_buffer_pool_size_;
}

// return how many bytes are currently allocated
size_t BufferMgr::getAllocated() {
  return num_pages_allocated_ * page_size_;
}

// return whether slab allocations have been capped
bool BufferMgr::isAllocationCapped() {
  return allocations_capped_;
}

size_t BufferMgr::getPageSize() {
  return page_size_;
}

// return the size of the chunks in use in bytes
size_t BufferMgr::getInUseSize() {
  size_t in_use = 0;
  for (auto& segment_list : slab_segments_) {
    for (auto& segment : segment_list) {
      if (segment.mem_status != FREE) {
        in_use += segment.num_pages * page_size_;
      }
    }
  }
  return in_use;
}

std::string BufferMgr::printSeg(BufferList::iterator& seg_it) {
  std::ostringstream tss;
  tss << "SN: " << setfill(' ') << setw(2) << seg_it->slab_num;
  tss << " SP: " << setfill(' ') << setw(7) << seg_it->start_page;
  tss << " NP: " << setfill(' ') << setw(7) << seg_it->num_pages;
  tss << " LT: " << setfill(' ') << setw(7) << seg_it->last_touched;
  tss << " PC: " << setfill(' ') << setw(2) << seg_it->buffer->getPinCount();
  if (seg_it->mem_status == FREE) {
    tss << " FREE"
        << " ";
  } else {
    tss << " USED - Chunk: ";
    for (auto vec_it = seg_it->chunk_key.begin(); vec_it != seg_it->chunk_key.end();
         ++vec_it) {
      tss << *vec_it << ",";
    }
    tss << std::endl;
  }
  return tss.str();
}

std::string BufferMgr::printMap() {
  std::ostringstream tss;
  int seg_num = 1;
  tss << std::endl
      << "Map Contents: "
      << " " << getStringMgrType() << ":" << device_id_ << std::endl;
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  for (auto seg_it = chunk_index_.begin(); seg_it != chunk_index_.end();
       ++seg_it, ++seg_num) {
    // tss << "Map Entry " << seg_num << ": ";
    // for (auto vec_it = seg_it->first.begin(); vec_it != seg_it->first.end();
    //      ++vec_it) {
    //   tss << *vec_it << ",";
    // }
    // tss << " " << std::endl;
    tss << printSeg(seg_it->second);
  }
  tss << "--------------------" << std::endl;
  return tss.str();
}

void BufferMgr::printSegs() {
  int seg_num = 1;
  int slab_num = 1;
  LOG(INFO) << std::endl << " " << getStringMgrType() << ":" << device_id_;
  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
       ++slab_it, ++slab_num) {
    LOG(INFO) << "Slab Num: " << slab_num << " " << getStringMgrType() << ":"
              << device_id_;
    for (auto seg_it = slab_it->begin(); seg_it != slab_it->end(); ++seg_it, ++seg_num) {
      LOG(INFO) << "Segment: " << seg_num << " " << getStringMgrType() << ":"
                << device_id_;
      printSeg(seg_it);
      LOG(INFO) << " " << getStringMgrType() << ":" << device_id_;
    }
    LOG(INFO) << "--------------------"
              << " " << getStringMgrType() << ":" << device_id_;
  }
}

bool BufferMgr::isBufferOnDevice(const ChunkKey& key) {
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  if (chunk_index_.find(key) == chunk_index_.end()) {
    return false;
  } else {
    return true;
  }
}

/// Deletes the chunk with the specified key
void BufferMgr::deleteBuffer(const ChunkKey& key, const bool) {
  // Note: purge is unused
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);

  // lookup the buffer for the Chunk in chunk_index_
  auto buffer_it = chunk_index_.find(key);
  CHECK(buffer_it != chunk_index_.end());
  auto seg_it = buffer_it->second;
  chunk_index_.erase(buffer_it);
  chunk_index_lock.unlock();
  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
  if (seg_it->buffer) {
    delete seg_it->buffer;  // Delete Buffer for segment
    seg_it->buffer = 0;
  }
  removeSegment(seg_it);
}

void BufferMgr::deleteBuffersWithPrefix(const ChunkKey& key_prefix, const bool) {
  // Note: purge is unused
  // lookup the buffer for the Chunk in chunk_index_
  std::lock_guard<std::mutex> sized_segs_lock(
      sized_segs_mutex_);  // Take this lock early to prevent deadlock with
                           // reserveBuffer, which needs segs_mutex_ and then
                           // chunk_index_mutex_
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto start_chunk_it = chunk_index_.lower_bound(key_prefix);
  if (start_chunk_it == chunk_index_.end()) {
    return;
  }

  auto buffer_it = start_chunk_it;
  while (buffer_it != chunk_index_.end() &&
         std::search(buffer_it->first.begin(),
                     buffer_it->first.begin() + key_prefix.size(),
                     key_prefix.begin(),
                     key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
    auto seg_it = buffer_it->second;
    if (seg_it->buffer) {
      delete seg_it->buffer;  // Delete Buffer for segment
      seg_it->buffer = nullptr;
    }
    removeSegment(seg_it);
    chunk_index_.erase(buffer_it++);
  }
}

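// On the std::search condition above: the search window is exactly
// key_prefix.size() elements at the front of each key, so it returns the
// window's begin iterator on a prefix match and its end iterator otherwise;
// combined with the map's ordering, the loop deletes the contiguous run of keys
// that start with key_prefix (e.g. a hypothetical prefix {1, 7} matches
// {1, 7, 0, 2} but not {1, 8, 0, 2}).
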
void BufferMgr::removeSegment(BufferList::iterator& seg_it) {
  // Note: does not delete the buffer, as it may be moved somewhere else
  int slab_num = seg_it->slab_num;
  // cout << "Slab num: " << slabNum << endl;
  if (slab_num < 0) {
    std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
    unsized_segs_.erase(seg_it);
  } else {
    if (seg_it != slab_segments_[slab_num].begin()) {
      auto prev_it = std::prev(seg_it);
      // LOG(INFO) << "PrevIt: " << " " << getStringMgrType() << ":" << device_id_;
      // printSeg(prev_it);
      if (prev_it->mem_status == FREE) {
        seg_it->start_page = prev_it->start_page;
        seg_it->num_pages += prev_it->num_pages;
        slab_segments_[slab_num].erase(prev_it);
      }
    }
    auto next_it = std::next(seg_it);
    if (next_it != slab_segments_[slab_num].end()) {
      if (next_it->mem_status == FREE) {
        seg_it->num_pages += next_it->num_pages;
        slab_segments_[slab_num].erase(next_it);
      }
    }
    seg_it->mem_status = FREE;
    // seg_it->pinCount = 0;
    seg_it->buffer = 0;
  }
}

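// Coalescing example (hypothetical layout): freeing the middle segment of
// [FREE 2 pages][USED 4 pages][FREE 3 pages] merges all three into a single
// 9-page FREE segment, which keeps the free list from fragmenting into many
// small runs.
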
void BufferMgr::checkpoint() {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);

  for (auto& chunk_itr : chunk_index_) {
    // checks that the buffer is an actual chunk (not just a buffer) and is dirty
    auto& buffer_itr = chunk_itr.second;
    if (buffer_itr->chunk_key[0] != -1 && buffer_itr->buffer->isDirty()) {
      parent_mgr_->putBuffer(buffer_itr->chunk_key, buffer_itr->buffer);
      buffer_itr->buffer->clearDirtyBits();
    }
  }
}

void BufferMgr::checkpoint(const int db_id, const int tb_id) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);

  ChunkKey key_prefix;
  key_prefix.push_back(db_id);
  key_prefix.push_back(tb_id);
  auto start_chunk_it = chunk_index_.lower_bound(key_prefix);
  if (start_chunk_it == chunk_index_.end()) {
    return;
  }

  auto buffer_it = start_chunk_it;
  while (buffer_it != chunk_index_.end() &&
         std::search(buffer_it->first.begin(),
                     buffer_it->first.begin() + key_prefix.size(),
                     key_prefix.begin(),
                     key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
    if (buffer_it->second->chunk_key[0] != -1 &&
        buffer_it->second->buffer->isDirty()) {  // checks that the buffer is an actual
                                                 // chunk (not just a buffer) and is
                                                 // dirty

      parent_mgr_->putBuffer(buffer_it->second->chunk_key, buffer_it->second->buffer);
      buffer_it->second->buffer->clearDirtyBits();
    }
    buffer_it++;
  }
}

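// Since a ChunkKey is just a std::vector<int> (typically of the form
// {db_id, tb_id, column_id, fragment_id} - layout assumed here), this overload
// flushes exactly the dirty chunks whose first two components match the given
// database and table ids.
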
/// Returns a pointer to the Buffer holding the chunk, if it exists; otherwise,
/// throws a runtime_error.
AbstractBuffer* BufferMgr::getBuffer(const ChunkKey& key, const size_t num_bytes) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock

  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  if (found_buffer) {
    CHECK(buffer_it->second->buffer);
    buffer_it->second->buffer->pin();
    sized_segs_lock.unlock();

    buffer_it->second->last_touched = buffer_epoch_++;  // race

    if (buffer_it->second->buffer->size() < num_bytes) {
      // need to fetch the part of the buffer we don't have - up to num_bytes
      parent_mgr_->fetchBuffer(key, buffer_it->second->buffer, num_bytes);
    }
    return buffer_it->second->buffer;
  } else {  // If it wasn't in the pool, then we need to fetch it
    sized_segs_lock.unlock();
    // createChunk pins for us
    AbstractBuffer* buffer = createBuffer(key, page_size_, num_bytes);
    try {
      parent_mgr_->fetchBuffer(
          key, buffer, num_bytes);  // this should put the buffer in a BufferSegment
    } catch (const foreign_storage::ForeignStorageException& error) {
      deleteBuffer(key);  // buffer failed to load, ensure it is cleaned up
      LOG(WARNING) << "Get chunk - Could not load chunk " << keyToString(key)
                   << " from foreign storage. Error was " << error.what();
      throw error;
    } catch (const std::exception& error) {
      LOG(FATAL) << "Get chunk - Could not find chunk " << keyToString(key)
                 << " in buffer pool or parent buffer pools. Error was " << error.what();
    }
    return buffer;
  }
}

void BufferMgr::fetchBuffer(const ChunkKey& key,
                            AbstractBuffer* dest_buffer,
                            const size_t num_bytes) {
  std::unique_lock<std::mutex> lock(global_mutex_);  // granular lock
  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);

  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  AbstractBuffer* buffer;
  if (!found_buffer) {
    sized_segs_lock.unlock();
    CHECK(parent_mgr_ != 0);
    buffer = createBuffer(key, page_size_, num_bytes);  // will pin buffer
    try {
      parent_mgr_->fetchBuffer(key, buffer, num_bytes);
    } catch (std::runtime_error& error) {
      LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key);
    }
  } else {
    buffer = buffer_it->second->buffer;
    buffer->pin();
    if (num_bytes > buffer->size()) {
      try {
        parent_mgr_->fetchBuffer(key, buffer, num_bytes);
      } catch (std::runtime_error& error) {
        LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key);
      }
    }
    sized_segs_lock.unlock();
  }
  lock.unlock();
  buffer->copyTo(dest_buffer, num_bytes);
  buffer->unPin();
}

AbstractBuffer* BufferMgr::putBuffer(const ChunkKey& key,
                                     AbstractBuffer* src_buffer,
                                     const size_t num_bytes) {
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  AbstractBuffer* buffer;
  if (!found_buffer) {
    buffer = createBuffer(key, page_size_);
  } else {
    buffer = buffer_it->second->buffer;
  }
  size_t old_buffer_size = buffer->size();
  size_t new_buffer_size = num_bytes == 0 ? src_buffer->size() : num_bytes;
  CHECK(!buffer->isDirty());

  if (src_buffer->isUpdated()) {
    //@todo use dirty flags to only flush pages of chunk that need to
    // be flushed
    buffer->write((int8_t*)src_buffer->getMemoryPtr(),
                  new_buffer_size,
                  0,
                  src_buffer->getType(),
                  src_buffer->getDeviceId());
  } else if (src_buffer->isAppended()) {
    CHECK(old_buffer_size < new_buffer_size);
    buffer->append((int8_t*)src_buffer->getMemoryPtr() + old_buffer_size,
                   new_buffer_size - old_buffer_size,
                   src_buffer->getType(),
                   src_buffer->getDeviceId());
  } else {
    UNREACHABLE();
  }
  src_buffer->clearDirtyBits();
  buffer->syncEncoder(src_buffer);
  return buffer;
}

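// Update semantics above: an updated source rewrites the chunk from offset 0,
// an appended source copies only the bytes past old_buffer_size, and a source
// that is neither updated nor appended trips UNREACHABLE().
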
int BufferMgr::getBufferId() {
  std::lock_guard<std::mutex> lock(buffer_id_mutex_);
  return max_buffer_id_++;
}

/// client is responsible for deleting memory allocated for b->mem_
AbstractBuffer* BufferMgr::alloc(const size_t num_bytes) {
  std::lock_guard<std::mutex> lock(global_mutex_);
  ChunkKey chunk_key = {-1, getBufferId()};
  return createBuffer(chunk_key, page_size_, num_bytes);
}

void BufferMgr::free(AbstractBuffer* buffer) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // hack for now
  Buffer* casted_buffer = dynamic_cast<Buffer*>(buffer);
  if (casted_buffer == 0) {
    LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type.";
  }
  deleteBuffer(casted_buffer->seg_it_->chunk_key);
}

size_t BufferMgr::getNumChunks() {
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  return chunk_index_.size();
}

size_t BufferMgr::size() {
  return num_pages_allocated_;
}

size_t BufferMgr::getMaxBufferSize() {
  return max_buffer_pool_size_;
}

size_t BufferMgr::getMaxSlabSize() {
  return max_slab_size_;
}

void BufferMgr::getChunkMetadataVecForKeyPrefix(ChunkMetadataVector& chunk_metadata_vec,
                                                const ChunkKey& key_prefix) {
  LOG(FATAL) << "getChunkMetadataVecForPrefix not supported for BufferMgr.";
}

const std::vector<BufferList>& BufferMgr::getSlabSegments() {
  return slab_segments_;
}

void BufferMgr::removeTableRelatedDS(const int db_id, const int table_id) {
  UNREACHABLE();
}
}  // namespace Buffer_Namespace