OmniSciDB  04ee39c94c
BufferMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2017 MapD Technologies, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
22 #include "BufferMgr.h"
23 #include "Buffer.h"
24 #include "Shared/Logger.h"
25 #include "Shared/measure.h"
26 
27 #include <algorithm>
28 #include <iomanip>
29 #include <limits>
30 
31 using namespace std;
32 
33 namespace Buffer_Namespace {
34 
35 std::string BufferMgr::keyToString(const ChunkKey& key) {
36  std::ostringstream oss;
37 
38  oss << " key: ";
39  for (auto subKey : key) {
40  oss << subKey << ",";
41  }
42  return oss.str();
43 }
44 
// Constructor. Validates that the pool limits are positive and that
// maxSlabSize is an exact multiple of pageSize (slabs are carved into pages).
// NOTE(review): the doxygen extraction elided source lines 62-64 here —
// presumably the derived page counts (maxNumPages_, maxNumPagesPerSlab_,
// currentMaxSlabPageSize_) are initialized in the missing lines; confirm
// against the real source.
46 BufferMgr::BufferMgr(const int deviceId,
47  const size_t maxBufferSize,
48  const size_t maxSlabSize,
49  const size_t pageSize,
50  AbstractBufferMgr* parentMgr)
51  : AbstractBufferMgr(deviceId)
52  , pageSize_(pageSize)
53  , maxBufferSize_(maxBufferSize)
54  , numPagesAllocated_(0)
55  , maxSlabSize_(maxSlabSize)
56  , allocationsCapped_(false)
57  , parentMgr_(parentMgr)
58  , maxBufferId_(0)
59  , bufferEpoch_(0) {
60  CHECK(maxBufferSize_ > 0 && maxSlabSize_ > 0 && pageSize_ > 0 &&
61  maxSlabSize_ % pageSize_ == 0);
65  maxNumPagesPerSlab_; // currentMaxSlabPageSize_ will drop as allocations fail -
66  // this is the high water mark
67 }
68 
// Destructor (signature elided by extraction; the doxygen index places
// ~BufferMgr at source line 70). Releases every buffer and all slab
// bookkeeping via clear().
71  clear();
72 }
73 
// reinit() fragment (signature and first statement(s) elided by extraction).
// Resets the slab-size high-water mark and re-enables allocations after a
// full clear.
77  maxNumPagesPerSlab_; // currentMaxSlabPageSize_ will drop as allocations fail -
78  // this is the high water mark
79  allocationsCapped_ = false;
80 }
81 
// clear() fragment (signature elided by extraction). Takes all three pool
// locks, deletes every Buffer object referenced by the chunk index, and
// drops all slab/segment bookkeeping. Does not free the slab memory itself
// (that is freeAllMem(), called from clearSlabs()).
83  std::lock_guard<std::mutex> sizedSegsLock(sizedSegsMutex_);
84  std::lock_guard<std::mutex> chunkIndexLock(chunkIndexMutex_);
85  std::lock_guard<std::mutex> unsizedSegsLock(unsizedSegsMutex_);
86  for (auto bufferIt = chunkIndex_.begin(); bufferIt != chunkIndex_.end(); ++bufferIt) {
// Buffer objects are owned by the pool; delete them before dropping the index.
87  delete bufferIt->second->buffer;
88  }
89  chunkIndex_.clear();
90  slabs_.clear();
91  slabSegments_.clear();
92  unsizedSegs_.clear();
93  bufferEpoch_ = 0;
94 }
95 
// createBuffer fragment (the signature line, with the ChunkKey parameter,
// was elided by extraction). Creates a new pinned buffer for chunkKey:
// registers a dummy unsized segment first, then allocates real storage.
// On OutOfMemory during allocation, the half-built entry is removed and the
// exception is rethrown.
98  const size_t chunkPageSize,
99  const size_t initialSize) {
100  // LOG(INFO) << printMap();
// A chunkPageSize of 0 means "use the pool's default page size".
101  size_t actualChunkPageSize = chunkPageSize;
102  if (actualChunkPageSize == 0) {
103  actualChunkPageSize = pageSize_;
104  }
105 
106  // ChunkPageSize here is just for recording dirty pages
107  {
108  std::lock_guard<std::mutex> lock(chunkIndexMutex_);
// The key must not already be present — createBuffer never overwrites.
109  CHECK(chunkIndex_.find(chunkKey) == chunkIndex_.end());
// slabNum -1 / 0 pages marks this as a placeholder ("dummy") segment.
110  BufferSeg bufferSeg(BufferSeg(-1, 0, USED));
111  bufferSeg.chunkKey = chunkKey;
112  std::lock_guard<std::mutex> unsizedSegsLock(unsizedSegsMutex_);
113  unsizedSegs_.push_back(bufferSeg); // race condition?
114  chunkIndex_[chunkKey] =
115  std::prev(unsizedSegs_.end(),
116  1); // need to do this before allocating Buffer because doing so could
117  // change the segment used
118  }
119  // following should be safe outside the lock b/c first thing Buffer
120  // constructor does is pin (and its still in unsized segs at this point
121  // so can't be evicted)
122  try {
123  allocateBuffer(chunkIndex_[chunkKey], actualChunkPageSize, initialSize);
124  } catch (const OutOfMemory&) {
125  auto bufferIt = chunkIndex_.find(chunkKey);
126  CHECK(bufferIt != chunkIndex_.end());
127  bufferIt->second->buffer =
128  0; // constructor failed for the buffer object so make sure to mark it zero so
129  // deleteBuffer doesn't try to delete it
130  deleteBuffer(chunkKey);
131  throw;
132  }
// If any bytes were requested, real memory must now be mapped.
133  CHECK(initialSize == 0 || chunkIndex_[chunkKey]->buffer->getMemoryPtr());
134  // chunkIndex_[chunkKey]->buffer->pin();
135  std::lock_guard<std::mutex> lock(chunkIndexMutex_);
136  return chunkIndex_[chunkKey]->buffer;
137 }
138 
// Evict consecutive segments starting at evictStart from slab slabNum until
// at least numPagesRequested pages are reclaimed, then install a new USED
// segment over the reclaimed range. Any surplus pages are merged into the
// following FREE segment or split off as a new FREE segment. Returns an
// iterator to the newly created USED segment. Callers guarantee the victims
// are unpinned (CHECKed below).
139 BufferList::iterator BufferMgr::evict(BufferList::iterator& evictStart,
140  const size_t numPagesRequested,
141  const int slabNum) {
142  // We can assume here that buffer for evictStart either doesn't exist
143  // (evictStart is first buffer) or was not free, so don't need ot merge
144  // it
145  auto evictIt = evictStart;
146  size_t numPages = 0;
147  size_t startPage = evictStart->startPage;
148  while (numPages < numPagesRequested) {
// Evicting a pinned buffer would corrupt a live reader — hard assert.
149  if (evictIt->memStatus == USED) {
150  CHECK(evictIt->buffer->getPinCount() < 1);
151  }
152  numPages += evictIt->numPages;
// Only segments that belong to a real chunk have index entries to drop.
153  if (evictIt->memStatus == USED && evictIt->chunkKey.size() > 0) {
154  chunkIndex_.erase(evictIt->chunkKey);
155  }
156  evictIt = slabSegments_[slabNum].erase(
157  evictIt); // erase operations returns next iterator - safe if we ever move
158  // to a vector (as opposed to erase(evictIt++)
159  }
160  BufferSeg dataSeg(startPage, numPagesRequested, USED, bufferEpoch_++); // until we can
161  // dataSeg.pinCount++;
162  dataSeg.slabNum = slabNum;
163  auto dataSegIt =
164  slabSegments_[slabNum].insert(evictIt, dataSeg); // Will insert before evictIt
// We may have reclaimed more pages than requested; return the tail to FREE.
165  if (numPagesRequested < numPages) {
166  size_t excessPages = numPages - numPagesRequested;
167  if (evictIt != slabSegments_[slabNum].end() &&
168  evictIt->memStatus == FREE) { // need to merge with current page
169  evictIt->startPage = startPage + numPagesRequested;
170  evictIt->numPages += excessPages;
171  } else { // need to insert a free seg before evictIt for excessPages
172  BufferSeg freeSeg(startPage + numPagesRequested, excessPages, FREE);
173  slabSegments_[slabNum].insert(evictIt, freeSeg);
174  }
175  }
176  return dataSegIt;
177 }
178 
// Grow the segment at segIt so it can hold numBytes. Strategy: (1) if the
// segment is already big enough, return it; (2) if the immediately following
// segment is FREE and large enough, absorb pages from it in place; (3)
// otherwise find/allocate a new segment elsewhere, copy the data over, and
// release the old one. Assumes the caller holds a pin on the buffer.
179 BufferList::iterator BufferMgr::reserveBuffer(
180  BufferList::iterator& segIt,
181  const size_t numBytes) { // assumes buffer is already pinned
182 
// Round the byte request up to whole pages.
183  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
// NOTE(review): this subtraction underflows (size_t) when the segment is
// already larger than the request, but the early return below means the
// garbage value is never used in that case. Worth reordering someday.
184  size_t numPagesExtraNeeded = numPagesRequested - segIt->numPages;
185 
186  if (numPagesRequested <
187  segIt->numPages) { // We already have enough pages in existing segment
188  return segIt;
189  }
190  // First check for freeSeg after segIt
191  int slabNum = segIt->slabNum;
192  if (slabNum >= 0) { // not dummy page
193  BufferList::iterator nextIt = std::next(segIt);
194  if (nextIt != slabSegments_[slabNum].end() && nextIt->memStatus == FREE &&
195  nextIt->numPages >= numPagesExtraNeeded) { // Then we can just use the next
196  // BufferSeg which happens to be free
// Steal exactly the extra pages needed; the free neighbor keeps the rest.
197  size_t leftoverPages = nextIt->numPages - numPagesExtraNeeded;
198  segIt->numPages = numPagesRequested;
199  nextIt->numPages = leftoverPages;
200  nextIt->startPage = segIt->startPage + segIt->numPages;
201  return segIt;
202  }
203  }
204  /* If we're here then we couldn't keep
205  * buffer in existing slot - need to find
206  * new segment, copy data over, and then
207  * delete old
208  */
209 
210  auto newSegIt = findFreeBuffer(numBytes);
211 
212  /* Below should be in copy constructor for BufferSeg?*/
213  newSegIt->buffer = segIt->buffer;
214  // newSegIt->buffer->segIt_ = newSegIt;
215  newSegIt->chunkKey = segIt->chunkKey;
// Retarget the buffer's memory pointer at the new slab location before
// copying, keeping the old pointer as the copy source.
216  int8_t* oldMem = newSegIt->buffer->mem_;
217  newSegIt->buffer->mem_ = slabs_[newSegIt->slabNum] + newSegIt->startPage * pageSize_;
218 
219  // now need to copy over memory
220  // only do this if the old segment is valid (i.e. not new w/
221  // unallocated buffer
222  if (segIt->startPage >= 0 && segIt->buffer->mem_ != 0) {
223  newSegIt->buffer->writeData(
224  oldMem, newSegIt->buffer->size(), 0, newSegIt->buffer->getType(), deviceId_);
225  }
226  // Deincrement pin count to reverse effect above
227  removeSegment(segIt);
228  {
229  std::lock_guard<std::mutex> lock(chunkIndexMutex_);
230  chunkIndex_[newSegIt->chunkKey] = newSegIt;
231  }
232 
233  return newSegIt;
234 }
235 
236 BufferList::iterator BufferMgr::findFreeBufferInSlab(const size_t slabNum,
237  const size_t numPagesRequested) {
238  for (auto bufferIt = slabSegments_[slabNum].begin();
239  bufferIt != slabSegments_[slabNum].end();
240  ++bufferIt) {
241  if (bufferIt->memStatus == FREE && bufferIt->numPages >= numPagesRequested) {
242  // startPage doesn't change
243  size_t excessPages = bufferIt->numPages - numPagesRequested;
244  bufferIt->numPages = numPagesRequested;
245  bufferIt->memStatus = USED;
246  bufferIt->lastTouched = bufferEpoch_++;
247  bufferIt->slabNum = slabNum;
248  if (excessPages > 0) {
249  BufferSeg freeSeg(bufferIt->startPage + numPagesRequested, excessPages, FREE);
250  auto tempIt = bufferIt; // this should make a copy and not be a reference
251  // - as we do not want to increment bufferIt
252  tempIt++;
253  slabSegments_[slabNum].insert(tempIt, freeSeg);
254  }
255  return bufferIt;
256  }
257  }
258  // If here then we did not find a free buffer of
259  // sufficient size in
260  // this slab, return the end iterator
261  return slabSegments_[slabNum].end();
262 }
263 
// Find (or make) a segment able to hold numBytes. Order of attempts:
// (1) an existing FREE segment in any slab; (2) allocate a new slab, shrinking
// the attempted slab size after failures until allocations are capped;
// (3) evict the cheapest run of unpinned segments (lowest max lastTouched
// epoch). Throws TooBigForSlab, FailedToCreateFirstSlab, or OutOfMemory.
// NOTE(review): several source lines were elided by the doxygen extraction
// in this function (marked below); consult the real source before editing.
264 BufferList::iterator BufferMgr::findFreeBuffer(size_t numBytes) {
265  size_t numPagesRequested = (numBytes + pageSize_ - 1) / pageSize_;
// A request larger than one slab can never be satisfied contiguously.
266  if (numPagesRequested > maxNumPagesPerSlab_) {
267  throw TooBigForSlab(numBytes);
268  }
269 
270  size_t numSlabs = slabSegments_.size();
271 
272  for (size_t slabNum = 0; slabNum != numSlabs; ++slabNum) {
273  auto segIt = findFreeBufferInSlab(slabNum, numPagesRequested);
274  if (segIt != slabSegments_[slabNum].end()) {
275  return segIt;
276  }
277  }
278 
279  // If we're here then we didn't find a free segment of sufficient size
280  // First we see if we can add another slab
// NOTE(review): extraction elided source line 281 here — presumably the
// loop header guarding slab growth (e.g. while not capped); confirm.
282  try {
283  size_t pagesLeft = maxNumPages_ - numPagesAllocated_;
284  if (pagesLeft < currentMaxSlabPageSize_) {
285  currentMaxSlabPageSize_ = pagesLeft;
286  }
287  if (numPagesRequested <= currentMaxSlabPageSize_) { // don't try to allocate if the
288  // new slab won't be big enough
// NOTE(review): extraction elided source line 290 — presumably the timed
// addSlab(currentMaxSlabPageSize_ * pageSize_) call feeding alloc_ms.
289  auto alloc_ms =
291  LOG(INFO) << "ALLOCATION slab of " << currentMaxSlabPageSize_ << " pages ("
292  << currentMaxSlabPageSize_ * pageSize_ << "B) created in " << alloc_ms
293  << " ms " << getStringMgrType() << ":" << deviceId_;
294  } else {
295  break;
296  }
297  // if here then addSlab succeeded
298  numPagesAllocated_ += currentMaxSlabPageSize_;
// The fresh slab is at index numSlabs and is guaranteed large enough.
299  return findFreeBufferInSlab(
300  numSlabs,
301  numPagesRequested); // has to succeed since we made sure to request a slab big
302  // enough to accomodate request
303  } catch (std::runtime_error& error) { // failed to allocate slab
304  LOG(INFO) << "ALLOCATION Attempted slab of " << currentMaxSlabPageSize_
305  << " pages (" << currentMaxSlabPageSize_ * pageSize_ << "B) failed "
306  << getStringMgrType() << ":" << deviceId_;
307  // check if there is any point halving currentMaxSlabSize and trying again
308  // if the request wont fit in half available then let try once at full size
309  // if we have already tries at full size and failed then break as
310  // there could still be room enough for other later request but
311  // not for his current one
312  if (numPagesRequested > currentMaxSlabPageSize_ / 2 &&
313  currentMaxSlabPageSize_ != numPagesRequested) {
314  currentMaxSlabPageSize_ = numPagesRequested;
315  } else {
// NOTE(review): extraction elided source lines 316-317 — presumably the
// halving of currentMaxSlabPageSize_ and the cap condition; confirm.
318  (maxNumPagesPerSlab_ / 8)) { // should be a constant
319  allocationsCapped_ = true;
320  // dump out the slabs and their sizes
321  LOG(INFO) << "ALLOCATION Capped " << currentMaxSlabPageSize_
322  << " Minimum size = " << (maxNumPagesPerSlab_ / 8) << " "
323  << getStringMgrType() << ":" << deviceId_;
324  }
325  }
326  }
327  }
328 
// NOTE(review): extraction elided source line 329 — presumably
// "if (numPagesAllocated_ == 0 && allocationsCapped_)"; confirm.
330  throw FailedToCreateFirstSlab(numBytes);
331  }
332 
333  // If here then we can't add a slab - so we need to evict
334 
335  size_t minScore = std::numeric_limits<size_t>::max();
336  // We're going for lowest score here, like golf
337  // This is because score is the sum of the lastTouched score for all
338  // pages evicted. Evicting less pages and older pages will lower the
339  // score
340  BufferList::iterator bestEvictionStart = slabSegments_[0].end();
341  int bestEvictionStartSlab = -1;
342  int slabNum = 0;
343 
344  for (auto slabIt = slabSegments_.begin(); slabIt != slabSegments_.end();
345  ++slabIt, ++slabNum) {
346  for (auto bufferIt = slabIt->begin(); bufferIt != slabIt->end(); ++bufferIt) {
347  /* Note there are some shortcuts we could take here - like we
348  * should never consider a USED buffer coming after a free buffer
349  * as we would have used the FREE buffer, but we won't worry about
350  * this for now
351  */
352 
353  // We can't evict pinned buffers - only normal used
354  // buffers
355 
356  // if (bufferIt->memStatus == FREE || bufferIt->buffer->getPinCount() == 0) {
357  size_t pageCount = 0;
358  size_t score = 0;
359  bool solutionFound = false;
360  auto evictIt = bufferIt;
361  for (; evictIt != slabSegments_[slabNum].end(); ++evictIt) {
362  // pinCount should never go up - only down because we have
363  // global lock on buffer pool and pin count only increments
364  // on getChunk
// A pinned segment ends the candidate run — can't evict through it.
365  if (evictIt->memStatus == USED && evictIt->buffer->getPinCount() > 0) {
366  break;
367  }
368  pageCount += evictIt->numPages;
369  if (evictIt->memStatus == USED) {
370  // MAT changed from
371  // score += evictIt->lastTouched;
372  // Issue was thrashing when going from 8M fragment size chunks back to 64M
373  // basically the large chunks were being evicted prior to small as many small
374  // chunk score was larger than one large chunk so it always would evict a large
375  // chunk so under memory pressure a query would evict its own current chunks and
376  // cause reloads rather than evict several smaller unused older chunks.
377  score = std::max(score, static_cast<size_t>(evictIt->lastTouched));
378  }
379  if (pageCount >= numPagesRequested) {
380  solutionFound = true;
381  break;
382  }
383  }
384  if (solutionFound && score < minScore) {
385  minScore = score;
386  bestEvictionStart = bufferIt;
387  bestEvictionStartSlab = slabNum;
388  } else if (evictIt == slabSegments_[slabNum].end()) {
389  // this means that every segment after this will fail as
390  // well, so our search has proven futile
391  // throw std::runtime_error ("Couldn't evict chunks to get free space");
392  break;
393  // in reality we should try to rearrange the buffer to get
394  // more contiguous free space
395  }
396  // other possibility is ending at PINNED - do nothing in this
397  // case
398  //}
399  }
400  }
401  if (bestEvictionStart == slabSegments_[0].end()) {
402  LOG(ERROR) << "ALLOCATION failed to find " << numBytes << "B throwing out of memory "
403  << getStringMgrType() << ":" << deviceId_;
404  VLOG(2) << printSlabs();
405  throw OutOfMemory(numBytes);
406  }
407  LOG(INFO) << "ALLOCATION failed to find " << numBytes << "B free. Forcing Eviction."
408  << " Eviction start " << bestEvictionStart->startPage
409  << " Number pages requested " << numPagesRequested
410  << " Best Eviction Start Slab " << bestEvictionStartSlab << " "
411  << getStringMgrType() << ":" << deviceId_;
412  bestEvictionStart = evict(bestEvictionStart, numPagesRequested, bestEvictionStartSlab);
413  return bestEvictionStart;
414 }
415 
// Format one slab's segment list as a fixed-width table: slab number,
// start page, page count, last-touched epoch, then either FREE or the
// pin count and chunk key for USED segments.
416 std::string BufferMgr::printSlab(size_t slabNum) {
417  std::ostringstream tss;
418  // size_t lastEnd = 0;
419  tss << "Slab St.Page Pages Touch" << std::endl;
420  for (auto segIt = slabSegments_[slabNum].begin(); segIt != slabSegments_[slabNum].end();
421  ++segIt) {
422  tss << setfill(' ') << setw(4) << slabNum;
423  // tss << " BSN: " << setfill(' ') << setw(2) << segIt->slabNum;
424  tss << setfill(' ') << setw(8) << segIt->startPage;
425  tss << setfill(' ') << setw(8) << segIt->numPages;
426  // tss << " GAP: " << setfill(' ') << setw(7) << segIt->startPage - lastEnd;
427  // lastEnd = segIt->startPage + segIt->numPages;
428  tss << setfill(' ') << setw(7) << segIt->lastTouched;
429  // tss << " PC: " << setfill(' ') << setw(2) << segIt->buffer->getPinCount();
430  if (segIt->memStatus == FREE) {
431  tss << " FREE"
432  << " ";
433  } else {
434  tss << " PC: " << setfill(' ') << setw(2) << segIt->buffer->getPinCount();
435  tss << " USED - Chunk: ";
436 
437  for (auto vecIt = segIt->chunkKey.begin(); vecIt != segIt->chunkKey.end();
438  ++vecIt) {
439  tss << *vecIt << ",";
440  }
441  }
442  tss << std::endl;
443  }
444  return tss.str();
445 }
446 
447 std::string BufferMgr::printSlabs() {
448  std::ostringstream tss;
449  tss << std::endl
450  << "Slabs Contents: "
451  << " " << getStringMgrType() << ":" << deviceId_ << std::endl;
452  size_t numSlabs = slabSegments_.size();
453  for (size_t slabNum = 0; slabNum != numSlabs; ++slabNum) {
454  tss << printSlab(slabNum);
455  }
456  tss << "--------------------" << std::endl;
457  return tss.str();
458 }
459 
// clearSlabs() fragment (signature elided by extraction; doxygen index
// places clearSlabs at source line 460). Deletes every unpinned buffer;
// only if nothing was pinned does it also release the slab memory itself
// and reinitialize the pool.
461  bool pinnedExists = false;
462  size_t numSlabs = slabSegments_.size();
463  for (size_t slabNum = 0; slabNum != numSlabs; ++slabNum) {
464  for (auto segIt = slabSegments_[slabNum].begin();
465  segIt != slabSegments_[slabNum].end();
466  ++segIt) {
467  if (segIt->memStatus == FREE) {
468  // no need to free
469  } else if (segIt->buffer->getPinCount() < 1) {
470  deleteBuffer(segIt->chunkKey, true);
471  } else {
472  pinnedExists = true;
473  }
474  }
475  }
476  if (!pinnedExists) {
477  // lets actually clear the buffer from memory
478  freeAllMem();
479  clear();
480  reinit();
481  }
482 }
483 
// Accessor fragments (signatures elided by extraction). Per the doxygen
// index: getMaxSize (line 485), getAllocated (line 490),
// isAllocationCapped (line 495), and presumably getPageSize (line 499).
484 // return the maximum size this buffer can be in bytes
486  return pageSize_ * maxNumPages_;
487 }
488 
489 // return how large the buffer are currently allocated
491  return numPagesAllocated_ * pageSize_;
492 }
493 
// Whether slab allocation has been permanently disabled after failures.
494 //
496  return allocationsCapped_;
497 }
498 
// Page size in bytes used to carve slabs into segments.
500  return pageSize_;
501 }
502 
503 // return the size of the chunks in use in bytes
505  size_t inUse = 0;
506  size_t numSlabs = slabSegments_.size();
507  for (size_t slabNum = 0; slabNum != numSlabs; ++slabNum) {
508  for (auto segIt = slabSegments_[slabNum].begin();
509  segIt != slabSegments_[slabNum].end();
510  ++segIt) {
511  if (segIt->memStatus != FREE) {
512  inUse += segIt->numPages * pageSize_;
513  }
514  }
515  }
516  return inUse;
517 }
518 
// Format a single segment for debugging: slab number, start page, page
// count, last-touched epoch, pin count, and FREE/USED status (with the
// chunk key when USED).
519 std::string BufferMgr::printSeg(BufferList::iterator& segIt) {
520  std::ostringstream tss;
521  tss << "SN: " << setfill(' ') << setw(2) << segIt->slabNum;
522  tss << " SP: " << setfill(' ') << setw(7) << segIt->startPage;
523  tss << " NP: " << setfill(' ') << setw(7) << segIt->numPages;
524  tss << " LT: " << setfill(' ') << setw(7) << segIt->lastTouched;
// NOTE(review): dereferences segIt->buffer unconditionally here, but prints
// FREE below — looks like it assumes FREE segments still carry a buffer
// pointer; confirm against Buffer lifetime rules.
525  tss << " PC: " << setfill(' ') << setw(2) << segIt->buffer->getPinCount();
526  if (segIt->memStatus == FREE) {
527  tss << " FREE"
528  << " ";
529  } else {
530  tss << " USED - Chunk: ";
531  for (auto vecIt = segIt->chunkKey.begin(); vecIt != segIt->chunkKey.end(); ++vecIt) {
532  tss << *vecIt << ",";
533  }
534  tss << std::endl;
535  }
536  return tss.str();
537 }
538 
// Dump every chunk-index entry's segment, one line each, under a header
// naming this manager/device. Takes the chunk-index lock for the walk.
539 std::string BufferMgr::printMap() {
540  std::ostringstream tss;
541  int segNum = 1;
542  tss << std::endl
543  << "Map Contents: "
544  << " " << getStringMgrType() << ":" << deviceId_ << std::endl;
545  std::lock_guard<std::mutex> chunkIndexLock(chunkIndexMutex_);
546  for (auto segIt = chunkIndex_.begin(); segIt != chunkIndex_.end(); ++segIt, ++segNum) {
547  // tss << "Map Entry " << segNum << ": ";
548  // for (auto vecIt = segIt->first.begin(); vecIt != segIt->first.end(); ++vecIt) {
549  // tss << *vecIt << ",";
550  // }
551  // tss << " " << std::endl;
552  tss << printSeg(segIt->second);
553  }
554  tss << "--------------------" << std::endl;
555  return tss.str();
556 }
557 
// printSegs() fragment (signature elided by extraction). Logs every
// segment of every slab via LOG(INFO), tagged with manager type and device.
559  int segNum = 1;
560  int slabNum = 1;
561  LOG(INFO) << std::endl << " " << getStringMgrType() << ":" << deviceId_;
562  for (auto slabIt = slabSegments_.begin(); slabIt != slabSegments_.end();
563  ++slabIt, ++slabNum) {
564  LOG(INFO) << "Slab Num: " << slabNum << " " << getStringMgrType() << ":" << deviceId_;
565  for (auto segIt = slabIt->begin(); segIt != slabIt->end(); ++segIt, ++segNum) {
566  LOG(INFO) << "Segment: " << segNum << " " << getStringMgrType() << ":" << deviceId_;
// NOTE(review): printSeg's return value is discarded here, so the segment
// details are never logged — likely intended LOG(INFO) << printSeg(segIt).
567  printSeg(segIt);
568  LOG(INFO) << " " << getStringMgrType() << ":" << deviceId_;
569  }
570  LOG(INFO) << "--------------------"
571  << " " << getStringMgrType() << ":" << deviceId_;
572  }
573 }
574 
// isBufferOnDevice fragment (signature elided by extraction). Returns
// whether the chunk key is present in this pool's index, under the
// chunk-index lock. (The if/else could collapse to a single find != end
// expression.)
576  std::lock_guard<std::mutex> chunkIndexLock(chunkIndexMutex_);
577  if (chunkIndex_.find(key) == chunkIndex_.end()) {
578  return false;
579  } else {
580  return true;
581  }
582 }
583 
// Remove the chunk's index entry, delete its Buffer object (if one was
// constructed), and return its segment to the free list. The index lock is
// released before taking the sized-segments lock to keep the pool's lock
// ordering.
585 void BufferMgr::deleteBuffer(const ChunkKey& key, const bool purge) {
586  std::unique_lock<std::mutex> chunkIndexLock(chunkIndexMutex_);
587  // Note: purge is currently unused
588 
589  // lookup the buffer for the Chunk in chunkIndex_
590  auto bufferIt = chunkIndex_.find(key);
591  // Buffer *buffer = bufferIt->second->buffer;
592  CHECK(bufferIt != chunkIndex_.end());
593  auto segIt = bufferIt->second;
594  chunkIndex_.erase(bufferIt);
595  chunkIndexLock.unlock();
596  std::lock_guard<std::mutex> sizedSegsLock(sizedSegsMutex_);
// buffer may be null when createBuffer failed mid-construction (see its
// OutOfMemory handler, which zeroes the pointer before calling us).
597  if (segIt->buffer) {
598  delete segIt->buffer; // Delete Buffer for segment
599  segIt->buffer = 0;
600  }
601  removeSegment(segIt);
602 }
603 
// Delete every buffer whose chunk key begins with keyPrefix. Relies on the
// index being ordered, so all matching keys are contiguous starting at
// lower_bound(keyPrefix).
604 void BufferMgr::deleteBuffersWithPrefix(const ChunkKey& keyPrefix, const bool purge) {
605  // Note: purge is unused
606  // lookup the buffer for the Chunk in chunkIndex_
607  std::lock_guard<std::mutex> sizedSegsLock(
608  sizedSegsMutex_); // Take this lock early to prevent deadlock with
609  // reserveBuffer which needs segsMutex_ and then
610  // chunkIndexMutex_
611  std::lock_guard<std::mutex> chunkIndexLock(chunkIndexMutex_);
612  auto startChunkIt = chunkIndex_.lower_bound(keyPrefix);
613  if (startChunkIt == chunkIndex_.end()) {
614  return;
615  }
616 
617  auto bufferIt = startChunkIt;
// The std::search condition is "key's first keyPrefix.size() elements equal
// keyPrefix". NOTE(review): begin() + keyPrefix.size() assumes every key is
// at least as long as the prefix — confirm shorter keys cannot occur here.
618  while (bufferIt != chunkIndex_.end() &&
619  std::search(bufferIt->first.begin(),
620  bufferIt->first.begin() + keyPrefix.size(),
621  keyPrefix.begin(),
622  keyPrefix.end()) != bufferIt->first.begin() + keyPrefix.size()) {
623  auto segIt = bufferIt->second;
624  if (segIt->buffer) {
625  delete segIt->buffer; // Delete Buffer for segment
626  segIt->buffer = 0;
627  }
628  removeSegment(segIt);
629  chunkIndex_.erase(bufferIt++);
630  }
631 }
632 
// Return a segment to the free list: dummy (slabNum < 0) segments are simply
// dropped from unsizedSegs_; real segments are marked FREE and coalesced
// with FREE neighbors on either side. Does NOT delete the Buffer object —
// callers that own it must delete it first (see deleteBuffer).
633 void BufferMgr::removeSegment(BufferList::iterator& segIt) {
634  // Note: does not delete buffer as this may be moved somewhere else
635  int slabNum = segIt->slabNum;
636  // cout << "Slab num: " << slabNum << endl;
637  if (slabNum < 0) {
638  std::lock_guard<std::mutex> unsizedSegsLock(unsizedSegsMutex_);
639  unsizedSegs_.erase(segIt);
640  } else {
// Merge with a FREE predecessor, absorbing its pages and start page.
641  if (segIt != slabSegments_[slabNum].begin()) {
642  auto prevIt = std::prev(segIt);
643  // LOG(INFO) << "PrevIt: " << " " << getStringMgrType() << ":" << deviceId_;
644  // printSeg(prevIt);
645  if (prevIt->memStatus == FREE) {
646  segIt->startPage = prevIt->startPage;
647  segIt->numPages += prevIt->numPages;
648  slabSegments_[slabNum].erase(prevIt);
649  }
650  }
// Merge with a FREE successor likewise.
651  auto nextIt = std::next(segIt);
652  if (nextIt != slabSegments_[slabNum].end()) {
653  if (nextIt->memStatus == FREE) {
654  segIt->numPages += nextIt->numPages;
655  slabSegments_[slabNum].erase(nextIt);
656  }
657  }
658  segIt->memStatus = FREE;
659  // segIt->pinCount = 0;
660  segIt->buffer = 0;
661  }
662 }
663 
// checkpoint() fragment (signature elided by extraction; doxygen index
// places checkpoint at source line 664). Flushes every dirty real chunk
// (first key component != -1, i.e. not an anonymous alloc() buffer) to the
// parent manager and clears its dirty bits.
665  std::lock_guard<std::mutex> lock(globalMutex_); // granular lock
666  std::lock_guard<std::mutex> chunkIndexLock(chunkIndexMutex_);
667 
668  for (auto bufferIt = chunkIndex_.begin(); bufferIt != chunkIndex_.end(); ++bufferIt) {
669  if (bufferIt->second->chunkKey[0] != -1 &&
670  bufferIt->second->buffer->isDirty_) { // checks that buffer is actual chunk (not
671  // just buffer) and is dirty
672 
673  parentMgr_->putBuffer(bufferIt->second->chunkKey, bufferIt->second->buffer);
674  bufferIt->second->buffer->clearDirtyBits();
675  }
676  }
677 }
678 
// Table-scoped checkpoint: flush dirty chunks whose keys start with
// {db_id, tb_id} to the parent manager. Uses the same ordered-map prefix
// walk as deleteBuffersWithPrefix.
679 void BufferMgr::checkpoint(const int db_id, const int tb_id) {
680  std::lock_guard<std::mutex> lock(globalMutex_); // granular lock
681  std::lock_guard<std::mutex> chunkIndexLock(chunkIndexMutex_);
682 
683  ChunkKey keyPrefix;
684  keyPrefix.push_back(db_id);
685  keyPrefix.push_back(tb_id);
686  auto startChunkIt = chunkIndex_.lower_bound(keyPrefix);
687  if (startChunkIt == chunkIndex_.end()) {
688  return;
689  }
690 
691  auto bufferIt = startChunkIt;
// Loop while the key's first keyPrefix.size() elements equal keyPrefix.
692  while (bufferIt != chunkIndex_.end() &&
693  std::search(bufferIt->first.begin(),
694  bufferIt->first.begin() + keyPrefix.size(),
695  keyPrefix.begin(),
696  keyPrefix.end()) != bufferIt->first.begin() + keyPrefix.size()) {
697  if (bufferIt->second->chunkKey[0] != -1 &&
698  bufferIt->second->buffer->isDirty_) { // checks that buffer is actual chunk (not
699  // just buffer) and is dirty
700 
701  parentMgr_->putBuffer(bufferIt->second->chunkKey, bufferIt->second->buffer);
702  bufferIt->second->buffer->clearDirtyBits();
703  }
704  bufferIt++;
705  }
706 }
707 
// Return a pinned buffer for the chunk. If it is resident, pin it and
// optionally fetch the missing tail from the parent; otherwise create a new
// buffer (already pinned by createBuffer) and fetch the whole chunk from
// the parent. Fatal if the parent also lacks the chunk.
710 AbstractBuffer* BufferMgr::getBuffer(const ChunkKey& key, const size_t numBytes) {
711  std::lock_guard<std::mutex> lock(globalMutex_); // granular lock
712 
713  std::unique_lock<std::mutex> sizedSegsLock(sizedSegsMutex_);
714  std::unique_lock<std::mutex> chunkIndexLock(chunkIndexMutex_);
715  auto bufferIt = chunkIndex_.find(key);
716  bool foundBuffer = bufferIt != chunkIndex_.end();
717  chunkIndexLock.unlock();
718  if (foundBuffer) {
719  CHECK(bufferIt->second->buffer);
// Pin before releasing the segments lock so the segment can't be evicted.
720  bufferIt->second->buffer->pin();
721  sizedSegsLock.unlock();
722  bufferIt->second->lastTouched = bufferEpoch_++; // race
723  if (bufferIt->second->buffer->size() <
724  numBytes) { // need to fetch part of buffer we don't have - up to numBytes
725  parentMgr_->fetchBuffer(key, bufferIt->second->buffer, numBytes);
726  }
727  return bufferIt->second->buffer;
728  } else { // If wasn't in pool then we need to fetch it
729  sizedSegsLock.unlock();
730  AbstractBuffer* buffer =
731  createBuffer(key, pageSize_, numBytes); // createChunk pins for us
732  try {
733  parentMgr_->fetchBuffer(
734  key, buffer, numBytes); // this should put buffer in a BufferSegment
735  } catch (std::runtime_error& error) {
736  LOG(FATAL) << "Get chunk - Could not find chunk " << keyToString(key)
737  << " in buffer pool or parent buffer pools. Error was " << error.what();
738  }
739  return buffer;
740  }
741 }
742 
// fetchBuffer fragment (the signature line, with the ChunkKey parameter,
// was elided by extraction). Copies the chunk's contents into destBuffer:
// resolves (or creates+loads) the source buffer, pins it, then reads either
// the whole chunk (if the source was updated in place) or only the appended
// tail beyond destBuffer's current size.
744  AbstractBuffer* destBuffer,
745  const size_t numBytes) {
746  std::unique_lock<std::mutex> lock(globalMutex_); // granular lock
747  std::unique_lock<std::mutex> sizedSegsLock(sizedSegsMutex_);
748  std::unique_lock<std::mutex> chunkIndexLock(chunkIndexMutex_);
749 
750  auto bufferIt = chunkIndex_.find(key);
751  bool foundBuffer = bufferIt != chunkIndex_.end();
752  chunkIndexLock.unlock();
753  AbstractBuffer* buffer;
754  if (!foundBuffer) {
755  sizedSegsLock.unlock();
756  CHECK(parentMgr_ != 0);
757  buffer = createBuffer(key, pageSize_, numBytes); // will pin buffer
758  try {
759  parentMgr_->fetchBuffer(key, buffer, numBytes);
760  } catch (std::runtime_error& error) {
761  LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key);
762  }
763  } else {
764  buffer = bufferIt->second->buffer;
765  buffer->pin();
// Resident copy may be short of numBytes — pull the remainder from parent.
766  if (numBytes > buffer->size()) {
767  try {
768  parentMgr_->fetchBuffer(key, buffer, numBytes);
769  } catch (std::runtime_error& error) {
770  LOG(FATAL) << "Could not fetch parent buffer " << keyToString(key);
771  }
772  }
773  sizedSegsLock.unlock();
774  }
// numBytes == 0 means "the whole chunk".
775  size_t chunkSize = numBytes == 0 ? buffer->size() : numBytes;
776  lock.unlock();
777  destBuffer->reserve(chunkSize);
778  if (buffer->isUpdated()) {
779  buffer->read(destBuffer->getMemoryPtr(),
780  chunkSize,
781  0,
782  destBuffer->getType(),
783  destBuffer->getDeviceId());
784  } else {
// Not updated: only the appended tail needs copying.
785  buffer->read(destBuffer->getMemoryPtr() + destBuffer->size(),
786  chunkSize - destBuffer->size(),
787  destBuffer->size(),
788  destBuffer->getType(),
789  destBuffer->getDeviceId());
790  }
791  destBuffer->setSize(chunkSize);
792  destBuffer->syncEncoder(buffer);
793  buffer->unPin();
794 }
795 
// putBuffer fragment (the signature line, with the ChunkKey parameter, was
// elided by extraction). Copies srcBuffer's contents into this pool's
// buffer for key, creating the buffer if absent. Updated sources are
// rewritten from offset 0; appended sources have only the new tail copied.
797  AbstractBuffer* srcBuffer,
798  const size_t numBytes) {
799  std::unique_lock<std::mutex> chunkIndexLock(chunkIndexMutex_);
800  auto bufferIt = chunkIndex_.find(key);
801  bool foundBuffer = bufferIt != chunkIndex_.end();
802  chunkIndexLock.unlock();
803  AbstractBuffer* buffer;
804  if (!foundBuffer) {
805  buffer = createBuffer(key, pageSize_);
806  } else {
807  buffer = bufferIt->second->buffer;
808  }
809  size_t oldBufferSize = buffer->size();
// numBytes == 0 means "copy the whole source buffer".
810  size_t newBufferSize = numBytes == 0 ? srcBuffer->size() : numBytes;
811  CHECK(!buffer->isDirty());
812 
813  if (srcBuffer->isUpdated()) {
814  //@todo use dirty flags to only flush pages of chunk that need to
815  // be flushed
816  buffer->write((int8_t*)srcBuffer->getMemoryPtr(),
817  newBufferSize,
818  0,
819  srcBuffer->getType(),
820  srcBuffer->getDeviceId());
821  } else if (srcBuffer->isAppended()) {
822  CHECK(oldBufferSize < newBufferSize);
823  buffer->append((int8_t*)srcBuffer->getMemoryPtr() + oldBufferSize,
824  newBufferSize - oldBufferSize,
825  srcBuffer->getType(),
826  srcBuffer->getDeviceId());
827  }
828  srcBuffer->clearDirtyBits();
829  buffer->syncEncoder(srcBuffer);
830  return buffer;
831 }
832 
// getBufferId fragment (signature elided by extraction). Hands out a fresh
// monotonically increasing id for anonymous buffers, under its own mutex.
834  std::lock_guard<std::mutex> lock(bufferIdMutex_);
835  return maxBufferId_++;
836 }
837 
839 AbstractBuffer* BufferMgr::alloc(const size_t numBytes) {
840  std::lock_guard<std::mutex> lock(globalMutex_);
841  ChunkKey chunkKey = {-1, getBufferId()};
842  return createBuffer(chunkKey, pageSize_, numBytes);
843 }
844 
// free() fragment (signature elided by extraction; doxygen index places
// free at source line 845). Deletes a buffer previously handed out by
// alloc()/createBuffer, looked up via the Buffer's back-pointer to its
// segment. Fatal if the pointer is not actually a Buffer.
846  std::lock_guard<std::mutex> lock(globalMutex_); // hack for now
847  Buffer* castedBuffer = dynamic_cast<Buffer*>(buffer);
848  if (castedBuffer == 0) {
849  LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type.";
850  }
851  deleteBuffer(castedBuffer->segIt_->chunkKey);
852 }
853 
// getNumChunks fragment (signature elided by extraction): number of entries
// in the chunk index, under the index lock.
855  std::lock_guard<std::mutex> chunkIndexLock(chunkIndexMutex_);
856  return chunkIndex_.size();
857 }
858 
// NOTE(review): despite the doxygen summary ("total number of bytes
// allocated"), this returns a page count, not bytes — getAllocated() is the
// bytes variant. Confirm which unit callers expect.
859 size_t BufferMgr::size() {
860  return numPagesAllocated_;
861 }
862 
// getMaxBufferSize fragment (signature elided): configured byte ceiling.
864  return maxBufferSize_;
865 }
866 
// getMaxSlabSize fragment (signature elided): configured per-slab byte cap.
868  return maxSlabSize_;
869 }
870 
// getChunkMetadataVec fragment (signature elided by extraction):
// intentionally unsupported at this pool level — metadata lives with
// persistent managers.
872  std::vector<std::pair<ChunkKey, ChunkMetadata>>& chunkMetadataVec) {
873  LOG(FATAL) << "getChunkMetadataVec not supported for BufferMgr.";
874 }
875 
// getChunkMetadataVecForKeyPrefix fragment (signature elided): likewise
// unsupported.
877  std::vector<std::pair<ChunkKey, ChunkMetadata>>& chunkMetadataVec,
878  const ChunkKey& keyPrefix) {
879  LOG(FATAL) << "getChunkMetadataVecForPrefix not supported for BufferMgr.";
880 }
881 
// Read-only view of the per-slab segment lists, for inspection/tests.
882 const std::vector<BufferList>& BufferMgr::getSlabSegments() {
883  return slabSegments_;
884 }
885 } // namespace Buffer_Namespace
size_t getAllocated() override
Definition: BufferMgr.cpp:490
std::string printSeg(BufferList::iterator &segIt)
Definition: BufferMgr.cpp:519
~BufferMgr() override
Destructor.
Definition: BufferMgr.cpp:70
std::map< ChunkKey, BufferList::iterator > chunkIndex_
Definition: BufferMgr.h:199
size_t getMaxSize() override
Definition: BufferMgr.cpp:485
AbstractBufferMgr * parentMgr_
Definition: BufferMgr.h:208
virtual void write(int8_t *src, const size_t numBytes, const size_t offset=0, const MemoryLevel srcBufferType=CPU_LEVEL, const int srcDeviceId=-1)=0
size_t getNumChunks() override
Definition: BufferMgr.cpp:854
BufferList::iterator findFreeBuffer(size_t numBytes)
Definition: BufferMgr.cpp:264
std::string printSlab(size_t slabNum)
Definition: BufferMgr.cpp:416
#define LOG(tag)
Definition: Logger.h:182
BufferList unsizedSegs_
Maps sizes of free memory areas to host buffer pool memory addresses.
Definition: BufferMgr.h:216
virtual size_t size() const =0
virtual int8_t * getMemoryPtr()=0
virtual MemoryLevel getType() const =0
void removeSegment(BufferList::iterator &segIt)
Definition: BufferMgr.cpp:633
size_t size()
Returns the total number of bytes allocated.
Definition: BufferMgr.cpp:859
void getChunkMetadataVec(std::vector< std::pair< ChunkKey, ChunkMetadata >> &chunkMetadataVec) override
Definition: BufferMgr.cpp:871
virtual void reserve(size_t numBytes)=0
AbstractBuffer * alloc(const size_t numBytes=0) override
client is responsible for deleting memory allocated for b->mem_
Definition: BufferMgr.cpp:839
virtual void addSlab(const size_t slabSize)=0
bool isAllocationCapped() override
Definition: BufferMgr.cpp:495
BufferList::iterator segIt_
Definition: Buffer.h:166
virtual void append(int8_t *src, const size_t numBytes, const MemoryLevel srcBufferType=CPU_LEVEL, const int deviceId=-1)=0
virtual void allocateBuffer(BufferList::iterator segIt, const size_t pageSize, const size_t numBytes)=0
BufferList::iterator findFreeBufferInSlab(const size_t slabNum, const size_t numPagesRequested)
Definition: BufferMgr.cpp:236
AbstractBuffer * putBuffer(const ChunkKey &key, AbstractBuffer *d, const size_t numBytes=0) override
Definition: BufferMgr.cpp:796
std::string printSlabs() override
Definition: BufferMgr.cpp:447
An AbstractBuffer is a unit of data management for a data manager.
bool isBufferOnDevice(const ChunkKey &key) override
Puts the contents of d into the Buffer with ChunkKey key.
Definition: BufferMgr.cpp:575
void getChunkMetadataVecForKeyPrefix(std::vector< std::pair< ChunkKey, ChunkMetadata >> &chunkMetadataVec, const ChunkKey &keyPrefix) override
Definition: BufferMgr.cpp:876
void free(AbstractBuffer *buffer) override
Definition: BufferMgr.cpp:845
void checkpoint() override
Definition: BufferMgr.cpp:664
size_t maxNumPages_
max number of bytes allocated for the buffer pool
Definition: BufferMgr.h:201
virtual void read(int8_t *const dst, const size_t numBytes, const size_t offset=0, const MemoryLevel dstBufferType=CPU_LEVEL, const int dstDeviceId=-1)=0
const std::vector< BufferList > & getSlabSegments()
Definition: BufferMgr.cpp:882
virtual int getDeviceId() const
void setSize(const size_t size)
AbstractBuffer * createBuffer(const ChunkKey &key, const size_t pageSize=0, const size_t initialSize=0) override
Creates a chunk with the specified key and page size.
Definition: BufferMgr.cpp:97
size_t getInUseSize() override
Definition: BufferMgr.cpp:504
AbstractBuffer * getBuffer(const ChunkKey &key, const size_t numBytes=0) override
Returns the a pointer to the chunk with the specified key.
Definition: BufferMgr.cpp:710
#define CHECK(condition)
Definition: Logger.h:187
virtual bool isAppended() const
std::vector< int > ChunkKey
Definition: types.h:35
virtual bool isDirty() const
void fetchBuffer(const ChunkKey &key, AbstractBuffer *destBuffer, const size_t numBytes=0) override
Definition: BufferMgr.cpp:743
static TimeT::rep execution(F func, Args &&... args)
Definition: sample.cpp:29
void deleteBuffersWithPrefix(const ChunkKey &keyPrefix, const bool purge=true) override
Definition: BufferMgr.cpp:604
void clearSlabs() override
Definition: BufferMgr.cpp:460
BufferList::iterator reserveBuffer(BufferList::iterator &segIt, const size_t numBytes)
Definition: BufferMgr.cpp:179
void syncEncoder(const AbstractBuffer *srcBuffer)
std::string keyToString(const ChunkKey &key)
Definition: BufferMgr.cpp:35
std::vector< int8_t * > slabs_
Definition: BufferMgr.h:176
virtual bool isUpdated() const
void deleteBuffer(const ChunkKey &key, const bool purge=true) override
Deletes the chunk with the specified key.
Definition: BufferMgr.cpp:585
#define VLOG(n)
Definition: Logger.h:277
std::vector< BufferList > slabSegments_
Definition: BufferMgr.h:178
Note(s): Forbid Copying Idiom 4.1.
Definition: Buffer.h:45
BufferList::iterator evict(BufferList::iterator &evictStart, const size_t numPagesRequested, const int slabNum)
Definition: BufferMgr.cpp:139
virtual void freeAllMem()=0