BufferMgr.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "DataMgr/BufferMgr/BufferMgr.h"

#include <algorithm>
#include <iomanip>
#include <limits>

#include "DataMgr/BufferMgr/Buffer.h"
#include "Shared/Logger.h"
#include "Shared/measure.h"

using namespace std;

namespace Buffer_Namespace {

std::string BufferMgr::keyToString(const ChunkKey& key) {
  std::ostringstream oss;

  oss << " key: ";
  for (auto sub_key : key) {
    oss << sub_key << ",";
  }
  return oss.str();
}
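
// Example (illustrative, not from the original source): for a typical
// four-component chunk key {db_id, table_id, column_id, fragment_id},
// keyToString(ChunkKey{1, 5, 3, 0}) returns " key: 1,5,3,0,".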

/// Allocates memBuffer of size and adds it to the free list
BufferMgr::BufferMgr(const int device_id,
                     const size_t max_buffer_pool_size,
                     const size_t min_slab_size,
                     const size_t max_slab_size,
                     const size_t page_size,
                     AbstractBufferMgr* parent_mgr)
    : AbstractBufferMgr(device_id)
    , max_buffer_pool_size_(max_buffer_pool_size)
    , min_slab_size_(min_slab_size)
    , max_slab_size_(max_slab_size)
    , page_size_(page_size)
    , num_pages_allocated_(0)
    , allocations_capped_(false)
    , parent_mgr_(parent_mgr)
    , max_buffer_id_(0)
    , buffer_epoch_(0) {
  CHECK(max_buffer_pool_size_ > 0);
  CHECK(page_size_ > 0);
  // TODO change checks on run-time configurable slab size variables to exceptions
  CHECK(min_slab_size_ > 0);
  CHECK(max_slab_size_ > 0);
  CHECK(min_slab_size_ <= max_slab_size_);
  CHECK(min_slab_size_ % page_size_ == 0);
  CHECK(max_slab_size_ % page_size_ == 0);

  max_buffer_pool_num_pages_ = max_buffer_pool_size_ / page_size_;
  max_num_pages_per_slab_ = max_slab_size_ / page_size_;
  min_num_pages_per_slab_ = min_slab_size_ / page_size_;
  current_max_slab_page_size_ =
      max_num_pages_per_slab_;  // current_max_slab_page_size_ will drop as allocations
                                // fail - this is the high water mark
}
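
// A minimal sketch of the page arithmetic above, with assumed (not original)
// values: a 2 GB buffer pool, a 1 GB max slab size, and 512-byte pages give
//
//   max_buffer_pool_num_pages_ = 2147483648 / 512 = 4194304 pages
//   max_num_pages_per_slab_    = 1073741824 / 512 = 2097152 pages
//
// so at most two full-size slabs fit in the pool, and
// current_max_slab_page_size_ starts at the 2097152-page high-water mark.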

/// Frees the heap-allocated buffer pool memory
BufferMgr::~BufferMgr() {
  clear();
}

void BufferMgr::reinit() {
  num_pages_allocated_ = 0;
  current_max_slab_page_size_ =
      max_num_pages_per_slab_;  // current_max_slab_page_size_ will drop as allocations
                                // fail - this is the high water mark
  allocations_capped_ = false;
}

void BufferMgr::clear() {
  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
  for (auto& buf : chunk_index_) {
    delete buf.second->buffer;
  }

  chunk_index_.clear();
  slabs_.clear();
  slab_segments_.clear();
  unsized_segs_.clear();
  buffer_epoch_ = 0;
}

/// Throws a runtime_error if the Chunk already exists
AbstractBuffer* BufferMgr::createBuffer(const ChunkKey& chunk_key,
                                        const size_t chunk_page_size,
                                        const size_t initial_size) {
  // LOG(INFO) << printMap();
  size_t actual_chunk_page_size = chunk_page_size;
  if (actual_chunk_page_size == 0) {
    actual_chunk_page_size = page_size_;
  }

  // chunk_page_size is just for recording dirty pages
  {
    std::lock_guard<std::mutex> lock(chunk_index_mutex_);
    CHECK(chunk_index_.find(chunk_key) == chunk_index_.end());
    BufferSeg buffer_seg(BufferSeg(-1, 0, USED));
    buffer_seg.chunk_key = chunk_key;
    std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
    unsized_segs_.push_back(buffer_seg);  // race condition?
    chunk_index_[chunk_key] =
        std::prev(unsized_segs_.end(),
                  1);  // need to do this before allocating the Buffer, because doing
                       // so could change the segment used
  }
  // The following should be safe outside the lock because the first thing the
  // Buffer constructor does is pin itself (and it is still in unsized_segs_ at
  // this point, so it can't be evicted).
  try {
    allocateBuffer(chunk_index_[chunk_key], actual_chunk_page_size, initial_size);
  } catch (const OutOfMemory&) {
    auto buffer_it = chunk_index_.find(chunk_key);
    CHECK(buffer_it != chunk_index_.end());
    buffer_it->second->buffer =
        nullptr;  // the Buffer constructor failed, so mark the buffer null
                  // so deleteBuffer doesn't try to delete it
    deleteBuffer(chunk_key);
    throw;
  }
  CHECK(initial_size == 0 || chunk_index_[chunk_key]->buffer->getMemoryPtr());
  // chunk_index_[chunk_key]->buffer->pin();
  std::lock_guard<std::mutex> lock(chunk_index_mutex_);
  return chunk_index_[chunk_key]->buffer;
}
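
// Usage sketch (hypothetical caller, not from the original source). `mgr` is
// assumed to be a concrete subclass such as CpuBufferMgr, and the key layout
// {db_id, table_id, column_id, fragment_id} follows keyToString above:
//
//   ChunkKey key{1, 5, 3, 0};
//   AbstractBuffer* buf = mgr->createBuffer(key, 0 /* default page size */, 4096);
//   // buf comes back pinned; unpin it when done so it becomes evictable:
//   buf->unPin();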

BufferList::iterator BufferMgr::evict(BufferList::iterator& evict_start,
                                      const size_t num_pages_requested,
                                      const int slab_num) {
  // We can assume here that the buffer before evict_start either doesn't exist
  // (evict_start is the first buffer) or was not free, so we don't need to
  // merge with it
  auto evict_it = evict_start;
  size_t num_pages = 0;
  size_t start_page = evict_start->start_page;
  while (num_pages < num_pages_requested) {
    if (evict_it->mem_status == USED) {
      CHECK(evict_it->buffer->getPinCount() < 1);
    }
    num_pages += evict_it->num_pages;
    if (evict_it->mem_status == USED && evict_it->chunk_key.size() > 0) {
      chunk_index_.erase(evict_it->chunk_key);
    }
    evict_it = slab_segments_[slab_num].erase(
        evict_it);  // erase returns the next iterator - safe if we ever move
                    // to a vector (as opposed to erase(evict_it++))
  }
  BufferSeg data_seg(
      start_page, num_pages_requested, USED, buffer_epoch_++);  // until we can
  // data_seg.pinCount++;
  data_seg.slab_num = slab_num;
  auto data_seg_it =
      slab_segments_[slab_num].insert(evict_it, data_seg);  // will insert before
                                                            // evict_it
  if (num_pages_requested < num_pages) {
    size_t excess_pages = num_pages - num_pages_requested;
    if (evict_it != slab_segments_[slab_num].end() &&
        evict_it->mem_status == FREE) {  // need to merge with the current free segment
      evict_it->start_page = start_page + num_pages_requested;
      evict_it->num_pages += excess_pages;
    } else {  // need to insert a free seg before evict_it for excess_pages
      BufferSeg free_seg(start_page + num_pages_requested, excess_pages, FREE);
      slab_segments_[slab_num].insert(evict_it, free_seg);
    }
  }
  return data_seg_it;
}
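
// Illustrative walk-through (assumed layout, not from the original source):
// evicting num_pages_requested = 3 starting at a segment list
// [USED(2) | USED(2) | FREE(4)] with start_page = 0: the loop erases the first
// two segments (4 pages >= 3 requested), a USED(3) segment is inserted at page
// 0, and the 1 excess page is merged into the following FREE segment, leaving
// [USED(3) | FREE(5)].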

BufferList::iterator BufferMgr::reserveBuffer(
    BufferList::iterator& seg_it,
    const size_t num_bytes) {  // assumes buffer is already pinned

  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;

  if (num_pages_requested < seg_it->num_pages) {
    // We already have enough pages in the existing segment
    return seg_it;
  }
  size_t num_pages_extra_needed = num_pages_requested - seg_it->num_pages;

  // First check for a free segment after seg_it
  int slab_num = seg_it->slab_num;
  if (slab_num >= 0) {  // not a dummy page
    BufferList::iterator next_it = std::next(seg_it);
    if (next_it != slab_segments_[slab_num].end() && next_it->mem_status == FREE &&
        next_it->num_pages >= num_pages_extra_needed) {
      // Then we can just use the next BufferSeg, which happens to be free
      size_t leftover_pages = next_it->num_pages - num_pages_extra_needed;
      seg_it->num_pages = num_pages_requested;
      next_it->num_pages = leftover_pages;
      next_it->start_page = seg_it->start_page + seg_it->num_pages;
      return seg_it;
    }
  }
  // If we're here then we couldn't keep the buffer in its existing slot, so we
  // need to find a new segment, copy the data over, and then delete the old one
  auto new_seg_it = findFreeBuffer(num_bytes);

  // Below should be in a copy constructor for BufferSeg?
  new_seg_it->buffer = seg_it->buffer;
  new_seg_it->chunk_key = seg_it->chunk_key;
  int8_t* old_mem = new_seg_it->buffer->mem_;
  new_seg_it->buffer->mem_ =
      slabs_[new_seg_it->slab_num] + new_seg_it->start_page * page_size_;

  // Now we need to copy the old memory over, but only if the old segment is
  // valid (i.e. not new with an unallocated buffer)
  if (seg_it->start_page >= 0 && seg_it->buffer->mem_ != 0) {
    new_seg_it->buffer->writeData(old_mem,
                                  new_seg_it->buffer->size(),
                                  0,
                                  new_seg_it->buffer->getType(),
                                  device_id_);
  }
  // Decrement the pin count to reverse the effect above
  removeSegment(seg_it);
  {
    std::lock_guard<std::mutex> lock(chunk_index_mutex_);
    chunk_index_[new_seg_it->chunk_key] = new_seg_it;
  }

  return new_seg_it;
}
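
// The rounding used above and throughout this file,
// (num_bytes + page_size_ - 1) / page_size_, is integer ceiling division.
// For example (assumed page_size_ of 512, not from the original source):
//
//   num_bytes = 1    ->  (1 + 511) / 512   = 1 page
//   num_bytes = 512  ->  (512 + 511) / 512 = 1 page
//   num_bytes = 513  ->  (513 + 511) / 512 = 2 pages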

BufferList::iterator BufferMgr::findFreeBufferInSlab(const size_t slab_num,
                                                     const size_t num_pages_requested) {
  for (auto buffer_it = slab_segments_[slab_num].begin();
       buffer_it != slab_segments_[slab_num].end();
       ++buffer_it) {
    if (buffer_it->mem_status == FREE && buffer_it->num_pages >= num_pages_requested) {
      // start_page doesn't change
      size_t excess_pages = buffer_it->num_pages - num_pages_requested;
      buffer_it->num_pages = num_pages_requested;
      buffer_it->mem_status = USED;
      buffer_it->last_touched = buffer_epoch_++;
      buffer_it->slab_num = slab_num;
      if (excess_pages > 0) {
        BufferSeg free_seg(
            buffer_it->start_page + num_pages_requested, excess_pages, FREE);
        auto temp_it = buffer_it;  // this should make a copy and not a reference -
                                   // we do not want to increment buffer_it
        temp_it++;
        slab_segments_[slab_num].insert(temp_it, free_seg);
      }
      return buffer_it;
    }
  }
  // If here then we did not find a free buffer of sufficient size in this slab;
  // return the end iterator
  return slab_segments_[slab_num].end();
}
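
// Design note: findFreeBufferInSlab is a first-fit scan - the first FREE
// segment with enough pages is split into a USED segment of exactly
// num_pages_requested plus a FREE remainder. First-fit keeps the scan linear
// in the number of segments per slab at the cost of some fragmentation, which
// removeSegment below undoes by coalescing adjacent FREE neighbors.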

BufferList::iterator BufferMgr::findFreeBuffer(size_t num_bytes) {
  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;
  if (num_pages_requested > max_num_pages_per_slab_) {
    throw TooBigForSlab(num_bytes);
  }

  size_t num_slabs = slab_segments_.size();

  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
    auto seg_it = findFreeBufferInSlab(slab_num, num_pages_requested);
    if (seg_it != slab_segments_[slab_num].end()) {
      return seg_it;
    }
  }

  // If we're here then we didn't find a free segment of sufficient size;
  // first we see if we can add another slab
  while (!allocations_capped_ && num_pages_allocated_ < max_buffer_pool_num_pages_) {
    try {
      size_t pagesLeft = max_buffer_pool_num_pages_ - num_pages_allocated_;
      if (pagesLeft < current_max_slab_page_size_) {
        current_max_slab_page_size_ = pagesLeft;
      }
      if (num_pages_requested <=
          current_max_slab_page_size_) {  // don't try to allocate if the
                                          // new slab won't be big enough
        auto alloc_ms = measure<>::execution(
            [&]() { addSlab(current_max_slab_page_size_ * page_size_); });
        LOG(INFO) << "ALLOCATION slab of " << current_max_slab_page_size_ << " pages ("
                  << current_max_slab_page_size_ * page_size_ << "B) created in "
                  << alloc_ms << " ms " << getStringMgrType() << ":" << device_id_;
      } else {
        break;
      }
      // if here then addSlab succeeded
      num_pages_allocated_ += current_max_slab_page_size_;
      return findFreeBufferInSlab(
          num_slabs,
          num_pages_requested);  // has to succeed since we made sure to request a slab
                                 // big enough to accommodate the request
    } catch (std::runtime_error& error) {  // failed to allocate slab
      LOG(INFO) << "ALLOCATION Attempted slab of " << current_max_slab_page_size_
                << " pages (" << current_max_slab_page_size_ * page_size_ << "B) failed "
                << getStringMgrType() << ":" << device_id_;
      // Check if there is any point in halving currentMaxSlabSize and trying again:
      // if the request won't fit in half the available pages, try once more at the
      // exact request size; if we have already tried at that size and failed, then
      // break, as there could still be room enough for other, later requests but
      // not for this current one.
      if (num_pages_requested > current_max_slab_page_size_ / 2 &&
          current_max_slab_page_size_ != num_pages_requested) {
        current_max_slab_page_size_ = num_pages_requested;
      } else {
        current_max_slab_page_size_ /= 2;
        if (current_max_slab_page_size_ <
            (min_num_pages_per_slab_)) {  // should be a constant
          allocations_capped_ = true;
          // dump out the slabs and their sizes
          LOG(INFO) << "ALLOCATION Capped " << current_max_slab_page_size_
                    << " Minimum size = " << (min_num_pages_per_slab_) << " "
                    << getStringMgrType() << ":" << device_id_;
        }
      }
    }
  }

  if (num_pages_allocated_ == 0 && allocations_capped_) {
    throw FailedToCreateFirstSlab(num_bytes);
  }

  // If here then we can't add a slab - so we need to evict

  size_t min_score = std::numeric_limits<size_t>::max();
  // We're going for the lowest score here, like golf.
  // The score is the most recent last_touched epoch among the USED segments in a
  // candidate eviction window (see the note below on why it is a max rather than
  // a sum), so windows of older pages score lower.
  BufferList::iterator best_eviction_start = slab_segments_[0].end();
  int best_eviction_start_slab = -1;
  int slab_num = 0;

  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
       ++slab_it, ++slab_num) {
    for (auto buffer_it = slab_it->begin(); buffer_it != slab_it->end(); ++buffer_it) {
      // Note there are some shortcuts we could take here - for example, we should
      // never consider a USED buffer coming directly after a FREE buffer, as we
      // would have used the FREE buffer - but we won't worry about this for now

      // We can't evict pinned buffers - only normal used buffers

      // if (buffer_it->mem_status == FREE || buffer_it->buffer->getPinCount() == 0) {
      size_t page_count = 0;
      size_t score = 0;
      bool solution_found = false;
      auto evict_it = buffer_it;
      for (; evict_it != slab_segments_[slab_num].end(); ++evict_it) {
        // pinCount should never go up - only down - because we hold the
        // global lock on the buffer pool and the pin count only increments
        // on getChunk
        if (evict_it->mem_status == USED && evict_it->buffer->getPinCount() > 0) {
          break;
        }
        page_count += evict_it->num_pages;
        if (evict_it->mem_status == USED) {
          // MAT changed from
          // score += evictIt->lastTouched;
          // The issue was thrashing when going from 8M fragment size chunks back to
          // 64M: the large chunks were being evicted before the small ones, because
          // the summed score of many small chunks was larger than that of one large
          // chunk, so a large chunk would always be evicted. Under memory pressure
          // a query would evict its own current chunks and cause reloads, rather
          // than evicting several smaller, unused, older chunks.
          score = std::max(score, static_cast<size_t>(evict_it->last_touched));
        }
        if (page_count >= num_pages_requested) {
          solution_found = true;
          break;
        }
      }
      if (solution_found && score < min_score) {
        min_score = score;
        best_eviction_start = buffer_it;
        best_eviction_start_slab = slab_num;
      } else if (evict_it == slab_segments_[slab_num].end()) {
        // this means that every segment after this will fail as well, so our search
        // has proven futile
        // throw std::runtime_error ("Couldn't evict chunks to get free space");
        break;
        // in reality we should try to rearrange the buffer to get more contiguous
        // free space
      }
      // the other possibility is ending at PINNED - do nothing in this case
      //}
    }
  }
  if (best_eviction_start == slab_segments_[0].end()) {
    LOG(ERROR) << "ALLOCATION failed to find " << num_bytes << "B throwing out of memory "
               << getStringMgrType() << ":" << device_id_;
    VLOG(2) << printSlabs();
    throw OutOfMemory(num_bytes);
  }
  LOG(INFO) << "ALLOCATION failed to find " << num_bytes << "B free. Forcing Eviction."
            << " Eviction start " << best_eviction_start->start_page
            << " Number pages requested " << num_pages_requested
            << " Best Eviction Start Slab " << best_eviction_start_slab << " "
            << getStringMgrType() << ":" << device_id_;
  best_eviction_start =
      evict(best_eviction_start, num_pages_requested, best_eviction_start_slab);
  return best_eviction_start;
}
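
// Scoring sketch (assumed epochs, not from the original source): consider two
// candidate eviction windows that both cover enough pages:
//
//   window A: five small old chunks, last touched at epochs {1, 2, 3, 4, 5}
//   window B: one large chunk, last touched at epoch {10}
//
// Under the old sum scoring, A = 15 and B = 10, so the recently used large
// chunk was evicted first - the thrashing described above. Under max scoring,
// A = 5 and B = 10, so the older small chunks are evicted instead.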

std::string BufferMgr::printSlab(size_t slab_num) {
  std::ostringstream tss;
  // size_t lastEnd = 0;
  tss << "Slab St.Page Pages Touch" << std::endl;
  for (auto segment : slab_segments_[slab_num]) {
    tss << setfill(' ') << setw(4) << slab_num;
    // tss << " BSN: " << setfill(' ') << setw(2) << segment.slab_num;
    tss << setfill(' ') << setw(8) << segment.start_page;
    tss << setfill(' ') << setw(8) << segment.num_pages;
    // tss << " GAP: " << setfill(' ') << setw(7) << segment.start_page - lastEnd;
    // lastEnd = segment.start_page + segment.num_pages;
    tss << setfill(' ') << setw(7) << segment.last_touched;
    // tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
    if (segment.mem_status == FREE) {
      tss << " FREE"
          << " ";
    } else {
      tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
      tss << " USED - Chunk: ";

      for (auto&& key_elem : segment.chunk_key) {
        tss << key_elem << ",";
      }
    }
    tss << std::endl;
  }
  return tss.str();
}

std::string BufferMgr::printSlabs() {
  std::ostringstream tss;
  tss << std::endl
      << "Slabs Contents: "
      << " " << getStringMgrType() << ":" << device_id_ << std::endl;
  size_t num_slabs = slab_segments_.size();
  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
    tss << printSlab(slab_num);
  }
  tss << "--------------------" << std::endl;
  return tss.str();
}
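
// Sample output shape from printSlabs (hypothetical values, widths per the
// setw calls above):
//
//   Slab St.Page Pages Touch
//      0       0    2048     17 PC:  0 USED - Chunk: 1,5,3,0,
//      0    2048    6144      0 FREE
//   --------------------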

void BufferMgr::clearSlabs() {
  bool pinned_exists = false;
  for (auto& segment_list : slab_segments_) {
    for (auto& segment : segment_list) {
      if (segment.mem_status == FREE) {
        // no need to free
      } else if (segment.buffer->getPinCount() < 1) {
        deleteBuffer(segment.chunk_key, true);
      } else {
        pinned_exists = true;
      }
    }
  }
  if (!pinned_exists) {
    // let's actually clear the buffer from memory
    freeAllMem();
    clear();
    reinit();
  }
}

// return the maximum size the buffer pool can be, in bytes
size_t BufferMgr::getMaxSize() {
  return max_buffer_pool_size_;
}

// return how many bytes are currently allocated for the buffer pool
size_t BufferMgr::getAllocated() {
  return num_pages_allocated_ * page_size_;
}

// return whether allocations have been capped after a failed slab allocation
bool BufferMgr::isAllocationCapped() {
  return allocations_capped_;
}

size_t BufferMgr::getPageSize() {
  return page_size_;
}

// return the size of the chunks in use, in bytes
size_t BufferMgr::getInUseSize() {
  size_t in_use = 0;
  for (auto& segment_list : slab_segments_) {
    for (auto& segment : segment_list) {
      if (segment.mem_status != FREE) {
        in_use += segment.num_pages * page_size_;
      }
    }
  }
  return in_use;
}
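
// The three sizes reported above are related, by construction, as
//
//   getInUseSize() <= getAllocated() <= getMaxSize()
//
// i.e. bytes held by USED segments, bytes actually reserved in slabs, and the
// configured pool ceiling, respectively.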

std::string BufferMgr::printSeg(BufferList::iterator& seg_it) {
  std::ostringstream tss;
  tss << "SN: " << setfill(' ') << setw(2) << seg_it->slab_num;
  tss << " SP: " << setfill(' ') << setw(7) << seg_it->start_page;
  tss << " NP: " << setfill(' ') << setw(7) << seg_it->num_pages;
  tss << " LT: " << setfill(' ') << setw(7) << seg_it->last_touched;
  tss << " PC: " << setfill(' ') << setw(2) << seg_it->buffer->getPinCount();
  if (seg_it->mem_status == FREE) {
    tss << " FREE"
        << " ";
  } else {
    tss << " USED - Chunk: ";
    for (auto vec_it = seg_it->chunk_key.begin(); vec_it != seg_it->chunk_key.end();
         ++vec_it) {
      tss << *vec_it << ",";
    }
    tss << std::endl;
  }
  return tss.str();
}

std::string BufferMgr::printMap() {
  std::ostringstream tss;
  int seg_num = 1;
  tss << std::endl
      << "Map Contents: "
      << " " << getStringMgrType() << ":" << device_id_ << std::endl;
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  for (auto seg_it = chunk_index_.begin(); seg_it != chunk_index_.end();
       ++seg_it, ++seg_num) {
    // tss << "Map Entry " << seg_num << ": ";
    // for (auto vec_it = seg_it->first.begin(); vec_it != seg_it->first.end();
    //      ++vec_it) {
    //   tss << *vec_it << ",";
    // }
    // tss << " " << std::endl;
    tss << printSeg(seg_it->second);
  }
  tss << "--------------------" << std::endl;
  return tss.str();
}

void BufferMgr::printSegs() {
  int seg_num = 1;
  int slab_num = 1;
  LOG(INFO) << std::endl << " " << getStringMgrType() << ":" << device_id_;
  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
       ++slab_it, ++slab_num) {
    LOG(INFO) << "Slab Num: " << slab_num << " " << getStringMgrType() << ":"
              << device_id_;
    for (auto seg_it = slab_it->begin(); seg_it != slab_it->end(); ++seg_it, ++seg_num) {
      LOG(INFO) << "Segment: " << seg_num << " " << getStringMgrType() << ":"
                << device_id_;
      printSeg(seg_it);
      LOG(INFO) << " " << getStringMgrType() << ":" << device_id_;
    }
    LOG(INFO) << "--------------------"
              << " " << getStringMgrType() << ":" << device_id_;
  }
}

bool BufferMgr::isBufferOnDevice(const ChunkKey& key) {
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  return chunk_index_.find(key) != chunk_index_.end();
}

/// This method throws a runtime_error when deleting a Chunk that does not exist.
void BufferMgr::deleteBuffer(const ChunkKey& key, const bool) {
  // Note: purge is unused
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);

  // lookup the buffer for the Chunk in chunk_index_
  auto buffer_it = chunk_index_.find(key);
  CHECK(buffer_it != chunk_index_.end());
  auto seg_it = buffer_it->second;
  chunk_index_.erase(buffer_it);
  chunk_index_lock.unlock();
  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
  if (seg_it->buffer) {
    delete seg_it->buffer;  // delete the Buffer for the segment
    seg_it->buffer = nullptr;
  }
  removeSegment(seg_it);
}

void BufferMgr::deleteBuffersWithPrefix(const ChunkKey& key_prefix, const bool) {
  // Note: purge is unused
  // lookup the buffers for the Chunks in chunk_index_
  std::lock_guard<std::mutex> sized_segs_lock(
      sized_segs_mutex_);  // take this lock early to prevent a deadlock with
                           // reserveBuffer, which takes segs_mutex_ and then
                           // chunk_index_mutex_
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto start_chunk_it = chunk_index_.lower_bound(key_prefix);
  if (start_chunk_it == chunk_index_.end()) {
    return;
  }

  auto buffer_it = start_chunk_it;
  while (buffer_it != chunk_index_.end() &&
         std::search(buffer_it->first.begin(),
                     buffer_it->first.begin() + key_prefix.size(),
                     key_prefix.begin(),
                     key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
    auto seg_it = buffer_it->second;
    if (seg_it->buffer) {
      delete seg_it->buffer;  // delete the Buffer for the segment
      seg_it->buffer = nullptr;
    }
    removeSegment(seg_it);
    chunk_index_.erase(buffer_it++);
  }
}
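
// Example (illustrative keys, not from the original source): with chunk keys
// ordered lexicographically in the map, deleteBuffersWithPrefix({1, 5}) removes
// every chunk of table 5 in database 1 - e.g. {1,5,3,0} and {1,5,4,2} - because
// lower_bound({1, 5}) lands on the first such key and the std::search test ends
// the loop at the first key whose leading components no longer match.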

void BufferMgr::removeSegment(BufferList::iterator& seg_it) {
  // Note: does not delete the buffer, as it may be moved somewhere else
  int slab_num = seg_it->slab_num;
  // cout << "Slab num: " << slabNum << endl;
  if (slab_num < 0) {
    std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
    unsized_segs_.erase(seg_it);
  } else {
    if (seg_it != slab_segments_[slab_num].begin()) {
      auto prev_it = std::prev(seg_it);
      // LOG(INFO) << "PrevIt: " << " " << getStringMgrType() << ":" << device_id_;
      // printSeg(prev_it);
      if (prev_it->mem_status == FREE) {
        seg_it->start_page = prev_it->start_page;
        seg_it->num_pages += prev_it->num_pages;
        slab_segments_[slab_num].erase(prev_it);
      }
    }
    auto next_it = std::next(seg_it);
    if (next_it != slab_segments_[slab_num].end()) {
      if (next_it->mem_status == FREE) {
        seg_it->num_pages += next_it->num_pages;
        slab_segments_[slab_num].erase(next_it);
      }
    }
    seg_it->mem_status = FREE;
    // seg_it->pinCount = 0;
    seg_it->buffer = nullptr;
  }
}
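
// Coalescing sketch (assumed layout): freeing the middle USED segment of
// [FREE(2) | USED(3) | FREE(4)] merges all three into a single FREE(9) segment
// starting at the first segment's start_page. This is what keeps the first-fit
// scan in findFreeBufferInSlab from fragmenting the slab over time.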

void BufferMgr::checkpoint() {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);

  for (auto& chunk_itr : chunk_index_) {
    // checks that the buffer is an actual chunk (not just a buffer) and is dirty
    auto& buffer_itr = chunk_itr.second;
    if (buffer_itr->chunk_key[0] != -1 && buffer_itr->buffer->is_dirty_) {
      parent_mgr_->putBuffer(buffer_itr->chunk_key, buffer_itr->buffer);
      buffer_itr->buffer->clearDirtyBits();
    }
  }
}

void BufferMgr::checkpoint(const int db_id, const int tb_id) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);

  ChunkKey key_prefix;
  key_prefix.push_back(db_id);
  key_prefix.push_back(tb_id);
  auto start_chunk_it = chunk_index_.lower_bound(key_prefix);
  if (start_chunk_it == chunk_index_.end()) {
    return;
  }

  auto buffer_it = start_chunk_it;
  while (buffer_it != chunk_index_.end() &&
         std::search(buffer_it->first.begin(),
                     buffer_it->first.begin() + key_prefix.size(),
                     key_prefix.begin(),
                     key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
    if (buffer_it->second->chunk_key[0] != -1 &&
        buffer_it->second->buffer->is_dirty_) {  // checks that the buffer is an actual
                                                 // chunk (not just a buffer) and is
                                                 // dirty
      parent_mgr_->putBuffer(buffer_it->second->chunk_key, buffer_it->second->buffer);
      buffer_it->second->buffer->clearDirtyBits();
    }
    buffer_it++;
  }
}

/// Returns a pointer to the Buffer holding the chunk, if it exists; otherwise,
/// throws a runtime_error.
AbstractBuffer* BufferMgr::getBuffer(const ChunkKey& key, const size_t num_bytes) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock

  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  if (found_buffer) {
    CHECK(buffer_it->second->buffer);
    buffer_it->second->buffer->pin();
    sized_segs_lock.unlock();

    buffer_it->second->last_touched = buffer_epoch_++;  // race

    if (buffer_it->second->buffer->size() < num_bytes) {
      // need to fetch the part of the buffer we don't have - up to num_bytes
      parent_mgr_->fetchBuffer(key, buffer_it->second->buffer, num_bytes);
    }
    return buffer_it->second->buffer;
  } else {  // if it wasn't in the pool then we need to fetch it
    sized_segs_lock.unlock();
    // createBuffer pins for us
    AbstractBuffer* buffer = createBuffer(key, page_size_, num_bytes);
    try {
      parent_mgr_->fetchBuffer(
          key, buffer, num_bytes);  // this should put the buffer in a BufferSegment
    } catch (std::runtime_error& error) {
      LOG(FATAL) << "Get chunk - Could not find chunk " << keyToString(key)
                 << " in buffer pool or parent buffer pools. Error was " << error.what();
    }
    return buffer;
  }
}
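
// Usage sketch (hypothetical caller): getBuffer returns the chunk pinned,
// whether it was already resident or had to be fetched from parent_mgr_:
//
//   AbstractBuffer* chunk = mgr->getBuffer(key, num_bytes);
//   // ... read through chunk->getMemoryPtr() ...
//   chunk->unPin();  // make the chunk evictable again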

void BufferMgr::fetchBuffer(const ChunkKey& key,
                            AbstractBuffer* dest_buffer,
                            const size_t num_bytes) {
  std::unique_lock<std::mutex> lock(global_mutex_);  // granular lock
  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);

  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  AbstractBuffer* buffer;
  if (!found_buffer) {
    sized_segs_lock.unlock();
    CHECK(parent_mgr_ != 0);
    buffer = createBuffer(key, page_size_, num_bytes);  // will pin buffer
    try {
      parent_mgr_->fetchBuffer(key, buffer, num_bytes);
    } catch (std::runtime_error& error) {
      LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key);
    }
  } else {
    buffer = buffer_it->second->buffer;
    buffer->pin();
    if (num_bytes > buffer->size()) {
      try {
        parent_mgr_->fetchBuffer(key, buffer, num_bytes);
      } catch (std::runtime_error& error) {
        LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key);
      }
    }
    sized_segs_lock.unlock();
  }
  size_t chunk_size = num_bytes == 0 ? buffer->size() : num_bytes;
  lock.unlock();
  dest_buffer->reserve(chunk_size);
  if (buffer->isUpdated()) {
    buffer->read(dest_buffer->getMemoryPtr(),
                 chunk_size,
                 0,
                 dest_buffer->getType(),
                 dest_buffer->getDeviceId());
  } else {
    buffer->read(dest_buffer->getMemoryPtr() + dest_buffer->size(),
                 chunk_size - dest_buffer->size(),
                 dest_buffer->size(),
                 dest_buffer->getType(),
                 dest_buffer->getDeviceId());
  }
  dest_buffer->setSize(chunk_size);
  dest_buffer->syncEncoder(buffer);
  buffer->unPin();
}

AbstractBuffer* BufferMgr::putBuffer(const ChunkKey& key,
                                     AbstractBuffer* src_buffer,
                                     const size_t num_bytes) {
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  AbstractBuffer* buffer;
  if (!found_buffer) {
    buffer = createBuffer(key, page_size_);
  } else {
    buffer = buffer_it->second->buffer;
  }
  size_t old_buffer_size = buffer->size();
  size_t new_buffer_size = num_bytes == 0 ? src_buffer->size() : num_bytes;
  CHECK(!buffer->isDirty());

  if (src_buffer->isUpdated()) {
    //@todo use dirty flags to only flush pages of the chunk that need to
    // be flushed
    buffer->write((int8_t*)src_buffer->getMemoryPtr(),
                  new_buffer_size,
                  0,
                  src_buffer->getType(),
                  src_buffer->getDeviceId());
  } else if (src_buffer->isAppended()) {
    CHECK(old_buffer_size < new_buffer_size);
    buffer->append((int8_t*)src_buffer->getMemoryPtr() + old_buffer_size,
                   new_buffer_size - old_buffer_size,
                   src_buffer->getType(),
                   src_buffer->getDeviceId());
  }
  src_buffer->clearDirtyBits();
  buffer->syncEncoder(src_buffer);
  return buffer;
}

int BufferMgr::getBufferId() {
  std::lock_guard<std::mutex> lock(buffer_id_mutex_);
  return max_buffer_id_++;
}

/// client is responsible for deleting memory allocated for b->mem_
AbstractBuffer* BufferMgr::alloc(const size_t num_bytes) {
  std::lock_guard<std::mutex> lock(global_mutex_);
  ChunkKey chunk_key = {-1, getBufferId()};
  return createBuffer(chunk_key, page_size_, num_bytes);
}
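
// Usage sketch (illustrative): alloc/free manage anonymous, non-chunk buffers.
// The {-1, id} key marks them so the chunk-oriented paths (e.g. checkpoint,
// which tests chunk_key[0] != -1) skip them:
//
//   AbstractBuffer* scratch = mgr->alloc(1024 * 1024);
//   // ... use scratch->getMemoryPtr() ...
//   mgr->free(scratch);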

void BufferMgr::free(AbstractBuffer* buffer) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // hack for now
  Buffer* casted_buffer = dynamic_cast<Buffer*>(buffer);
  if (casted_buffer == 0) {
    LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type.";
  }
  deleteBuffer(casted_buffer->seg_it_->chunk_key);
}

size_t BufferMgr::getNumChunks() {
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  return chunk_index_.size();
}

size_t BufferMgr::size() {
  return num_pages_allocated_;
}

size_t BufferMgr::getMaxBufferSize() {
  return max_buffer_pool_size_;
}

size_t BufferMgr::getMaxSlabSize() {
  return max_slab_size_;
}

void BufferMgr::getChunkMetadataVec(ChunkMetadataVector& chunk_metadata_vec) {
  LOG(FATAL) << "getChunkMetadataVec not supported for BufferMgr.";
}

void BufferMgr::getChunkMetadataVecForKeyPrefix(ChunkMetadataVector& chunk_metadata_vec,
                                                const ChunkKey& key_prefix) {
  LOG(FATAL) << "getChunkMetadataVecForPrefix not supported for BufferMgr.";
}

const std::vector<BufferList>& BufferMgr::getSlabSegments() {
  return slab_segments_;
}

void BufferMgr::removeTableRelatedDS(const int db_id, const int table_id) {
  UNREACHABLE();
}
}  // namespace Buffer_Namespace