BufferMgr.cpp
/*
 * Copyright 2017 MapD Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "BufferMgr.h"

#include <algorithm>
#include <iomanip>
#include <limits>

#include "Buffer.h"
#include "Shared/Logger.h"
#include "Shared/measure.h"

using namespace std;

namespace Buffer_Namespace {

std::string BufferMgr::keyToString(const ChunkKey& key) {
  std::ostringstream oss;

  oss << " key: ";
  for (auto sub_key : key) {
    oss << sub_key << ",";
  }
  return oss.str();
}

BufferMgr::BufferMgr(const int device_id,
                     const size_t max_buffer_size,
                     const size_t max_slab_size,
                     const size_t page_size,
                     AbstractBufferMgr* parent_mgr)
    : AbstractBufferMgr(device_id)
    , page_size_(page_size)
    , max_buffer_size_(max_buffer_size)
    , num_pages_allocated_(0)
    , max_slab_size_(max_slab_size)
    , allocations_capped_(false)
    , parent_mgr_(parent_mgr)
    , max_buffer_id_(0)
    , buffer_epoch_(0) {
  CHECK(max_buffer_size_ > 0 && max_slab_size_ > 0 && page_size_ > 0 &&
        max_slab_size_ % page_size_ == 0);
  max_num_pages_ = max_buffer_size_ / page_size_;
  max_num_pages_per_slab_ = max_slab_size_ / page_size_;
  current_max_slab_page_size_ =
      max_num_pages_per_slab_;  // current_max_slab_page_size_ will drop as allocations
                                // fail - this is the high water mark
}
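
// Worked example of the sizing math above (illustrative values, not from the
// original source): with max_buffer_size = 1 GiB (1073741824 B), max_slab_size =
// 256 MiB (268435456 B), and page_size = 512 B, the CHECK passes since
// 268435456 % 512 == 0, and we get max_num_pages_ = 2097152,
// max_num_pages_per_slab_ = 524288, and current_max_slab_page_size_ = 524288.
// A later request for 1 MiB would round up to (1048576 + 511) / 512 = 2048 pages.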

/// Destructor
BufferMgr::~BufferMgr() {
  clear();
}

void BufferMgr::reinit() {
  num_pages_allocated_ = 0;
  current_max_slab_page_size_ =
      max_num_pages_per_slab_;  // current_max_slab_page_size_ will drop as allocations
                                // fail - this is the high water mark
  allocations_capped_ = false;
}

void BufferMgr::clear() {
  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
  for (auto& buf : chunk_index_) {
    delete buf.second->buffer;
  }

  chunk_index_.clear();
  slabs_.clear();
  slab_segments_.clear();
  unsized_segs_.clear();
  buffer_epoch_ = 0;
}

/// Creates a chunk with the specified key and page size.
AbstractBuffer* BufferMgr::createBuffer(const ChunkKey& chunk_key,
                                        const size_t chunk_page_size,
                                        const size_t initial_size) {
  // LOG(INFO) << printMap();
  size_t actual_chunk_page_size = chunk_page_size;
  if (actual_chunk_page_size == 0) {
    actual_chunk_page_size = page_size_;
  }

  // chunk_page_size is just for recording dirty pages
  {
    std::lock_guard<std::mutex> lock(chunk_index_mutex_);
    CHECK(chunk_index_.find(chunk_key) == chunk_index_.end());
    BufferSeg buffer_seg(-1, 0, USED);
    buffer_seg.chunk_key = chunk_key;
    std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
    unsized_segs_.push_back(buffer_seg);  // race condition?
    chunk_index_[chunk_key] =
        std::prev(unsized_segs_.end(),
                  1);  // need to do this before allocating the Buffer because doing
                       // so could change the segment used
  }
  // The following should be safe outside the lock because the first thing the
  // Buffer constructor does is pin (and it's still in unsized_segs_ at this
  // point, so it can't be evicted).
  try {
    allocateBuffer(chunk_index_[chunk_key], actual_chunk_page_size, initial_size);
  } catch (const OutOfMemory&) {
    auto buffer_it = chunk_index_.find(chunk_key);
    CHECK(buffer_it != chunk_index_.end());
    buffer_it->second->buffer =
        nullptr;  // the Buffer constructor failed, so mark the buffer null to keep
                  // deleteBuffer from trying to delete it
    deleteBuffer(chunk_key);
    throw;
  }
  CHECK(initial_size == 0 || chunk_index_[chunk_key]->buffer->getMemoryPtr());
  // chunk_index_[chunk_key]->buffer->pin();
  std::lock_guard<std::mutex> lock(chunk_index_mutex_);
  return chunk_index_[chunk_key]->buffer;
}
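
// Summary of the createBuffer protocol above: the chunk is first registered as a
// placeholder segment (slab_num == -1) in unsized_segs_, where eviction cannot
// touch it; then allocateBuffer() (implemented by the concrete subclass)
// constructs the Buffer, which reserves real slab pages and re-points the
// chunk_index_ entry at a sized segment. If allocation throws OutOfMemory, the
// placeholder is unwound via deleteBuffer() before rethrowing.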

BufferList::iterator BufferMgr::evict(BufferList::iterator& evict_start,
                                      const size_t num_pages_requested,
                                      const int slab_num) {
  // We can assume here that the buffer for evict_start either doesn't exist
  // (evict_start is the first buffer) or was not free, so we don't need to
  // merge it
  auto evict_it = evict_start;
  size_t num_pages = 0;
  size_t start_page = evict_start->start_page;
  while (num_pages < num_pages_requested) {
    if (evict_it->mem_status == USED) {
      CHECK(evict_it->buffer->getPinCount() < 1);
    }
    num_pages += evict_it->num_pages;
    if (evict_it->mem_status == USED && evict_it->chunk_key.size() > 0) {
      chunk_index_.erase(evict_it->chunk_key);
    }
    evict_it = slab_segments_[slab_num].erase(
        evict_it);  // erase returns the next iterator - safe if we ever move
                    // to a vector (as opposed to erase(evict_it++))
  }
  BufferSeg data_seg(
      start_page, num_pages_requested, USED, buffer_epoch_++);  // until we can
  // data_seg.pinCount++;
  data_seg.slab_num = slab_num;
  auto data_seg_it =
      slab_segments_[slab_num].insert(evict_it, data_seg);  // will insert before evict_it
  if (num_pages_requested < num_pages) {
    size_t excess_pages = num_pages - num_pages_requested;
    if (evict_it != slab_segments_[slab_num].end() &&
        evict_it->mem_status == FREE) {  // need to merge with the current page
      evict_it->start_page = start_page + num_pages_requested;
      evict_it->num_pages += excess_pages;
    } else {  // need to insert a free seg before evict_it for excess_pages
      BufferSeg free_seg(start_page + num_pages_requested, excess_pages, FREE);
      slab_segments_[slab_num].insert(evict_it, free_seg);
    }
  }
  return data_seg_it;
}
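
// Worked example for evict() (illustrative): suppose pages 0-1 hold an unpinned
// USED segment and pages 2-3 a FREE segment, and num_pages_requested is 3. The
// loop erases both segments (num_pages reaches 4), a USED data_seg covering
// pages 0-2 is inserted, and the one excess page becomes a FREE segment starting
// at page 3 (or is merged into a following FREE segment, if one is adjacent).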

BufferList::iterator BufferMgr::reserveBuffer(
    BufferList::iterator& seg_it,
    const size_t num_bytes) {  // assumes buffer is already pinned

  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;
  if (num_pages_requested < seg_it->num_pages) {
    // We already have enough pages in the existing segment
    return seg_it;
  }
  size_t num_pages_extra_needed = num_pages_requested - seg_it->num_pages;

  // First check for a free segment after seg_it
  int slab_num = seg_it->slab_num;
  if (slab_num >= 0) {  // not a dummy page
    BufferList::iterator next_it = std::next(seg_it);
    if (next_it != slab_segments_[slab_num].end() && next_it->mem_status == FREE &&
        next_it->num_pages >= num_pages_extra_needed) {
      // Then we can just use the next BufferSeg, which happens to be free
      size_t leftover_pages = next_it->num_pages - num_pages_extra_needed;
      seg_it->num_pages = num_pages_requested;
      next_it->num_pages = leftover_pages;
      next_it->start_page = seg_it->start_page + seg_it->num_pages;
      return seg_it;
    }
  }
  // If we're here then we couldn't keep the buffer in its existing slot -
  // we need to find a new segment, copy the data over, and then delete the old
  auto new_seg_it = findFreeBuffer(num_bytes);

  // Below should be in a copy constructor for BufferSeg?
  new_seg_it->buffer = seg_it->buffer;
  new_seg_it->chunk_key = seg_it->chunk_key;
  int8_t* old_mem = new_seg_it->buffer->mem_;
  new_seg_it->buffer->mem_ =
      slabs_[new_seg_it->slab_num] + new_seg_it->start_page * page_size_;

  // Now we need to copy over the memory - but only if the old segment is valid
  // (i.e. not a new segment with an unallocated buffer)
  if (seg_it->start_page >= 0 && seg_it->buffer->mem_ != nullptr) {
    new_seg_it->buffer->writeData(old_mem,
                                  new_seg_it->buffer->size(),
                                  0,
                                  new_seg_it->buffer->getType(),
                                  device_id_);
  }
  // Decrement pin count to reverse effect above
  removeSegment(seg_it);
  {
    std::lock_guard<std::mutex> lock(chunk_index_mutex_);
    chunk_index_[new_seg_it->chunk_key] = new_seg_it;
  }

  return new_seg_it;
}
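
// Note on reserveBuffer(): growth is attempted in place first, by stealing pages
// from an immediately following FREE segment; only if that fails does the buffer
// relocate via findFreeBuffer(), which may allocate a new slab or evict other
// segments. Because the caller holds a pin on seg_it->buffer (per the comment on
// the signature), evict() will not select this segment while the copy proceeds.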

BufferList::iterator BufferMgr::findFreeBufferInSlab(const size_t slab_num,
                                                     const size_t num_pages_requested) {
  for (auto buffer_it = slab_segments_[slab_num].begin();
       buffer_it != slab_segments_[slab_num].end();
       ++buffer_it) {
    if (buffer_it->mem_status == FREE && buffer_it->num_pages >= num_pages_requested) {
      // start_page doesn't change
      size_t excess_pages = buffer_it->num_pages - num_pages_requested;
      buffer_it->num_pages = num_pages_requested;
      buffer_it->mem_status = USED;
      buffer_it->last_touched = buffer_epoch_++;
      buffer_it->slab_num = slab_num;
      if (excess_pages > 0) {
        BufferSeg free_seg(
            buffer_it->start_page + num_pages_requested, excess_pages, FREE);
        auto temp_it = buffer_it;  // this should make a copy and not be a reference
                                   // - as we do not want to increment buffer_it
        temp_it++;
        slab_segments_[slab_num].insert(temp_it, free_seg);
      }
      return buffer_it;
    }
  }
  // If here then we did not find a free buffer of sufficient size in this slab;
  // return the end iterator
  return slab_segments_[slab_num].end();
}
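
// findFreeBufferInSlab() is a first-fit scan: the first FREE segment large
// enough is claimed and, when oversized, split. Illustrative example: a FREE
// segment of 10 pages at start_page 100, asked for 4 pages, becomes a USED
// segment covering pages 100-103 followed by a new FREE segment covering pages
// 104-109.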

BufferList::iterator BufferMgr::findFreeBuffer(size_t num_bytes) {
  size_t num_pages_requested = (num_bytes + page_size_ - 1) / page_size_;
  if (num_pages_requested > max_num_pages_per_slab_) {
    throw TooBigForSlab(num_bytes);
  }

  size_t num_slabs = slab_segments_.size();

  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
    auto seg_it = findFreeBufferInSlab(slab_num, num_pages_requested);
    if (seg_it != slab_segments_[slab_num].end()) {
      return seg_it;
    }
  }

  // If we're here then we didn't find a free segment of sufficient size -
  // first see if we can add another slab
  while (!allocations_capped_ && num_pages_allocated_ < max_num_pages_) {
    try {
      size_t pages_left = max_num_pages_ - num_pages_allocated_;
      if (pages_left < current_max_slab_page_size_) {
        current_max_slab_page_size_ = pages_left;
      }
      if (num_pages_requested <=
          current_max_slab_page_size_) {  // don't try to allocate if the
                                          // new slab won't be big enough
        auto alloc_ms = measure<>::execution(
            [&]() { addSlab(current_max_slab_page_size_ * page_size_); });
        LOG(INFO) << "ALLOCATION slab of " << current_max_slab_page_size_ << " pages ("
                  << current_max_slab_page_size_ * page_size_ << "B) created in "
                  << alloc_ms << " ms " << getStringMgrType() << ":" << device_id_;
      } else {
        break;
      }
      // if here then addSlab succeeded
      num_pages_allocated_ += current_max_slab_page_size_;
      return findFreeBufferInSlab(
          num_slabs,
          num_pages_requested);  // has to succeed since we made sure to request a
                                 // slab big enough to accommodate the request
    } catch (std::runtime_error& error) {  // failed to allocate slab
      LOG(INFO) << "ALLOCATION Attempted slab of " << current_max_slab_page_size_
                << " pages (" << current_max_slab_page_size_ * page_size_ << "B) failed "
                << getStringMgrType() << ":" << device_id_;
      // Check if there is any point in halving the current max slab size and
      // trying again: if the request won't fit in half the available size then
      // try once more at the exact request size; if we have already tried at
      // that size and failed then break, as there could still be room for
      // other, smaller requests even though there is not enough for this one.
      if (num_pages_requested > current_max_slab_page_size_ / 2 &&
          current_max_slab_page_size_ != num_pages_requested) {
        current_max_slab_page_size_ = num_pages_requested;
      } else {
        current_max_slab_page_size_ /= 2;
        if (current_max_slab_page_size_ <
            (max_num_pages_per_slab_ / 8)) {  // should be a constant
          allocations_capped_ = true;
          // dump out the slabs and their sizes
          LOG(INFO) << "ALLOCATION Capped " << current_max_slab_page_size_
                    << " Minimum size = " << (max_num_pages_per_slab_ / 8) << " "
                    << getStringMgrType() << ":" << device_id_;
        }
      }
    }
  }

  if (num_pages_allocated_ == 0 && allocations_capped_) {
    throw FailedToCreateFirstSlab(num_bytes);
  }

  // If here then we can't add a slab - so we need to evict

  size_t min_score = std::numeric_limits<size_t>::max();
  // We're going for the lowest score here, like golf.
  // The score of a candidate range is the most recent last_touched epoch among
  // its USED segments (see the note below on why it is a max and not a sum) -
  // evicting older segments therefore lowers the score.
  BufferList::iterator best_eviction_start = slab_segments_[0].end();
  int best_eviction_start_slab = -1;
  int slab_num = 0;

  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
       ++slab_it, ++slab_num) {
    for (auto buffer_it = slab_it->begin(); buffer_it != slab_it->end(); ++buffer_it) {
      // Note there are some shortcuts we could take here - like we should never
      // consider a USED buffer coming after a FREE buffer, as we would have used
      // the FREE buffer - but we won't worry about this for now

      // We can't evict pinned buffers - only normal used buffers

      // if (buffer_it->mem_status == FREE || buffer_it->buffer->getPinCount() == 0) {
      size_t page_count = 0;
      size_t score = 0;
      bool solution_found = false;
      auto evict_it = buffer_it;
      for (; evict_it != slab_segments_[slab_num].end(); ++evict_it) {
        // the pin count should never go up - only down - because we have the
        // global lock on the buffer pool and the pin count only increments
        // on getChunk
        if (evict_it->mem_status == USED && evict_it->buffer->getPinCount() > 0) {
          break;
        }
        page_count += evict_it->num_pages;
        if (evict_it->mem_status == USED) {
          // MAT changed from
          // score += evict_it->last_touched;
          // The issue was thrashing when going from 8M fragment size chunks back
          // to 64M: the large chunks were being evicted before small ones because
          // the summed score of many small chunks exceeded that of one large
          // chunk, so under memory pressure a query would evict its own current
          // chunks and cause reloads rather than evict several smaller, older,
          // unused chunks.
          score = std::max(score, static_cast<size_t>(evict_it->last_touched));
        }
        if (page_count >= num_pages_requested) {
          solution_found = true;
          break;
        }
      }
      if (solution_found && score < min_score) {
        min_score = score;
        best_eviction_start = buffer_it;
        best_eviction_start_slab = slab_num;
      } else if (evict_it == slab_segments_[slab_num].end()) {
        // this means that every segment after this will fail as well, so our
        // search has proven futile
        // throw std::runtime_error("Couldn't evict chunks to get free space");
        break;
        // in reality we should try to rearrange the buffer to get more
        // contiguous free space
      }
      // the other possibility is ending at PINNED - do nothing in this case
      //}
    }
  }
  if (best_eviction_start == slab_segments_[0].end()) {
    LOG(ERROR) << "ALLOCATION failed to find " << num_bytes
               << "B throwing out of memory " << getStringMgrType() << ":" << device_id_;
    VLOG(2) << printSlabs();
    throw OutOfMemory(num_bytes);
  }
  LOG(INFO) << "ALLOCATION failed to find " << num_bytes << "B free. Forcing Eviction."
            << " Eviction start " << best_eviction_start->start_page
            << " Number pages requested " << num_pages_requested
            << " Best Eviction Start Slab " << best_eviction_start_slab << " "
            << getStringMgrType() << ":" << device_id_;
  best_eviction_start =
      evict(best_eviction_start, num_pages_requested, best_eviction_start_slab);
  return best_eviction_start;
}
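
// findFreeBuffer() thus tries three strategies in order: (1) first-fit within an
// existing slab, (2) growing the pool with a new slab, halving the attempted
// slab size on allocation failure until it falls below max_num_pages_per_slab_ / 8
// (at which point allocations are capped), and (3) evicting the contiguous run
// of unpinned segments whose most recently touched member is oldest.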

std::string BufferMgr::printSlab(size_t slab_num) {
  std::ostringstream tss;
  // size_t lastEnd = 0;
  tss << "Slab St.Page Pages Touch" << std::endl;
  for (auto segment : slab_segments_[slab_num]) {
    tss << setfill(' ') << setw(4) << slab_num;
    // tss << " BSN: " << setfill(' ') << setw(2) << segment.slab_num;
    tss << setfill(' ') << setw(8) << segment.start_page;
    tss << setfill(' ') << setw(8) << segment.num_pages;
    // tss << " GAP: " << setfill(' ') << setw(7) << segment.start_page - lastEnd;
    // lastEnd = segment.start_page + segment.num_pages;
    tss << setfill(' ') << setw(7) << segment.last_touched;
    // tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
    if (segment.mem_status == FREE) {
      tss << " FREE"
          << " ";
    } else {
      tss << " PC: " << setfill(' ') << setw(2) << segment.buffer->getPinCount();
      tss << " USED - Chunk: ";

      for (auto&& key_elem : segment.chunk_key) {
        tss << key_elem << ",";
      }
    }
    tss << std::endl;
  }
  return tss.str();
}

std::string BufferMgr::printSlabs() {
  std::ostringstream tss;
  tss << std::endl
      << "Slabs Contents: "
      << " " << getStringMgrType() << ":" << device_id_ << std::endl;
  size_t num_slabs = slab_segments_.size();
  for (size_t slab_num = 0; slab_num != num_slabs; ++slab_num) {
    tss << printSlab(slab_num);
  }
  tss << "--------------------" << std::endl;
  return tss.str();
}

void BufferMgr::clearSlabs() {
  bool pinned_exists = false;
  for (auto& segment_list : slab_segments_) {
    for (auto& segment : segment_list) {
      if (segment.mem_status == FREE) {
        // no need to free
      } else if (segment.buffer->getPinCount() < 1) {
        deleteBuffer(segment.chunk_key, true);
      } else {
        pinned_exists = true;
      }
    }
  }
  if (!pinned_exists) {
    // let's actually clear the buffer from memory
    freeAllMem();
    clear();
    reinit();
  }
}

// return the maximum size this buffer pool can be, in bytes
size_t BufferMgr::getMaxSize() {
  return page_size_ * max_num_pages_;
}

// return the number of bytes currently allocated
size_t BufferMgr::getAllocated() {
  return num_pages_allocated_ * page_size_;
}

// return whether allocation requests have been capped
bool BufferMgr::isAllocationCapped() {
  return allocations_capped_;
}

size_t BufferMgr::getPageSize() {
  return page_size_;
}

// return the size of the chunks in use, in bytes
size_t BufferMgr::getInUseSize() {
  size_t in_use = 0;
  for (auto& segment_list : slab_segments_) {
    for (auto& segment : segment_list) {
      if (segment.mem_status != FREE) {
        in_use += segment.num_pages * page_size_;
      }
    }
  }
  return in_use;
}

std::string BufferMgr::printSeg(BufferList::iterator& seg_it) {
  std::ostringstream tss;
  tss << "SN: " << setfill(' ') << setw(2) << seg_it->slab_num;
  tss << " SP: " << setfill(' ') << setw(7) << seg_it->start_page;
  tss << " NP: " << setfill(' ') << setw(7) << seg_it->num_pages;
  tss << " LT: " << setfill(' ') << setw(7) << seg_it->last_touched;
  tss << " PC: " << setfill(' ') << setw(2) << seg_it->buffer->getPinCount();
  if (seg_it->mem_status == FREE) {
    tss << " FREE"
        << " ";
  } else {
    tss << " USED - Chunk: ";
    for (auto vec_it = seg_it->chunk_key.begin(); vec_it != seg_it->chunk_key.end();
         ++vec_it) {
      tss << *vec_it << ",";
    }
    tss << std::endl;
  }
  return tss.str();
}

std::string BufferMgr::printMap() {
  std::ostringstream tss;
  int seg_num = 1;
  tss << std::endl
      << "Map Contents: "
      << " " << getStringMgrType() << ":" << device_id_ << std::endl;
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  for (auto seg_it = chunk_index_.begin(); seg_it != chunk_index_.end();
       ++seg_it, ++seg_num) {
    // tss << "Map Entry " << seg_num << ": ";
    // for (auto vec_it = seg_it->first.begin(); vec_it != seg_it->first.end();
    //      ++vec_it) {
    //   tss << *vec_it << ",";
    // }
    // tss << " " << std::endl;
    tss << printSeg(seg_it->second);
  }
  tss << "--------------------" << std::endl;
  return tss.str();
}

void BufferMgr::printSegs() {
  int seg_num = 1;
  int slab_num = 1;
  LOG(INFO) << std::endl << " " << getStringMgrType() << ":" << device_id_;
  for (auto slab_it = slab_segments_.begin(); slab_it != slab_segments_.end();
       ++slab_it, ++slab_num) {
    LOG(INFO) << "Slab Num: " << slab_num << " " << getStringMgrType() << ":"
              << device_id_;
    for (auto seg_it = slab_it->begin(); seg_it != slab_it->end(); ++seg_it, ++seg_num) {
      LOG(INFO) << "Segment: " << seg_num << " " << getStringMgrType() << ":"
                << device_id_;
      printSeg(seg_it);
      LOG(INFO) << " " << getStringMgrType() << ":" << device_id_;
    }
    LOG(INFO) << "--------------------"
              << " " << getStringMgrType() << ":" << device_id_;
  }
}

bool BufferMgr::isBufferOnDevice(const ChunkKey& key) {
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  return chunk_index_.find(key) != chunk_index_.end();
}

/// Deletes the chunk with the specified key
void BufferMgr::deleteBuffer(const ChunkKey& key, const bool) {
  // Note: purge is unused
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);

  // lookup the buffer for the Chunk in chunk_index_
  auto buffer_it = chunk_index_.find(key);
  CHECK(buffer_it != chunk_index_.end());
  auto seg_it = buffer_it->second;
  chunk_index_.erase(buffer_it);
  chunk_index_lock.unlock();
  std::lock_guard<std::mutex> sized_segs_lock(sized_segs_mutex_);
  if (seg_it->buffer) {
    delete seg_it->buffer;  // delete the Buffer for the segment
    seg_it->buffer = nullptr;
  }
  removeSegment(seg_it);
}

void BufferMgr::deleteBuffersWithPrefix(const ChunkKey& key_prefix, const bool) {
  // Note: purge is unused
  // lookup the buffers for the Chunk in chunk_index_
  std::lock_guard<std::mutex> sized_segs_lock(
      sized_segs_mutex_);  // take this lock early to prevent deadlock with
                           // reserveBuffer, which needs sized_segs_mutex_ and
                           // then chunk_index_mutex_
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto start_chunk_it = chunk_index_.lower_bound(key_prefix);
  if (start_chunk_it == chunk_index_.end()) {
    return;
  }

  auto buffer_it = start_chunk_it;
  while (buffer_it != chunk_index_.end() &&
         std::search(buffer_it->first.begin(),
                     buffer_it->first.begin() + key_prefix.size(),
                     key_prefix.begin(),
                     key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
    auto seg_it = buffer_it->second;
    if (seg_it->buffer) {
      delete seg_it->buffer;  // delete the Buffer for the segment
      seg_it->buffer = nullptr;
    }
    removeSegment(seg_it);
    chunk_index_.erase(buffer_it++);
  }
}
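
// A note on the std::search loop above (the same idiom appears in
// checkpoint(db_id, tb_id) below): std::search over the first key_prefix.size()
// elements of the key returns an iterator past that window when the prefix is
// *not* found, so the `!=` condition keeps iterating exactly while the current
// key begins with key_prefix. Since chunk_index_ is an ordered map, all matching
// keys are contiguous starting at lower_bound(key_prefix).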

void BufferMgr::removeSegment(BufferList::iterator& seg_it) {
  // Note: does not delete the buffer, as it may be moved somewhere else
  int slab_num = seg_it->slab_num;
  // cout << "Slab num: " << slab_num << endl;
  if (slab_num < 0) {
    std::lock_guard<std::mutex> unsized_segs_lock(unsized_segs_mutex_);
    unsized_segs_.erase(seg_it);
  } else {
    if (seg_it != slab_segments_[slab_num].begin()) {
      auto prev_it = std::prev(seg_it);
      // LOG(INFO) << "PrevIt: " << " " << getStringMgrType() << ":" << device_id_;
      // printSeg(prev_it);
      if (prev_it->mem_status == FREE) {
        seg_it->start_page = prev_it->start_page;
        seg_it->num_pages += prev_it->num_pages;
        slab_segments_[slab_num].erase(prev_it);
      }
    }
    auto next_it = std::next(seg_it);
    if (next_it != slab_segments_[slab_num].end()) {
      if (next_it->mem_status == FREE) {
        seg_it->num_pages += next_it->num_pages;
        slab_segments_[slab_num].erase(next_it);
      }
    }
    seg_it->mem_status = FREE;
    // seg_it->pinCount = 0;
    seg_it->buffer = nullptr;
  }
}
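
// Coalescing example for removeSegment() (illustrative): freeing a 2-page USED
// segment at pages 4-5 that sits between a FREE segment at pages 0-3 and a FREE
// segment at pages 6-9 merges all three into a single FREE segment covering
// pages 0-9, which keeps the free list from fragmenting into adjacent FREE
// pieces.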

void BufferMgr::checkpoint() {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);

  for (auto& chunk_itr : chunk_index_) {
    // checks that the buffer is an actual chunk (not just a buffer) and is dirty
    auto& buffer_itr = chunk_itr.second;
    if (buffer_itr->chunk_key[0] != -1 && buffer_itr->buffer->is_dirty_) {
      parent_mgr_->putBuffer(buffer_itr->chunk_key, buffer_itr->buffer);
      buffer_itr->buffer->clearDirtyBits();
    }
  }
}

void BufferMgr::checkpoint(const int db_id, const int tb_id) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);

  ChunkKey key_prefix;
  key_prefix.push_back(db_id);
  key_prefix.push_back(tb_id);
  auto start_chunk_it = chunk_index_.lower_bound(key_prefix);
  if (start_chunk_it == chunk_index_.end()) {
    return;
  }

  auto buffer_it = start_chunk_it;
  while (buffer_it != chunk_index_.end() &&
         std::search(buffer_it->first.begin(),
                     buffer_it->first.begin() + key_prefix.size(),
                     key_prefix.begin(),
                     key_prefix.end()) != buffer_it->first.begin() + key_prefix.size()) {
    if (buffer_it->second->chunk_key[0] != -1 &&
        buffer_it->second->buffer->is_dirty_) {  // checks that the buffer is an
                                                 // actual chunk (not just a buffer)
                                                 // and is dirty
      parent_mgr_->putBuffer(buffer_it->second->chunk_key, buffer_it->second->buffer);
      buffer_it->second->buffer->clearDirtyBits();
    }
    buffer_it++;
  }
}

/// Returns a pointer to the Buffer holding the chunk with the specified key;
/// the returned buffer is pinned
AbstractBuffer* BufferMgr::getBuffer(const ChunkKey& key, const size_t num_bytes) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // granular lock

  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  if (found_buffer) {
    CHECK(buffer_it->second->buffer);
    buffer_it->second->buffer->pin();
    sized_segs_lock.unlock();

    buffer_it->second->last_touched = buffer_epoch_++;  // race

    if (buffer_it->second->buffer->size() < num_bytes) {
      // need to fetch the part of the buffer we don't have - up to num_bytes
      parent_mgr_->fetchBuffer(key, buffer_it->second->buffer, num_bytes);
    }
    return buffer_it->second->buffer;
  } else {  // if it wasn't in the pool then we need to fetch it
    sized_segs_lock.unlock();
    // createBuffer pins for us
    AbstractBuffer* buffer = createBuffer(key, page_size_, num_bytes);
    try {
      parent_mgr_->fetchBuffer(
          key, buffer, num_bytes);  // this should put the buffer in a BufferSegment
    } catch (std::runtime_error& error) {
      LOG(FATAL) << "Get chunk - Could not find chunk " << keyToString(key)
                 << " in buffer pool or parent buffer pools. Error was " << error.what();
    }
    return buffer;
  }
}
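
// Pinning contract, as reflected in the code above: getBuffer() returns the
// buffer pinned whether it was found in the pool or faulted in from parent_mgr_,
// and evict() never selects a segment whose pin count is nonzero. Callers are
// therefore expected to unPin() when done, as fetchBuffer() below does for its
// temporary pin.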

void BufferMgr::fetchBuffer(const ChunkKey& key,
                            AbstractBuffer* dest_buffer,
                            const size_t num_bytes) {
  std::unique_lock<std::mutex> lock(global_mutex_);  // granular lock
  std::unique_lock<std::mutex> sized_segs_lock(sized_segs_mutex_);
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);

  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  AbstractBuffer* buffer;
  if (!found_buffer) {
    sized_segs_lock.unlock();
    CHECK(parent_mgr_ != nullptr);
    buffer = createBuffer(key, page_size_, num_bytes);  // will pin the buffer
    try {
      parent_mgr_->fetchBuffer(key, buffer, num_bytes);
    } catch (std::runtime_error& error) {
      LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key);
    }
  } else {
    buffer = buffer_it->second->buffer;
    buffer->pin();
    if (num_bytes > buffer->size()) {
      try {
        parent_mgr_->fetchBuffer(key, buffer, num_bytes);
      } catch (std::runtime_error& error) {
        LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key);
      }
    }
    sized_segs_lock.unlock();
  }
  size_t chunk_size = num_bytes == 0 ? buffer->size() : num_bytes;
  lock.unlock();
  dest_buffer->reserve(chunk_size);
  if (buffer->isUpdated()) {
    buffer->read(dest_buffer->getMemoryPtr(),
                 chunk_size,
                 0,
                 dest_buffer->getType(),
                 dest_buffer->getDeviceId());
  } else {
    buffer->read(dest_buffer->getMemoryPtr() + dest_buffer->size(),
                 chunk_size - dest_buffer->size(),
                 dest_buffer->size(),
                 dest_buffer->getType(),
                 dest_buffer->getDeviceId());
  }
  dest_buffer->setSize(chunk_size);
  dest_buffer->syncEncoder(buffer);
  buffer->unPin();
}

/// Puts the contents of src_buffer into the Buffer with ChunkKey key
AbstractBuffer* BufferMgr::putBuffer(const ChunkKey& key,
                                     AbstractBuffer* src_buffer,
                                     const size_t num_bytes) {
  std::unique_lock<std::mutex> chunk_index_lock(chunk_index_mutex_);
  auto buffer_it = chunk_index_.find(key);
  bool found_buffer = buffer_it != chunk_index_.end();
  chunk_index_lock.unlock();
  AbstractBuffer* buffer;
  if (!found_buffer) {
    buffer = createBuffer(key, page_size_);
  } else {
    buffer = buffer_it->second->buffer;
  }
  size_t old_buffer_size = buffer->size();
  size_t new_buffer_size = num_bytes == 0 ? src_buffer->size() : num_bytes;
  CHECK(!buffer->isDirty());

  if (src_buffer->isUpdated()) {
    //@todo use dirty flags to only flush pages of the chunk that need to
    // be flushed
    buffer->write((int8_t*)src_buffer->getMemoryPtr(),
                  new_buffer_size,
                  0,
                  src_buffer->getType(),
                  src_buffer->getDeviceId());
  } else if (src_buffer->isAppended()) {
    CHECK(old_buffer_size < new_buffer_size);
    buffer->append((int8_t*)src_buffer->getMemoryPtr() + old_buffer_size,
                   new_buffer_size - old_buffer_size,
                   src_buffer->getType(),
                   src_buffer->getDeviceId());
  }
  src_buffer->clearDirtyBits();
  buffer->syncEncoder(src_buffer);
  return buffer;
}
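
// putBuffer() distinguishes two write paths: an updated source rewrites the
// destination from offset 0, while an appended source copies only the bytes past
// old_buffer_size. In both cases the source's dirty bits are cleared and the
// encoder metadata is synced, making the destination the authoritative copy.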

int BufferMgr::getBufferId() {
  std::lock_guard<std::mutex> lock(buffer_id_mutex_);
  return max_buffer_id_++;
}

/// client is responsible for deleting memory allocated for b->mem_
AbstractBuffer* BufferMgr::alloc(const size_t num_bytes) {
  std::lock_guard<std::mutex> lock(global_mutex_);
  ChunkKey chunk_key = {-1, getBufferId()};
  return createBuffer(chunk_key, page_size_, num_bytes);
}

void BufferMgr::free(AbstractBuffer* buffer) {
  std::lock_guard<std::mutex> lock(global_mutex_);  // hack for now
  Buffer* casted_buffer = dynamic_cast<Buffer*>(buffer);
  if (casted_buffer == nullptr) {
    LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type.";
  }
  deleteBuffer(casted_buffer->seg_it_->chunk_key);
}

size_t BufferMgr::getNumChunks() {
  std::lock_guard<std::mutex> chunk_index_lock(chunk_index_mutex_);
  return chunk_index_.size();
}

size_t BufferMgr::size() {
  return num_pages_allocated_;
}

size_t BufferMgr::getMaxBufferSize() {
  return max_buffer_size_;
}

size_t BufferMgr::getMaxSlabSize() {
  return max_slab_size_;
}

void BufferMgr::getChunkMetadataVec(
    std::vector<std::pair<ChunkKey, ChunkMetadata>>& chunk_metadata_vec) {
  LOG(FATAL) << "getChunkMetadataVec not supported for BufferMgr.";
}

void BufferMgr::getChunkMetadataVecForKeyPrefix(
    std::vector<std::pair<ChunkKey, ChunkMetadata>>& chunk_metadata_vec,
    const ChunkKey& key_prefix) {
  LOG(FATAL) << "getChunkMetadataVecForPrefix not supported for BufferMgr.";
}

const std::vector<BufferList>& BufferMgr::getSlabSegments() {
  return slab_segments_;
}

}  // namespace Buffer_Namespace